|
5 | 5 | import asyncio
|
6 | 6 | import websockets
|
7 | 7 |
|
8 |
| -class Counter(IObjectSink): |
| 8 | + |
| 9 | +class CounterSink(IObjectSink): |
| 10 | + # this is the sink for the counter |
9 | 11 | count = 0
|
10 | 12 | client = None
|
| 13 | + |
11 | 14 | def __init__(self):
|
| 15 | + # register sink with client node |
12 | 16 | self.client = ClientNode.register_sink(self)
|
13 | 17 | print('client', self.client)
|
14 | 18 |
|
15 | 19 | def increment(self):
|
| 20 | + # remote call the increment method |
16 | 21 | if self.client:
|
17 | 22 | self.client.invoke_remote('demo.Counter/increment', [], None)
|
18 | 23 |
|
19 | 24 | def olink_object_name(self):
|
| 25 | + # return the name of the sink |
20 | 26 | return 'demo.Counter'
|
21 | 27 |
|
22 | 28 | def olink_on_signal(self, name: str, args: list[Any]):
|
| 29 | + # handle the incoming signal from the remote source |
23 | 30 | path = Name.path_from_name(name)
|
24 | 31 | print('on signal: %s: %s' % (path, args))
|
25 | 32 |
|
26 | 33 | def olink_on_property_changed(self, name: str, value: Any) -> None:
|
| 34 | + # handle the property change from the remote source |
27 | 35 | path = Name.path_from_name(name)
|
28 | 36 | print('on property changed: %s: %s' % (path, value))
|
29 | 37 |
|
30 | 38 | def olink_on_init(self, name: str, props: object, node: ClientNode):
|
| 39 | + # handle the initialization of the sink, |
| 40 | + # called when the sink is linked to remote source |
31 | 41 | print('on init: %s: %s' % (name, props))
|
32 | 42 | self.client = node
|
33 |
| - |
| 43 | + |
34 | 44 | def olink_on_release(self):
|
| 45 | + # handle the release of the sink, |
| 46 | + # called when the sink is unlinked from remote source |
35 | 47 | print('on release')
|
36 |
| - |
37 |
| - |
38 |
| -async def sender(ws, q): |
39 |
| - while True: |
40 |
| - data = await q.get() |
41 |
| - await ws.send_text(data) |
42 |
| - q.task_done() |
43 | 48 |
|
44 | 49 |
|
45 |
| -async def connect(address: str): |
46 |
| - node = ClientNode() |
47 |
| - queue = Queue() |
48 |
| - async with websockets.connect(address) as ws: |
49 |
| - def writer(msg: str): |
50 |
| - queue.put_nowait(msg) |
51 |
| - node.on_write(writer) |
52 |
| - while True: |
53 |
| - data = await ws.recv() |
54 |
| - node.handle_message(data) |
55 |
| - |
56 |
| -class Client: |
| 50 | +class ClientWebsocketAdapter: |
| 51 | + # adapts the websocket communication to the client node |
57 | 52 | queue = Queue()
|
58 | 53 | node = None
|
| 54 | + |
59 | 55 | def __init__(self, node):
|
60 | 56 | self.node = node
|
| 57 | + # register a write function |
61 | 58 | self.node.on_write(self.writer)
|
62 | 59 |
|
63 | 60 | def writer(self, data):
|
64 |
| - print('writer, data') |
| 61 | + # don't send directly, first write to queue |
| 62 | + print('write to queue') |
65 | 63 | self.queue.put_nowait(data)
|
66 | 64 |
|
67 | 65 | async def _reader(self, ws):
|
| 66 | + # handle incoming ws messages |
68 | 67 | while True:
|
69 |
| - data = await ws.recv() |
70 |
| - self.node.handle_message(data) |
71 |
| - |
| 68 | + msg = await ws.recv() |
| 69 | + self.node.handle_message(msg) |
| 70 | + |
72 | 71 | async def _sender(self, ws):
|
| 72 | + # send messages from queue |
73 | 73 | while True:
|
74 | 74 | data = await self.queue.get()
|
75 | 75 | await ws.send(data)
|
76 | 76 | self.queue.task_done()
|
77 | 77 |
|
78 | 78 | async def connect(self, address: str):
|
79 |
| - async with websockets.connect(address) as ws: |
80 |
| - print('connected') |
81 |
| - sender_task = asyncio.create_task(self._sender(ws)) |
82 |
| - reader_task = asyncio.create_task(self._reader(ws)) |
83 |
| - await asyncio.gather(sender_task, reader_task) |
84 |
| - await self.queue.join() |
85 |
| - |
| 79 | + # connect to websocket server |
| 80 | + async for ws in websockets.connect(address): |
| 81 | + # connect might fail so loop continuously, for a retry-connection |
| 82 | + # see https://websockets.readthedocs.io/en/stable/reference/client.html#opening-a-connection |
| 83 | + try: |
| 84 | + sender_task = asyncio.create_task(self._sender(ws)) |
| 85 | + reader_task = asyncio.create_task(self._reader(ws)) |
| 86 | + await asyncio.gather(sender_task, reader_task) |
| 87 | + await self.queue.join() |
| 88 | + except Exception as e: |
| 89 | + print('exception while connecting: ', e) |
86 | 90 |
|
87 | 91 |
|
88 | 92 | address = 'ws://localhost:8282/ws'
|
| 93 | +# create a client node for ObjectLink registry and protocol |
89 | 94 | node = ClientNode()
|
| 95 | +# link the node to the service name |
90 | 96 | node.link_node('demo.Counter')
|
91 |
| -client = Client(node) |
92 | 97 |
|
93 |
| -counter = Counter() |
| 98 | +# create a ws client which handles the ws adapter |
| 99 | +client = ClientWebsocketAdapter(node) |
| 100 | + |
| 101 | +counter = CounterSink() |
94 | 102 | node.link_remote('demo.Counter')
|
95 | 103 | counter.increment()
|
96 |
| -asyncio.get_event_loop().run_until_complete(client.connect(address)) |
| 104 | + |
| 105 | + |
| 106 | +async def countForever(): |
| 107 | + # every send increment the counter |
| 108 | + while True: |
| 109 | + await asyncio.sleep(1) |
| 110 | + counter.increment() |
| 111 | + |
| 112 | + |
| 113 | +# await both tasks to complete |
| 114 | +future = asyncio.gather(client.connect(address), countForever()) |
| 115 | +# get the event loop |
| 116 | +loop = asyncio.get_event_loop() |
| 117 | +# run the event loop until future completes |
| 118 | +loop.run_until_complete(future) |
0 commit comments