3636from copy import deepcopy
3737from pathlib import Path
3838import time
39- from typing import Dict , Optional , Set
39+ from typing import Dict , Set
4040
4141from cylc .flow .exceptions import WorkflowStopped
4242from cylc .flow .id import Tokens
5656from .utils import fmt_call
5757from .workflows_mgr import workflow_request
5858
59+ MIN_LEVEL = 'min'
60+ MAX_LEVEL = 'max'
61+ SUBSCRIPTION_LEVELS = {
62+ MIN_LEVEL : {
63+ 'topics' : {WORKFLOW .encode ('utf-8' ), b'shutdown' },
64+ 'criteria' : {
65+ 'fragments' : {
66+ 'AddedDelta' ,
67+ 'WorkflowData' ,
68+ 'UpdatedDelta'
69+ },
70+ },
71+ 'request' : 'pb_workflow_only' ,
72+ },
73+ MAX_LEVEL : {
74+ 'topics' : {ALL_DELTAS .encode ('utf-8' ), b'shutdown' },
75+ 'criteria' : {'fragments' : set ()},
76+ 'request' : 'pb_entire_workflow' ,
77+ },
78+ }
79+
5980
6081def log_call (fcn ):
6182 """Decorator for data store methods we want to log."""
@@ -101,22 +122,13 @@ def __init__(self, workflows_mgr, log, max_threads=10):
101122 self .log = log
102123 self .data = {}
103124 self .w_subs : Dict [str , WorkflowSubscriber ] = {}
104- self .topics = {
105- ALL_DELTAS .encode ('utf-8' ),
106- WORKFLOW .encode ('utf-8' ),
107- b'shutdown'
108- }
109- # If fragments in graphql sub for minimal sync
110- self .min_sync_fragments = {
111- 'AddedDelta' ,
112- 'WorkflowData' ,
113- 'UpdatedDelta'
125+ # graphql subscription level
126+ self .sync_level_graphql_subs = {
127+ MIN_LEVEL : set (),
128+ MAX_LEVEL : set ()
114129 }
115- # set of workflows to sync all data
116- self .full_sync_workflows = set ()
117- self .full_sync_gql_subs = set ()
118- # dict of workflow full sync subscriber IDs
119- self .full_sync_workflow_gql_subs = {}
130+ # workflow graphql subscription by level
131+ self .sync_level_workflow_graphql_subs = {}
120132 self .loop = None
121133 self .executor = ThreadPoolExecutor (max_threads )
122134 self .delta_queues = {}
@@ -141,6 +153,12 @@ async def register_workflow(self, w_id: str, is_active: bool) -> None:
141153 status_msg = self ._get_status_msg (w_id , is_active ),
142154 )
143155
156+ # setup sync subscriber set
157+ self .sync_level_workflow_graphql_subs [w_id ] = {
158+ MIN_LEVEL : set (),
159+ MAX_LEVEL : set ()
160+ }
161+
144162 @log_call
145163 async def unregister_workflow (self , w_id ):
146164 """Remove a workflow from the data store entirely.
@@ -176,8 +194,9 @@ async def connect_workflow(self, w_id, contact_data):
176194
177195 self .delta_queues [w_id ] = {}
178196
179- # setup sync subscriber set
180- self .full_sync_workflow_gql_subs [w_id ] = set ()
197+ level = MIN_LEVEL
198+ if self .sync_level_workflow_graphql_subs [w_id ][MAX_LEVEL ]:
199+ level = MAX_LEVEL
181200
182201 # Might be options other than threads to achieve
183202 # non-blocking subscriptions, but this works.
@@ -186,10 +205,11 @@ async def connect_workflow(self, w_id, contact_data):
186205 w_id ,
187206 contact_data ['name' ],
188207 contact_data [CFF .HOST ],
189- contact_data [CFF .PUBLISH_PORT ]
208+ contact_data [CFF .PUBLISH_PORT ],
209+ SUBSCRIPTION_LEVELS [level ]['topics' ]
190210 )
191211
192- result = await self .workflow_data_update (w_id , minimal = True )
212+ result = await self .workflow_data_update (w_id , level )
193213
194214 if result :
195215 # don't update the contact data until we have successfully updated
@@ -199,13 +219,11 @@ async def connect_workflow(self, w_id, contact_data):
199219 async def workflow_data_update (
200220 self ,
201221 w_id : str ,
202- minimal : Optional [ bool ] = None
222+ level : str ,
203223 ):
204- if minimal is None :
205- minimal = w_id in self .full_sync_workflows
206- successful_updates = await self ._entire_workflow_update (
224+ successful_updates = await self ._workflow_update (
207225 ids = [w_id ],
208- minimal = minimal
226+ req_method = SUBSCRIPTION_LEVELS [ level ][ 'request' ]
209227 )
210228
211229 if w_id not in successful_updates :
@@ -241,9 +259,6 @@ def disconnect_workflow(self, w_id, update_contact=True):
241259 if w_id in self .w_subs :
242260 self .w_subs [w_id ].stop ()
243261 del self .w_subs [w_id ]
244- if w_id in self .full_sync_workflow_gql_subs :
245- del self .full_sync_workflow_gql_subs [w_id ]
246- self .full_sync_workflows .discard (w_id )
247262
248263 def get_workflows (self ):
249264 """Return all workflows the data store is currently tracking.
@@ -273,23 +288,26 @@ def _purge_workflow(self, w_id):
273288 del self .data [w_id ]
274289 if w_id in self .delta_queues :
275290 del self .delta_queues [w_id ]
291+ if w_id in self .sync_level_workflow_graphql_subs :
292+ del self .sync_level_workflow_graphql_subs [w_id ]
276293
277- def _start_subscription (self , w_id , reg , host , port ):
294+ def _start_subscription (self , w_id , reg , host , port , topics ):
278295 """Instantiate and run subscriber data-store sync.
279296
280297 Args:
281298 w_id (str): Workflow external ID.
282299 reg (str): Registered workflow name.
283300 host (str): Hostname of target workflow.
284301 port (int): Port of target workflow.
302+ topics set(str): set of topics to subscribe to.
285303
286304 """
287305 self .w_subs [w_id ] = WorkflowSubscriber (
288306 reg ,
289307 host = host ,
290308 port = port ,
291309 context = self .workflows_mgr .context ,
292- topics = self . topics
310+ topics = topics
293311 )
294312 self .w_subs [w_id ].loop .run_until_complete (
295313 self .w_subs [w_id ].subscribe (
@@ -321,15 +339,15 @@ def _update_workflow_data(self, topic, delta, w_id):
321339 self .disconnect_workflow (w_id )
322340 return
323341 elif topic == WORKFLOW :
324- if w_id in self .full_sync_workflows :
342+ if self .sync_level_workflow_graphql_subs [ w_id ][ MAX_LEVEL ] :
325343 return
326344 self ._apply_delta (w_id , WORKFLOW , delta )
327345 # might seem clunky, but as with contact update, making it look
328346 # like an ALL_DELTA avoids changing the resolver in cylc-flow
329347 all_deltas = DELTAS_MAP [ALL_DELTAS ]()
330348 all_deltas .workflow .CopyFrom (delta )
331349 self ._delta_store_to_queues (w_id , ALL_DELTAS , all_deltas )
332- elif w_id in self . full_sync_workflows :
350+ else :
333351 self ._apply_all_delta (w_id , delta )
334352 self ._delta_store_to_queues (w_id , topic , delta )
335353
@@ -419,22 +437,15 @@ def _reconcile_update(self, topic, delta, w_id):
419437 except Exception as exc :
420438 self .log .exception (exc )
421439
422- async def _entire_workflow_update (
423- self ,
424- ids : Optional [list ] = None ,
425- minimal : Optional [bool ] = False
440+ async def _workflow_update (
441+ self , ids : list [str ], req_method : str ,
426442 ) -> Set [str ]:
427443 """Update entire local data-store of workflow(s).
428444
429445 Args:
430446 ids: List of workflow external IDs.
431447
432448 """
433- if ids is None :
434- ids = []
435-
436- # Request new data
437- req_method = 'pb_entire_workflow'
438449
439450 requests = {
440451 w_id : workflow_request (
@@ -467,8 +478,6 @@ async def _entire_workflow_update(
467478 for key in DATA_TEMPLATE
468479 }
469480 continue
470- elif minimal :
471- continue
472481 new_data [field .name ] = {n .id : n for n in value }
473482 self .data [w_id ] = new_data
474483 successes .add (w_id )
@@ -558,32 +567,63 @@ def _get_status_msg(self, w_id: str, is_active: bool) -> str:
558567 # the workflow has not yet run
559568 return 'not yet run'
560569
570+ async def _update_subscription_level (self , w_id , level ):
571+ """Update level of data subscribed to."""
572+ sub = self .w_subs .get (w_id )
573+ if sub :
574+ stop_topics = sub .topics .difference (
575+ SUBSCRIPTION_LEVELS [level ]['topics' ]
576+ )
577+ start_topics = SUBSCRIPTION_LEVELS [level ]['topics' ].difference (
578+ sub .topics
579+ )
580+ for stop_topic in stop_topics :
581+ sub .unsubscribe_topic (stop_topic )
582+ # Doing this after unsubscribe and before subscribe
583+ # to make sure old topics stop and new data is in place.
584+ await self .workflow_data_update (w_id , level )
585+ for start_topic in start_topics :
586+ sub .subscribe_topic (start_topic )
587+
561588 def graphql_sub_interrogate (self , sub_id , info ):
562589 """Scope data requirements."""
563590 fragments = set (info .fragments .keys ())
564591 minimal = (
565- fragments <= self .min_sync_fragments
592+ (
593+ fragments
594+ <= SUBSCRIPTION_LEVELS [MIN_LEVEL ]['criteria' ]['fragments' ]
595+ )
566596 and bool (fragments )
567597 )
568- if not minimal :
569- self .full_sync_gql_subs .add (sub_id )
570- return minimal
598+ if minimal :
599+ self .sync_level_graphql_subs [MIN_LEVEL ].add (sub_id )
600+ return
601+ self .sync_level_graphql_subs [MAX_LEVEL ].add (sub_id )
571602
572603 async def graphql_sub_data_match (self , w_id , sub_id ):
573604 """Match store data level to requested graphql subscription."""
574- if (
575- sub_id in self .full_sync_gql_subs
576- and sub_id not in self .full_sync_workflow_gql_subs [w_id ]
577- ):
578- self .full_sync_workflow_gql_subs [w_id ].add (sub_id )
579- await self .workflow_data_update (w_id , minimal = False )
580-
581- self .full_sync_workflows .add (w_id )
605+ sync_level_wsubs = self .sync_level_workflow_graphql_subs [w_id ]
606+ if sub_id in self .sync_level_graphql_subs [MAX_LEVEL ]:
607+ if not sync_level_wsubs [MAX_LEVEL ]:
608+ sync_level_wsubs [MAX_LEVEL ].add (sub_id )
609+ await self ._update_subscription_level (w_id , MAX_LEVEL )
610+ else :
611+ sync_level_wsubs [MIN_LEVEL ].add (sub_id )
582612
583- def graphql_sub_discard (self , sub_id ):
613+ async def graphql_sub_discard (self , sub_id ):
584614 """Discard graphql subscription references."""
585- self .full_sync_gql_subs .discard (sub_id )
586- for w_id in self .full_sync_workflow_gql_subs :
587- self .full_sync_workflow_gql_subs [w_id ].discard (w_id )
588- if not self .full_sync_workflow_gql_subs [w_id ]:
589- self .full_sync_workflows .discard (w_id )
615+ level = MIN_LEVEL
616+ if sub_id in self .sync_level_graphql_subs [MAX_LEVEL ]:
617+ level = MAX_LEVEL
618+ self .sync_level_graphql_subs [level ].discard (sub_id )
619+ for w_id in self .sync_level_workflow_graphql_subs :
620+ self .sync_level_workflow_graphql_subs [w_id ][level ].discard (
621+ sub_id
622+ )
623+ # if there are no more max level subscriptions after removal
624+ # of a max level sub, downgrade to min.
625+ if (
626+ not self .sync_level_workflow_graphql_subs [w_id ][level ]
627+ and level is MAX_LEVEL
628+ ):
629+ await self ._update_subscription_level (w_id , MIN_LEVEL )
0 commit comments