Skip to content

Commit fda1d6b

Browse files
committed
add working example for app, server
1 parent c72b046 commit fda1d6b

File tree

4 files changed

+192
-2
lines changed

4 files changed

+192
-2
lines changed

examples/app.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
from asyncio.queues import Queue
2+
from typing import Any
3+
from olink.core.types import Name
4+
from olink.clientnode import IObjectSink, ClientNode
5+
import asyncio
6+
import websockets
7+
8+
class Counter(IObjectSink):
9+
count = 0
10+
client = None
11+
def __init__(self):
12+
self.client = ClientNode.add_object_sink(self)
13+
print('client', self.client)
14+
15+
def increment(self):
16+
if self.client:
17+
self.client.invoke_remote('demo.Counter/increment', [], None)
18+
19+
def olink_object_name(self):
20+
return 'demo.Counter'
21+
22+
def olink_on_signal(self, name: str, args: list[Any]):
23+
path = Name.path_from_name(name)
24+
print('on signal: %s: %s' % (path, args))
25+
26+
def olink_on_property_changed(self, name: str, value: Any) -> None:
27+
path = Name.path_from_name(name)
28+
print('on property changed: %s: %s' % (path, value))
29+
30+
def olink_on_init(self, name: str, props: object, node: ClientNode):
31+
print('on init: %s: %s' % (name, props))
32+
self.client = node
33+
34+
def olink_on_release(self):
35+
print('on release')
36+
37+
38+
async def sender(ws, q):
39+
while True:
40+
data = await q.get()
41+
await ws.send_text(data)
42+
q.task_done()
43+
44+
45+
async def connect(address: str):
46+
node = ClientNode()
47+
queue = Queue()
48+
async with websockets.connect(address) as ws:
49+
def writer(msg: str):
50+
queue.put_nowait(msg)
51+
node.on_write(writer)
52+
while True:
53+
data = await ws.recv()
54+
node.handle_message(data)
55+
56+
class Client:
57+
queue = Queue()
58+
node = None
59+
def __init__(self, node):
60+
self.node = node
61+
self.node.on_write(self.writer)
62+
63+
def writer(self, data):
64+
print('writer, data')
65+
self.queue.put_nowait(data)
66+
67+
async def _reader(self, ws):
68+
while True:
69+
data = await ws.recv()
70+
self.node.handle_message(data)
71+
72+
async def _sender(self, ws):
73+
while True:
74+
data = await self.queue.get()
75+
await ws.send(data)
76+
self.queue.task_done()
77+
78+
async def connect(self, address: str):
79+
async with websockets.connect(address) as ws:
80+
print('connected')
81+
sender_task = asyncio.create_task(self._sender(ws))
82+
reader_task = asyncio.create_task(self._reader(ws))
83+
await asyncio.gather(sender_task, reader_task)
84+
await self.queue.join()
85+
86+
87+
88+
address = 'ws://localhost:8282/ws'
89+
node = ClientNode()
90+
node.link_node('demo.Counter')
91+
client = Client(node)
92+
93+
counter = Counter()
94+
node.link_remote('demo.Counter')
95+
counter.increment()
96+
asyncio.get_event_loop().run_until_complete(client.connect(address))

examples/server.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import asyncio
2+
from typing import Any
3+
from asyncio.queues import Queue
4+
from starlette.applications import Starlette
5+
from starlette.endpoints import WebSocketEndpoint
6+
from starlette.routing import WebSocketRoute
7+
from starlette.websockets import WebSocket
8+
9+
from olink.core.types import Name
10+
from olink.net.server import RemoteEndpoint
11+
from olink.remotenode import IObjectSource, RemoteNode
12+
13+
14+
class Counter:
15+
count = 0
16+
_node: RemoteNode
17+
def increment(self):
18+
self.count += 1
19+
self._node.notify_property_change('demo.Counter/count', self.count)
20+
21+
class CounterAdapter(IObjectSource):
22+
node: RemoteNode = None
23+
def __init__(self, impl):
24+
self.impl = impl
25+
RemoteNode.add_object_source(self)
26+
27+
def olink_object_name(self):
28+
return 'demo.Counter'
29+
30+
def olink_invoke(self, name: str, args: list[Any]) -> Any:
31+
path = Name.path_from_name(name)
32+
func = getattr(self.impl, path)
33+
func()
34+
35+
def olink_set_property(self, name: str, value: Any):
36+
path = Name.path_from_name(name)
37+
setattr(self, self.impl, value)
38+
39+
def olink_linked(self, name: str, node: "RemoteNode"):
40+
self.impl._node = node
41+
42+
def olink_collect_properties(self) -> object:
43+
return {k: getattr(self.impl, k) for k in ['count']}
44+
45+
counter = Counter()
46+
adapter = CounterAdapter(counter)
47+
48+
49+
50+
51+
class RemoteEndpoint(WebSocketEndpoint):
52+
encoding = "text"
53+
node = RemoteNode()
54+
queue = Queue()
55+
56+
async def sender(self, ws):
57+
print('start sender')
58+
while True:
59+
print('001')
60+
msg = await self.queue.get()
61+
print('send', msg)
62+
await ws.send_text(msg)
63+
self.queue.task_done()
64+
65+
async def on_connect(self, ws: WebSocket):
66+
print('on_connect')
67+
asyncio.create_task(self.sender(ws))
68+
69+
def writer(msg: str):
70+
print('writer', msg)
71+
self.queue.put_nowait(msg)
72+
self.node.on_write(writer)
73+
await super().on_connect(ws)
74+
75+
76+
async def on_receive(self, ws: WebSocket, data: Any) -> None:
77+
print('on_receive', data)
78+
self.node.handle_message(data)
79+
80+
async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:
81+
await super().on_disconnect(websocket, close_code)
82+
self.node.on_write(None)
83+
await self.queue.join()
84+
85+
86+
87+
routes = [
88+
WebSocketRoute("/ws", RemoteEndpoint)
89+
]
90+
91+
app = Starlette(routes=routes)

olink/clientnode.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def detach_client_node(self, node: "ClientNode"):
4848
if entry.node is node:
4949
entry.node = None
5050

51-
def link_client_node(self, name: str, node: "ClientNode"):
51+
def link_client_node(self, name: str, node: "ClientNode"):
5252
self.entry(name).node = node
5353

5454
def unlink_client_node(self, name: str, node: "ClientNode"):
@@ -61,7 +61,9 @@ def unlink_client_node(self, name: str, node: "ClientNode"):
6161

6262
def add_object_sink(self, sink: IObjectSink) -> "ClientNode":
6363
name = sink.olink_object_name()
64-
self.entry(name).sink = sink
64+
entry = self.entry(name)
65+
entry.sink = sink
66+
return entry.node
6567

6668
def remove_object_sink(self, sink: IObjectSink):
6769
name = sink.olink_object_name()

olink/core/node.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def emit_write(self, msg: list[Any]) -> None:
2121
self.write_func(data)
2222
else:
2323
self.emit_log(LogLevel.DEBUG, f"write not set on protocol: {msg}")
24+
2425
def handle_message(self, data: str) -> None:
2526
msg = self.converter.from_string(data)
2627
self.protocol.handle_message(msg)

0 commit comments

Comments
 (0)