Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions app/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
class Flow:
"""Send messages through websocket to frontend"""

def __init__(self, socket_path=cfg["ports.websocket_path_in"]):
self._socket_path = socket_path
def __init__(
self,
host=cfg["hosts.message_bus"],
port=cfg["ports.message_bus_receive"],
):
self._endpoint = f"tcp://{host}:{port}"
self._ctx = zmq.Context.instance()
self._bus = self._ctx.socket(zmq.PUSH)
self._bus.connect(self._socket_path)
self._bus.connect(self._endpoint)

def push(self, user_id, action_type, payload={}):
"""Push action to specified user over websocket.
Expand Down
10 changes: 8 additions & 2 deletions config.yaml.defaults
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,17 @@ ports:
app_http_proxy: 5001
app_internal: 65000 # nginx forwards this port to ports:app
dask: 63500
websocket_path_in: 'ipc://run/message_flow_in'
websocket_path_out: 'ipc://run/message_flow_out'
message_bus_receive: 64002 # message_proxy binds; the app/workers push here
message_bus_publish: 64003 # message_proxy binds; the websocket server reads here
status: 64500
migration_manager: 64501

hosts:
# Host running the message bus (message_proxy). The app, workers and the
# websocket server connect to it over TCP, so set this to a routable address
# (e.g. a k8s Service name) when running the bus on a separate host/pod.
message_bus: 127.0.0.1

external_logging:
papertrail:
# get an account at https://papertrailapp.com
Expand Down
5 changes: 3 additions & 2 deletions services/message_proxy/message_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@

log = make_log("message_proxy")

IN = cfg["ports.websocket_path_in"]
OUT = cfg["ports.websocket_path_out"]
# Bind on all interfaces so producers/consumers on other hosts can connect.
IN = f"tcp://*:{cfg['ports.message_bus_receive']}"
OUT = f"tcp://*:{cfg['ports.message_bus_publish']}"

context = zmq.Context()

Expand Down
4 changes: 3 additions & 1 deletion services/websocket_server/websocket_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ def broadcast(cls, data):

if __name__ == "__main__":
PORT = cfg["ports.websocket"]
LOCAL_OUTPUT = cfg["ports.websocket_path_out"]
LOCAL_OUTPUT = (
f"tcp://{cfg['hosts.message_bus']}:{cfg['ports.message_bus_publish']}"
)

import zmq
from zmq.eventloop import zmqstream
Expand Down
Loading