42 changes: 41 additions & 1 deletion cylc/flow/data_store_mgr.py
@@ -62,8 +62,9 @@
from typing import (
Any,
Dict,
Optional,
Iterable,
List,
Optional,
Set,
TYPE_CHECKING,
Tuple,
@@ -2705,6 +2706,23 @@ def get_entire_workflow(self):

return workflow_msg

def get_workflow_only(self):
"""Gather workflow summary data into a Protobuf message.

No tasks, cycles, etc.; just the workflow-level data.

Returns:
cylc.flow.data_messages_pb2.PbEntireWorkflow

"""

workflow_msg = PbEntireWorkflow()
workflow_msg.workflow.CopyFrom(
self.data[self.workflow_id][WORKFLOW]
)

return workflow_msg

def get_publish_deltas(self):
"""Return deltas for publishing."""
all_deltas = DELTAS_MAP[ALL_DELTAS]()
@@ -2752,3 +2770,25 @@ def edge_id(self, left_tokens: Tokens, right_tokens: Tokens) -> str:
f'$edge|{left_tokens.relative_id}|{right_tokens.relative_id}'
)
).id

# subscription stubs
def graphql_sub_interrogate(self, sub_id, info):
"""Scope data requirements."""
pass

async def graphql_sub_data_match(self, w_id, sub_id):
"""Match store data level to requested graphql subscription."""
pass

async def graphql_sub_discard(self, sub_id):
"""Discard graphql subscription references."""
pass

async def set_query_sync_levels(
self,
w_ids: Iterable[str],
level: Optional[str] = None,
expire_delay: Optional[float] = None,
):
"""Set a workflow sync level."""
pass
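
A brief aside on the new `get_workflow_only` method above: it reuses the `PbEntireWorkflow` message but copies only the workflow-level element, so callers get a small payload without tasks, families, jobs or edges. A minimal usage sketch, assuming only the names visible in this diff (the surrounding wiring is hypothetical):

```python
from cylc.flow.data_messages_pb2 import PbEntireWorkflow

def serialise_workflow_summary(data_store_mgr) -> bytes:
    """Serialise just the workflow-level fields of the data store."""
    msg: PbEntireWorkflow = data_store_mgr.get_workflow_only()
    # Unlike get_entire_workflow(), no task, family, job or edge
    # elements are copied into the message, keeping the payload small.
    return msg.SerializeToString()
```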
134 changes: 102 additions & 32 deletions cylc/flow/network/resolvers.py
@@ -346,22 +346,36 @@ def __init__(self, data_store_mgr: 'DataStoreMgr'):
async def get_workflow_by_id(self, args):
"""Return a workflow store by ID."""
try:
if 'sub_id' in args and args['delta_store']:
return self.delta_store[args['sub_id']][args['id']][
args['delta_type']][WORKFLOW]
if 'sub_id' in args:
if args['delta_store']:
return self.delta_store[args['sub_id']][args['id']][
args['delta_type']][WORKFLOW]
else:
await self.data_store_mgr.set_query_sync_levels(
[self.data_store_mgr.data[args['id']][WORKFLOW].id]
)
return self.data_store_mgr.data[args['id']][WORKFLOW]
except KeyError:
return None

async def get_workflows_data(self, args: Dict[str, Any]):
"""Return list of data from workflows."""
# Both cases just as common so 'if' not 'try'
if 'sub_id' in args and args['delta_store']:
return [
delta[args['delta_type']]
for key, delta in self.delta_store[args['sub_id']].items()
if workflow_filter(self.data_store_mgr.data[key], args)
]
if 'sub_id' in args:
if args['delta_store']:
return [
delta[args['delta_type']]
for key, delta in self.delta_store[args['sub_id']].items()
if workflow_filter(self.data_store_mgr.data[key], args)
]
else:
await self.data_store_mgr.set_query_sync_levels(
[
workflow[WORKFLOW].id
for workflow in self.data_store_mgr.data.values()
if workflow_filter(workflow, args)
]
)
return [
workflow
for workflow in self.data_store_mgr.data.values()
@@ -375,6 +389,22 @@ async def get_workflows(self, args):
for flow in await self.get_workflows_data(args)],
args)

async def get_flow_data_from_ids(self, data_store, native_ids):
"""Return workflow data by id."""
w_ids = []
for native_id in native_ids:
w_ids.append(
Tokens(native_id).workflow_id
)
await self.data_store_mgr.set_query_sync_levels(
set(w_ids)
)
return [
data_store[w_id]
for w_id in iter_uniq(w_ids)
if w_id in data_store
]

# nodes
def get_node_state(self, node, node_type):
"""Return state, from node or data-store."""
@@ -414,14 +444,18 @@ async def get_nodes_by_ids(self, node_type, args):
"""Return protobuf node objects for given id."""
nat_ids = uniq(args.get('native_ids', []))
# Both cases just as common so 'if' not 'try'
if 'sub_id' in args and args['delta_store']:
flow_data = [
delta[args['delta_type']]
for delta in get_flow_data_from_ids(
self.delta_store[args['sub_id']], nat_ids)
]
if 'sub_id' in args:
if args['delta_store']:
flow_data = [
delta[args['delta_type']]
for delta in get_flow_data_from_ids(
self.delta_store[args['sub_id']], nat_ids)
]
else:
flow_data = get_flow_data_from_ids(
self.data_store_mgr.data, nat_ids)
else:
flow_data = get_flow_data_from_ids(
flow_data = await self.get_flow_data_from_ids(
self.data_store_mgr.data, nat_ids)

if node_type == PROXY_NODES:
@@ -450,10 +484,14 @@ async def get_node_by_id(self, node_type, args):
w_id = Tokens(n_id).workflow_id
# Both cases just as common so 'if' not 'try'
try:
if 'sub_id' in args and args.get('delta_store'):
flow = self.delta_store[
args['sub_id']][w_id][args['delta_type']]
if 'sub_id' in args:
if args.get('delta_store'):
flow = self.delta_store[
args['sub_id']][w_id][args['delta_type']]
else:
flow = self.data_store_mgr.data[w_id]
else:
await self.data_store_mgr.set_query_sync_levels([w_id])
flow = self.data_store_mgr.data[w_id]
except KeyError:
return None
@@ -475,14 +513,18 @@ async def get_edges_by_ids(self, args):
async def get_edges_by_ids(self, args):
"""Return protobuf edge objects for given id."""
nat_ids = uniq(args.get('native_ids', []))
if 'sub_id' in args and args['delta_store']:
flow_data = [
delta[args['delta_type']]
for delta in get_flow_data_from_ids(
self.delta_store[args['sub_id']], nat_ids)
]
if 'sub_id' in args:
if args['delta_store']:
flow_data = [
delta[args['delta_type']]
for delta in get_flow_data_from_ids(
self.delta_store[args['sub_id']], nat_ids)
]
else:
flow_data = get_flow_data_from_ids(
self.data_store_mgr.data, nat_ids)
else:
flow_data = get_flow_data_from_ids(
flow_data = await self.get_flow_data_from_ids(
self.data_store_mgr.data, nat_ids)

return sort_elements(
@@ -502,6 +544,7 @@ async def get_nodes_edges(self, root_nodes, args):
edge_ids = set()
# Setup for edgewise search.
new_nodes = root_nodes
is_sub = 'sub_id' in args
for _ in range(args['distance']):
# Gather edges.
# Edges should be unique (graph not circular),
@@ -512,10 +555,19 @@ async def get_nodes_edges(self, root_nodes, args):
for e_id in n.edges
}.difference(edge_ids)
edge_ids.update(new_edge_ids)
if is_sub:
flow_data = get_flow_data_from_ids(
self.data_store_mgr.data,
new_edge_ids
)
else:
flow_data = await self.get_flow_data_from_ids(
self.data_store_mgr.data,
new_edge_ids
)
new_edges = [
edge
for flow in get_flow_data_from_ids(
self.data_store_mgr.data, new_edge_ids)
for flow in flow_data
for edge in get_data_elements(flow, new_edge_ids, EDGES)
]
edges += new_edges
@@ -530,10 +582,19 @@ async def get_nodes_edges(self, root_nodes, args):
if not new_node_ids:
break
node_ids.update(new_node_ids)
if is_sub:
flow_data = get_flow_data_from_ids(
self.data_store_mgr.data,
new_node_ids
)
else:
flow_data = await self.get_flow_data_from_ids(
self.data_store_mgr.data,
new_node_ids
)
new_nodes = [
node
for flow in get_flow_data_from_ids(
self.data_store_mgr.data, new_node_ids)
for flow in flow_data
for node in get_data_elements(flow, new_node_ids, TASK_PROXIES)
]
nodes += new_nodes
@@ -569,6 +630,8 @@ async def subscribe_delta(
delta_queues = self.data_store_mgr.delta_queues
deltas_queue: DeltaQueue = queue.Queue()

self.data_store_mgr.graphql_sub_interrogate(sub_id, info)

counters: Dict[str, int] = {}
delta_yield_queue: DeltaQueue = queue.Queue()
flow_delta_queues: Dict[str, queue.Queue[Tuple[str, dict]]] = {}
@@ -591,6 +654,9 @@
if w_id in self.data_store_mgr.data:
if sub_id not in delta_queues[w_id]:
delta_queues[w_id][sub_id] = deltas_queue
await self.data_store_mgr.graphql_sub_data_match(
w_id, sub_id
)
# On new yield workflow data-store as added delta
if args.get('initial_burst'):
delta_store = create_delta_store(
@@ -658,6 +724,7 @@ async def subscribe_delta(
import traceback
logger.warning(traceback.format_exc())
finally:
await self.data_store_mgr.graphql_sub_discard(sub_id)
for w_id in w_ids:
if delta_queues.get(w_id, {}).get(sub_id):
del delta_queues[w_id][sub_id]
@@ -702,8 +769,11 @@ async def mutator(
meta: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Mutate workflow."""
w_ids = [flow[WORKFLOW].id
for flow in await self.get_workflows_data(w_args)]
w_ids = [
workflow[WORKFLOW].id
for workflow in self.data_store_mgr.data.values()
if workflow_filter(workflow, w_args)
]
if not w_ids:
workflows = list(self.data_store_mgr.data.keys())
return [{
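
The new instance method `get_flow_data_from_ids` reduces node/edge ids to their parent workflow ids before requesting a sync level for them. A standalone sketch of that id handling, assuming `cylc.flow.id.Tokens` behaves as in current cylc-flow; the ids here are made up for illustration:

```python
from cylc.flow.id import Tokens

# Hypothetical universal ids; the workflow part precedes the '//' separator.
native_ids = [
    '~alice/my_workflow//20250101T00/foo',
    '~alice/my_workflow//20250101T00/bar',
]

# Reduce node ids to workflow ids, dropping duplicates but keeping order,
# before passing them to set_query_sync_levels().
w_ids = []
for native_id in native_ids:
    w_id = Tokens(native_id).workflow_id
    if w_id not in w_ids:
        w_ids.append(w_id)
```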
11 changes: 11 additions & 0 deletions cylc/flow/network/server.py
@@ -46,6 +46,7 @@
# maps server methods to the protobuf message (for client/UIS import)
PB_METHOD_MAP: Dict[str, Any] = {
'pb_entire_workflow': PbEntireWorkflow,
'pb_workflow_only': PbEntireWorkflow,
'pb_data_elements': DELTAS_MAP
}

@@ -416,6 +417,16 @@ def pb_entire_workflow(self, **_kwargs) -> bytes:
pb_msg = self.schd.data_store_mgr.get_entire_workflow()
return pb_msg.SerializeToString()

@authorise()
@expose
def pb_workflow_only(self, **_kwargs) -> bytes:
"""Send only the workflow data, not tasks etc.

Returns serialised Protobuf message
"""
pb_msg = self.schd.data_store_mgr.get_workflow_only()
return pb_msg.SerializeToString()

@authorise()
@expose
def pb_data_elements(self, element_type: str, **_kwargs) -> bytes:
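
Because `pb_workflow_only` is registered in `PB_METHOD_MAP` against the same `PbEntireWorkflow` message, a client can decode its response exactly as it does for `pb_entire_workflow`. A hedged sketch, assuming the existing `WorkflowRuntimeClient.async_request` interface:

```python
from cylc.flow.network.client import WorkflowRuntimeClient
from cylc.flow.network.server import PB_METHOD_MAP

async def fetch_workflow_summary(workflow_id: str):
    """Request only the workflow-level data from a running scheduler."""
    client = WorkflowRuntimeClient(workflow_id)
    # The server returns the serialised Protobuf message as bytes.
    response = await client.async_request('pb_workflow_only')
    msg = PB_METHOD_MAP['pb_workflow_only']()
    msg.ParseFromString(response)
    return msg.workflow
```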
10 changes: 10 additions & 0 deletions cylc/flow/network/subscriber.py
@@ -93,6 +93,16 @@ def _socket_options(self) -> None:
for topic in self.topics:
self.socket.setsockopt(zmq.SUBSCRIBE, topic)

def unsubscribe_topic(self, topic):
"""Unsubscribe the socket from the given topic, if subscribed."""
if topic in self.topics:
self.socket.setsockopt(zmq.UNSUBSCRIBE, topic)
self.topics.discard(topic)

def subscribe_topic(self, topic):
"""Subscribe the socket to the given topic, if not already subscribed."""
if topic not in self.topics:
self.socket.setsockopt(zmq.SUBSCRIBE, topic)
self.topics.add(topic)

async def subscribe(self, msg_handler, *args, **kwargs):
"""Subscribe to updates from the provided socket."""
while True:
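
The membership guards in `subscribe_topic` / `unsubscribe_topic` make the calls idempotent, so repeated sync-level changes can re-apply a topic without touching the socket twice. A small illustration, assuming a connected `WorkflowSubscriber` built as in the test below:

```python
# subscriber: a connected WorkflowSubscriber (see test_publisher below).
subscriber.subscribe_topic(b'workflow')
subscriber.subscribe_topic(b'workflow')      # no-op: already subscribed
subscriber.unsubscribe_topic(b'shutdown')    # removes the topic if present
subscriber.unsubscribe_topic(b'shutdown')    # no-op: already removed
assert b'workflow' in subscriber.topics
```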
6 changes: 5 additions & 1 deletion tests/integration/test_publisher.py
@@ -32,9 +32,13 @@ async def test_publisher(flow, scheduler, run, one_conf, port_range):
schd.workflow,
host=schd.host,
port=schd.server.pub_port,
topics=[b'workflow']
topics=[b'shutdown']
)

subscriber.unsubscribe_topic(b'shutdown')
subscriber.subscribe_topic(b'workflow')
assert subscriber.topics == {b'workflow'}

async with timeout(2):
# wait for the first delta from the workflow
btopic, msg = await subscriber.socket.recv_multipart()
11 changes: 11 additions & 0 deletions tests/integration/test_server.py
@@ -79,6 +79,17 @@ def test_pb_entire_workflow(myflow):
assert data.workflow.id == myflow.id


def test_pb_workflow_only(myflow):
"""Test Protobuf workflow only endpoint method."""
data = PB_METHOD_MAP['pb_workflow_only']()
data.ParseFromString(
call_server_method(
myflow.server.pb_workflow_only
)
)
assert data.workflow.id == myflow.id


async def test_stop(one: Scheduler, start):
"""Test stop."""
async with start(one):