@@ -16,18 +16,23 @@ def __init__(self, name: str, value: Any):
16
16
17
17
class IObjectSink (ProtocolType ):
18
18
def olink_object_name () -> str :
19
+ # return object name
19
20
raise NotImplementedError ()
20
21
21
22
def olink_on_signal (self , name : str , args : list [Any ]) -> None :
23
+ # called on signal message
22
24
raise NotImplementedError ()
23
25
24
26
def olink_on_property_changed (self , name : str , value : Any ) -> None :
27
+ # called on property changed message
25
28
raise NotImplementedError ()
26
29
27
30
def olink_on_init (self , name : str , props : object , node : "ClientNode" ):
31
+ # called on init message
28
32
raise NotImplementedError ()
29
33
30
34
def olink_on_release (self ) -> None :
35
+ # called when sink is released
31
36
raise NotImplementedError ()
32
37
33
38
class SinkToClientEntry :
@@ -39,72 +44,77 @@ def __init__(self, sink=None):
39
44
40
45
41
46
class ClientRegistry (Base ):
47
+ # client side registry to link sinks to nodes
42
48
entries : dict [str , SinkToClientEntry ] = {}
43
- def attach_client_node (self , node : "ClientNode" ):
44
- pass
45
49
46
- def detach_client_node (self , node : "ClientNode" ):
50
+ def remove_node (self , node : "ClientNode" ):
51
+ # remove node from all sinks
47
52
for entry in self .entries .values ():
48
53
if entry .node is node :
49
54
entry .node = None
50
55
51
- def link_client_node (self , name : str , node : "ClientNode" ):
52
- self .entry (name ).node = node
56
+ def add_node_to_sink (self , name : str , node : "ClientNode" ):
57
+ # add not to named sink
58
+ self ._entry (name ).node = node
53
59
54
- def unlink_client_node (self , name : str , node : "ClientNode" ):
60
+ def remove_node_from_sink (self , name : str , node : "ClientNode" ):
61
+ # remove node from named sink
55
62
resource = Name .resource_from_name (name )
56
63
if resource in self .entries :
57
64
if self .entries [resource ].node is node :
58
65
self .entries [resource ].node = None
59
66
else :
60
67
self .emit_log (LogLevel .DEBUG , f"unlink node failed, not the same node: { resource } " )
61
68
62
- def add_object_sink (self , sink : IObjectSink ) -> "ClientNode" :
69
+ def register_sink (self , sink : IObjectSink ) -> "ClientNode" :
70
+ # register sink using object name
63
71
name = sink .olink_object_name ()
64
- entry = self .entry (name )
72
+ entry = self ._entry (name )
65
73
entry .sink = sink
66
74
return entry .node
67
75
68
- def remove_object_sink (self , sink : IObjectSink ):
76
+ def unregister_sink (self , sink : IObjectSink ):
77
+ # unregister sink using object name
69
78
name = sink .olink_object_name ()
70
- self .remove_entry (name )
79
+ self ._remove_entry (name )
71
80
72
- def get_object_sink (self , name : str ) -> Optional [IObjectSink ]:
73
- return self .entry (name ).sink
81
+ def get_sink (self , name : str ) -> Optional [IObjectSink ]:
82
+ # get sink using name
83
+ return self ._entry (name ).sink
74
84
75
- def get_client_node (self , name : str ) -> Optional ["ClientNode" ]:
76
- return self .entry (name ).node
85
+ def get_node (self , name : str ) -> Optional ["ClientNode" ]:
86
+ # get node using name
87
+ return self ._entry (name ).node
77
88
78
- def entry (self , name : str ) -> SinkToClientEntry :
89
+ def _entry (self , name : str ) -> SinkToClientEntry :
90
+ # get an entry by name
79
91
resource = Name .resource_from_name (name )
80
92
if not resource in self .entries :
81
93
self .emit_log (LogLevel .DEBUG , f"add new resource: { resource } " )
82
94
self .entries [resource ] = SinkToClientEntry ()
83
95
return self .entries [resource ]
84
96
85
- def remove_entry (self , name : str ) -> None :
97
+ def _remove_entry (self , name : str ) -> None :
98
+ # remove an entry by name
86
99
resource = Name .resource_from_name (name )
87
100
del self .entries [resource ]
88
101
89
102
90
-
91
-
103
+ # global client registry
92
104
_registry = ClientRegistry ()
93
105
106
+ def get_client_registry () -> ClientRegistry :
107
+ return _registry
94
108
95
109
class ClientNode (BaseNode ):
96
110
invokes_pending : dict [int , InvokeReplyFunc ] = {}
97
111
requestId = 0
98
112
99
- def __init__ (self ):
100
- super ().__init__ ()
101
- self .registry ().attach_client_node (self )
102
-
103
113
def registry (self ) -> ClientRegistry :
104
- return _registry
114
+ return get_client_registry ()
105
115
106
116
def detach (self ) -> None :
107
- self .registry ().detach_client_node (self )
117
+ self .registry ().remove_node (self )
108
118
109
119
def next_request_id (self ) -> int :
110
120
self .requestId += 1
@@ -118,49 +128,60 @@ def invoke_remote(self, name: str, args: list[Any], func: Optional[InvokeReplyFu
118
128
self .emit_write (Protocol .invoke_message (request_id , name , args ))
119
129
120
130
def set_remote_property (self , name : str , value : Any ) -> None :
131
+ # send remote propertymessage
121
132
self .emit_log (LogLevel .DEBUG , f"ClientNode.set_remote_property: { name } { value } " )
122
133
self .emit_write (Protocol .set_property_message (name , value ))
123
134
124
135
def link_node (self , name : str ):
125
- self .registry ().link_client_node (name , self )
136
+ # register this node to sink
137
+ self .registry ().add_node_to_sink (name , self )
126
138
127
139
def unlink_node (self , name : str ) -> None :
128
- self .registry ().unlink_client_node (name , self )
140
+ # unregister this node from sink
141
+ self .registry ().remove_node_from_sink (name , self )
129
142
130
143
@staticmethod
131
- def add_object_sink (sink : IObjectSink ) -> Optional ["ClientNode" ]:
132
- return _registry .add_object_sink (sink )
144
+ def register_sink (sink : IObjectSink ) -> Optional ["ClientNode" ]:
145
+ # register sink to registry
146
+ return get_client_registry ().register_sink (sink )
133
147
134
148
@staticmethod
135
- def remove_object_sink (sink : IObjectSink ) -> None :
136
- return _registry .remove_object_sink (sink )
149
+ def unregister_sink (sink : IObjectSink ) -> None :
150
+ # unregister sink from registry
151
+ return get_client_registry ().unregister_sink (sink )
137
152
138
- def get_object_sink (self , name : str ) -> Optional [IObjectSink ]:
139
- return self .registry ().get_object_sink (name )
153
+ @staticmethod
154
+ def get_sink (name : str ) -> Optional [IObjectSink ]:
155
+ return get_client_registry ().get_sink (name )
140
156
141
- def link_remote (self , name : str ):
157
+ def link_remote (self , name : str ):
158
+ # register this node from sink and send a link message
142
159
self .emit_log (LogLevel .DEBUG , f"ClientNode.linkRemote: { name } " )
143
- self .registry ().link_client_node (name , self )
160
+ self .registry ().add_node_to_sink (name , self )
144
161
self .emit_write (Protocol .link_message (name ))
145
162
146
163
def unlink_remote (self , name : str ):
164
+ # unlink this node froom sink and send an unlink message
147
165
self .emit_log (LogLevel .DEBUG , f"ClientNode.unlink_remote: { name } " )
148
166
self .emit_write (Protocol .unlink_message (name ))
149
- self .registry ().unlink_client_node (name , self )
167
+ self .registry ().remove_node_from_sink (name , self )
150
168
151
169
def handle_init (self , name : str , props : object ):
170
+ # handle init message from source
152
171
self .emit_log (LogLevel .DEBUG , f"ClientNode.handle_init: { name } " )
153
- sink = self .get_object_sink (name )
172
+ sink = self .get_sink (name )
154
173
if sink :
155
174
sink .olink_on_init (name , props , self )
156
175
157
176
def handle_property_change (self , name : str , value : Any ) -> None :
177
+ # handle property change message from source
158
178
self .emit_log (LogLevel .DEBUG , f"ClientNode.handle_property_change: { name } " )
159
- sink = self .get_object_sink (name )
179
+ sink = self .get_sink (name )
160
180
if sink :
161
181
sink .olink_on_property_changed (name , value )
162
182
163
183
def handle_invoke_reply (self , id : int , name : str , value : Any ) -> None :
184
+ # handle invoke reply message from source
164
185
self .emit_log (LogLevel .DEBUG , f"ClientNode.handle_invoke_reply: { id } { name } { value } " )
165
186
if id in self .invokes_pending :
166
187
func = self .invokes_pending [id ]
@@ -172,12 +193,14 @@ def handle_invoke_reply(self, id: int, name: str, value: Any) -> None:
172
193
self .emit_log (LogLevel .DEBUG , f"no pending invoke: { id } { name } " )
173
194
174
195
def handle_signal (self , name : str , args : list [Any ]) -> None :
196
+ # handle signal message from source
175
197
self .emit_log (LogLevel .DEBUG , f"ClientNode.handle_signal: { name } { args } " )
176
- sink = self .get_object_sink (name )
198
+ sink = self .get_sink (name )
177
199
if sink :
178
200
sink .olink_on_signal (name , args )
179
201
180
202
def handle_error (self , msgType : MsgType , id : int , error : str ):
203
+ # handle error message from source
181
204
self .emit_log (LogLevel .DEBUG , f"ClientNode.handle_error: { msgType } { id } { error } " )
182
205
183
206
0 commit comments