74
74
MAIN_PROCESS_INSTANCE_NAME = "main"
75
75
MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1"
76
76
MAIN_PROCESS_REPLICATION_PORT = 9093
77
+ # Obviously, these would only be used with the UNIX socket option
78
+ MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock"
79
+ MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
77
80
78
81
# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
79
82
# during processing with the name of the worker.
@@ -407,11 +410,15 @@ def add_worker_roles_to_shared_config(
407
410
)
408
411
409
412
# Map of stream writer instance names to host/ports combos
410
- instance_map [worker_name ] = {
411
- "host" : "localhost" ,
412
- "port" : worker_port ,
413
- }
414
-
413
+ if os .environ .get ("SYNAPSE_USE_UNIX_SOCKET" , False ):
414
+ instance_map [worker_name ] = {
415
+ "path" : f"/run/worker.{ worker_port } " ,
416
+ }
417
+ else :
418
+ instance_map [worker_name ] = {
419
+ "host" : "localhost" ,
420
+ "port" : worker_port ,
421
+ }
415
422
# Update the list of stream writers. It's convenient that the name of the worker
416
423
# type is the same as the stream to write. Iterate over the whole list in case there
417
424
# is more than one.
@@ -423,10 +430,15 @@ def add_worker_roles_to_shared_config(
423
430
424
431
# Map of stream writer instance names to host/ports combos
425
432
# For now, all stream writers need http replication ports
426
- instance_map [worker_name ] = {
427
- "host" : "localhost" ,
428
- "port" : worker_port ,
429
- }
433
+ if os .environ .get ("SYNAPSE_USE_UNIX_SOCKET" , False ):
434
+ instance_map [worker_name ] = {
435
+ "path" : f"/run/worker.{ worker_port } " ,
436
+ }
437
+ else :
438
+ instance_map [worker_name ] = {
439
+ "host" : "localhost" ,
440
+ "port" : worker_port ,
441
+ }
430
442
431
443
432
444
def merge_worker_template_configs (
@@ -718,17 +730,29 @@ def generate_worker_files(
718
730
# Note that yaml cares about indentation, so care should be taken to insert lines
719
731
# into files at the correct indentation below.
720
732
733
+ # Convenience helper for if using unix sockets instead of host:port
734
+ using_unix_sockets = environ .get ("SYNAPSE_USE_UNIX_SOCKET" , False )
721
735
# First read the original config file and extract the listeners block. Then we'll
722
736
# add another listener for replication. Later we'll write out the result to the
723
737
# shared config file.
724
- listeners = [
725
- {
726
- "port" : MAIN_PROCESS_REPLICATION_PORT ,
727
- "bind_address" : MAIN_PROCESS_LOCALHOST_ADDRESS ,
728
- "type" : "http" ,
729
- "resources" : [{"names" : ["replication" ]}],
730
- }
731
- ]
738
+ listeners : List [Any ]
739
+ if using_unix_sockets :
740
+ listeners = [
741
+ {
742
+ "path" : MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH ,
743
+ "type" : "http" ,
744
+ "resources" : [{"names" : ["replication" ]}],
745
+ }
746
+ ]
747
+ else :
748
+ listeners = [
749
+ {
750
+ "port" : MAIN_PROCESS_REPLICATION_PORT ,
751
+ "bind_address" : MAIN_PROCESS_LOCALHOST_ADDRESS ,
752
+ "type" : "http" ,
753
+ "resources" : [{"names" : ["replication" ]}],
754
+ }
755
+ ]
732
756
with open (config_path ) as file_stream :
733
757
original_config = yaml .safe_load (file_stream )
734
758
original_listeners = original_config .get ("listeners" )
@@ -769,7 +793,17 @@ def generate_worker_files(
769
793
770
794
# A list of internal endpoints to healthcheck, starting with the main process
771
795
# which exists even if no workers do.
772
- healthcheck_urls = ["http://localhost:8080/health" ]
796
+ # This list ends up being part of the command line to curl, (curl added support for
797
+ # Unix sockets in version 7.40).
798
+ if using_unix_sockets :
799
+ healthcheck_urls = [
800
+ f"--unix-socket { MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH } "
801
+ # The scheme and hostname from the following URL are ignored.
802
+ # The only thing that matters is the path `/health`
803
+ "http://localhost/health"
804
+ ]
805
+ else :
806
+ healthcheck_urls = ["http://localhost:8080/health" ]
773
807
774
808
# Get the set of all worker types that we have configured
775
809
all_worker_types_in_use = set (chain (* requested_worker_types .values ()))
@@ -806,8 +840,12 @@ def generate_worker_files(
806
840
# given worker_type needs to stay assigned and not be replaced.
807
841
worker_config ["shared_extra_conf" ].update (shared_config )
808
842
shared_config = worker_config ["shared_extra_conf" ]
809
-
810
- healthcheck_urls .append ("http://localhost:%d/health" % (worker_port ,))
843
+ if using_unix_sockets :
844
+ healthcheck_urls .append (
845
+ f"--unix-socket /run/worker.{ worker_port } http://localhost/health"
846
+ )
847
+ else :
848
+ healthcheck_urls .append ("http://localhost:%d/health" % (worker_port ,))
811
849
812
850
# Update the shared config with sharding-related options if necessary
813
851
add_worker_roles_to_shared_config (
@@ -826,6 +864,7 @@ def generate_worker_files(
826
864
"/conf/workers/{name}.yaml" .format (name = worker_name ),
827
865
** worker_config ,
828
866
worker_log_config_filepath = log_config_filepath ,
867
+ using_unix_sockets = using_unix_sockets ,
829
868
)
830
869
831
870
# Save this worker's port number to the correct nginx upstreams
@@ -846,8 +885,13 @@ def generate_worker_files(
846
885
nginx_upstream_config = ""
847
886
for upstream_worker_base_name , upstream_worker_ports in nginx_upstreams .items ():
848
887
body = ""
849
- for port in upstream_worker_ports :
850
- body += f" server localhost:{ port } ;\n "
888
+ if using_unix_sockets :
889
+ for port in upstream_worker_ports :
890
+ body += f" server unix:/run/worker.{ port } ;\n "
891
+
892
+ else :
893
+ for port in upstream_worker_ports :
894
+ body += f" server localhost:{ port } ;\n "
851
895
852
896
# Add to the list of configured upstreams
853
897
nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK .format (
@@ -877,10 +921,15 @@ def generate_worker_files(
877
921
# If there are workers, add the main process to the instance_map too.
878
922
if workers_in_use :
879
923
instance_map = shared_config .setdefault ("instance_map" , {})
880
- instance_map [MAIN_PROCESS_INSTANCE_NAME ] = {
881
- "host" : MAIN_PROCESS_LOCALHOST_ADDRESS ,
882
- "port" : MAIN_PROCESS_REPLICATION_PORT ,
883
- }
924
+ if using_unix_sockets :
925
+ instance_map [MAIN_PROCESS_INSTANCE_NAME ] = {
926
+ "path" : MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH ,
927
+ }
928
+ else :
929
+ instance_map [MAIN_PROCESS_INSTANCE_NAME ] = {
930
+ "host" : MAIN_PROCESS_LOCALHOST_ADDRESS ,
931
+ "port" : MAIN_PROCESS_REPLICATION_PORT ,
932
+ }
884
933
885
934
# Shared homeserver config
886
935
convert (
@@ -890,6 +939,7 @@ def generate_worker_files(
890
939
appservice_registrations = appservice_registrations ,
891
940
enable_redis = workers_in_use ,
892
941
workers_in_use = workers_in_use ,
942
+ using_unix_sockets = using_unix_sockets ,
893
943
)
894
944
895
945
# Nginx config
@@ -900,6 +950,7 @@ def generate_worker_files(
900
950
upstream_directives = nginx_upstream_config ,
901
951
tls_cert_path = os .environ .get ("SYNAPSE_TLS_CERT" ),
902
952
tls_key_path = os .environ .get ("SYNAPSE_TLS_KEY" ),
953
+ using_unix_sockets = using_unix_sockets ,
903
954
)
904
955
905
956
# Supervisord config
@@ -909,6 +960,7 @@ def generate_worker_files(
909
960
"/etc/supervisor/supervisord.conf" ,
910
961
main_config_path = config_path ,
911
962
enable_redis = workers_in_use ,
963
+ using_unix_sockets = using_unix_sockets ,
912
964
)
913
965
914
966
convert (
0 commit comments