From 885c07bc0596cafa6b5f4fdff17d6ae41414b544 Mon Sep 17 00:00:00 2001 From: Michael Coughlin Date: Thu, 4 Jun 2026 17:38:09 -0500 Subject: [PATCH 1/2] Allow the websocket message bus to bind a tcp:// address --- config.yaml.defaults | 2 ++ services/message_proxy/message_proxy.py | 23 ++++++++++++++++++++--- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/config.yaml.defaults b/config.yaml.defaults index 2f692c6e..d84c35b7 100644 --- a/config.yaml.defaults +++ b/config.yaml.defaults @@ -134,6 +134,8 @@ ports: app_http_proxy: 5001 app_internal: 65000 # nginx forwards this port to ports:app dask: 63500 + # ipc:// requires producers, the websocket server and message_proxy to share a host. + # For multi-host/k8s, use a routable address, e.g. tcp://message_proxy:64002. websocket_path_in: 'ipc://run/message_flow_in' websocket_path_out: 'ipc://run/message_flow_out' status: 64500 diff --git a/services/message_proxy/message_proxy.py b/services/message_proxy/message_proxy.py index 3969b55f..e2fd6eeb 100644 --- a/services/message_proxy/message_proxy.py +++ b/services/message_proxy/message_proxy.py @@ -12,13 +12,30 @@ IN = cfg["ports.websocket_path_in"] OUT = cfg["ports.websocket_path_out"] + +def bind_endpoint(endpoint): + """Convert a ZMQ *connect* endpoint into the matching *bind* endpoint. + + ipc:// binds and connects to the same path (returned unchanged). tcp:// + must bind all interfaces, so the host is replaced with the wildcard ``*`` + while the port is kept (tcp://message_proxy:64002 -> tcp://*:64002). + """ + if endpoint.startswith("tcp://"): + port = endpoint.rsplit(":", 1)[-1] + return f"tcp://*:{port}" + return endpoint + + +IN_BIND = bind_endpoint(IN) +OUT_BIND = bind_endpoint(OUT) + context = zmq.Context() feed_in = context.socket(zmq.PULL) -feed_in.bind(IN) +feed_in.bind(IN_BIND) feed_out = context.socket(zmq.PUB) -feed_out.bind(OUT) +feed_out.bind(OUT_BIND) -log(f"Forwarding messages between {IN} and {OUT}") +log(f"Forwarding messages between {IN_BIND} and {OUT_BIND}") zmq.proxy(feed_in, feed_out) From da11c9af5287f677085736314f42e9deb244d396 Mon Sep 17 00:00:00 2001 From: Michael Coughlin Date: Fri, 5 Jun 2026 05:42:44 -0500 Subject: [PATCH 2/2] Stefan feedback --- app/flow.py | 10 +++++-- config.yaml.defaults | 12 +++++--- services/message_proxy/message_proxy.py | 28 ++++--------------- services/websocket_server/websocket_server.py | 4 ++- 4 files changed, 24 insertions(+), 30 deletions(-) diff --git a/app/flow.py b/app/flow.py index 2c9f28ad..2bfabc8d 100644 --- a/app/flow.py +++ b/app/flow.py @@ -11,11 +11,15 @@ class Flow: """Send messages through websocket to frontend""" - def __init__(self, socket_path=cfg["ports.websocket_path_in"]): - self._socket_path = socket_path + def __init__( + self, + host=cfg["hosts.message_bus"], + port=cfg["ports.message_bus_receive"], + ): + self._endpoint = f"tcp://{host}:{port}" self._ctx = zmq.Context.instance() self._bus = self._ctx.socket(zmq.PUSH) - self._bus.connect(self._socket_path) + self._bus.connect(self._endpoint) def push(self, user_id, action_type, payload={}): """Push action to specified user over websocket. diff --git a/config.yaml.defaults b/config.yaml.defaults index d84c35b7..d40371f9 100644 --- a/config.yaml.defaults +++ b/config.yaml.defaults @@ -134,13 +134,17 @@ ports: app_http_proxy: 5001 app_internal: 65000 # nginx forwards this port to ports:app dask: 63500 - # ipc:// requires producers, the websocket server and message_proxy to share a host. - # For multi-host/k8s, use a routable address, e.g. tcp://message_proxy:64002. - websocket_path_in: 'ipc://run/message_flow_in' - websocket_path_out: 'ipc://run/message_flow_out' + message_bus_receive: 64002 # message_proxy binds; the app/workers push here + message_bus_publish: 64003 # message_proxy binds; the websocket server reads here status: 64500 migration_manager: 64501 +hosts: + # Host running the message bus (message_proxy). The app, workers and the + # websocket server connect to it over TCP, so set this to a routable address + # (e.g. a k8s Service name) when running the bus on a separate host/pod. + message_bus: 127.0.0.1 + external_logging: papertrail: # get an account at https://papertrailapp.com diff --git a/services/message_proxy/message_proxy.py b/services/message_proxy/message_proxy.py index e2fd6eeb..5847d2e1 100644 --- a/services/message_proxy/message_proxy.py +++ b/services/message_proxy/message_proxy.py @@ -9,33 +9,17 @@ log = make_log("message_proxy") -IN = cfg["ports.websocket_path_in"] -OUT = cfg["ports.websocket_path_out"] - - -def bind_endpoint(endpoint): - """Convert a ZMQ *connect* endpoint into the matching *bind* endpoint. - - ipc:// binds and connects to the same path (returned unchanged). tcp:// - must bind all interfaces, so the host is replaced with the wildcard ``*`` - while the port is kept (tcp://message_proxy:64002 -> tcp://*:64002). - """ - if endpoint.startswith("tcp://"): - port = endpoint.rsplit(":", 1)[-1] - return f"tcp://*:{port}" - return endpoint - - -IN_BIND = bind_endpoint(IN) -OUT_BIND = bind_endpoint(OUT) +# Bind on all interfaces so producers/consumers on other hosts can connect. +IN = f"tcp://*:{cfg['ports.message_bus_receive']}" +OUT = f"tcp://*:{cfg['ports.message_bus_publish']}" context = zmq.Context() feed_in = context.socket(zmq.PULL) -feed_in.bind(IN_BIND) +feed_in.bind(IN) feed_out = context.socket(zmq.PUB) -feed_out.bind(OUT_BIND) +feed_out.bind(OUT) -log(f"Forwarding messages between {IN_BIND} and {OUT_BIND}") +log(f"Forwarding messages between {IN} and {OUT}") zmq.proxy(feed_in, feed_out) diff --git a/services/websocket_server/websocket_server.py b/services/websocket_server/websocket_server.py index 9895b117..cdcd4cd7 100644 --- a/services/websocket_server/websocket_server.py +++ b/services/websocket_server/websocket_server.py @@ -168,7 +168,9 @@ def broadcast(cls, data): if __name__ == "__main__": PORT = cfg["ports.websocket"] - LOCAL_OUTPUT = cfg["ports.websocket_path_out"] + LOCAL_OUTPUT = ( + f"tcp://{cfg['hosts.message_bus']}:{cfg['ports.message_bus_publish']}" + ) import zmq from zmq.eventloop import zmqstream