diff --git a/app/flow.py b/app/flow.py index 2c9f28ad..73ae3d04 100644 --- a/app/flow.py +++ b/app/flow.py @@ -9,13 +9,24 @@ class Flow: - """Send messages through websocket to frontend""" - - def __init__(self, socket_path=cfg["ports.websocket_path_in"]): - self._socket_path = socket_path + """Send messages through websocket to frontend + + Parameters + ---------- + host : str + Host on which the message proxy is running. + port : str + Port on which the message bus is listening for messages. + """ + def __init__( + self, + host=cfg["hosts.message_bus"], + port=cfg["ports.message_bus_receive"], + ): + self._endpoint = f"tcp://{host}:{port}" self._ctx = zmq.Context.instance() self._bus = self._ctx.socket(zmq.PUSH) - self._bus.connect(self._socket_path) + self._bus.connect(self._endpoint) def push(self, user_id, action_type, payload={}): """Push action to specified user over websocket. diff --git a/config.yaml.defaults b/config.yaml.defaults index 2f692c6e..f421afa9 100644 --- a/config.yaml.defaults +++ b/config.yaml.defaults @@ -134,11 +134,17 @@ ports: app_http_proxy: 5001 app_internal: 65000 # nginx forwards this port to ports:app dask: 63500 - websocket_path_in: 'ipc://run/message_flow_in' - websocket_path_out: 'ipc://run/message_flow_out' + message_bus_receive: 64002 # bus listens for incoming messages here; app/clients push to this port + message_bus_publish: 64003 # bus sends out messages here; websocket server reads from this port status: 64500 migration_manager: 64501 +hosts: + # Host running the message bus (message_proxy). The app, workers and the + # websocket server connect to it over TCP, so set this to a routable address + # (e.g. a k8s Service name) when running the bus on a separate host/pod. + message_bus: 127.0.0.1 + external_logging: papertrail: # get an account at https://papertrailapp.com diff --git a/services/message_proxy/message_proxy.py b/services/message_proxy/message_proxy.py index 3969b55f..35742174 100644 --- a/services/message_proxy/message_proxy.py +++ b/services/message_proxy/message_proxy.py @@ -9,8 +9,9 @@ log = make_log("message_proxy") -IN = cfg["ports.websocket_path_in"] -OUT = cfg["ports.websocket_path_out"] +# Bind on all interfaces so producers/consumers on other hosts can connect +IN = f"tcp://*:{cfg['ports.message_bus_receive']}" +OUT = f"tcp://*:{cfg['ports.message_bus_publish']}" context = zmq.Context() diff --git a/services/websocket_server/websocket_server.py b/services/websocket_server/websocket_server.py index 9895b117..cdcd4cd7 100644 --- a/services/websocket_server/websocket_server.py +++ b/services/websocket_server/websocket_server.py @@ -168,7 +168,9 @@ def broadcast(cls, data): if __name__ == "__main__": PORT = cfg["ports.websocket"] - LOCAL_OUTPUT = cfg["ports.websocket_path_out"] + LOCAL_OUTPUT = ( + f"tcp://{cfg['hosts.message_bus']}:{cfg['ports.message_bus_publish']}" + ) import zmq from zmq.eventloop import zmqstream