Skip to content

Commit cd88044

Browse files
authored
Merge pull request #186 from fair-workflows/issue185_stepretroprov_workflowretroprov
Issue185 stepretroprov workflowretroprov
2 parents ec06b56 + 0d8f1ad commit cd88044

File tree

9 files changed

+724
-417
lines changed

9 files changed

+724
-417
lines changed

CITATION.cff

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,4 @@ keywords:
2727
license: "Apache-2.0"
2828
message: "If you use this software, please cite it using these metadata."
2929
title: "fairworkflows: A python library for handling semantically described scientific workflows"
30-
version: "0.2.4"
30+
version: "0.2.5"

examples/noodles_fw.ipynb

Lines changed: 538 additions & 381 deletions
Large diffs are not rendered by default.

fairworkflows/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.2.4"
1+
__version__ = "0.2.5"

fairworkflows/fairstep.py

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from copy import deepcopy
66
from typing import Callable, get_type_hints, List, Union
77
from urllib.parse import urldefrag
8+
from datetime import datetime
89

910
import noodles
1011
import rdflib
@@ -38,13 +39,23 @@ class FairVariable:
3839
become mapped to e.g. XSD types.
3940
semantic_types: One or more URIs that describe the semantic type(s) of this FairVariable.
4041
"""
41-
def __init__(self, name: str = None, computational_type: str = None, semantic_types = None, uri: str = None):
42+
def __init__(self, name: str = None, computational_type: str = None, semantic_types = None, uri: str = None, stepuri: str = None):
4243
if uri and name is None:
4344
# Get the name from the uri (i.e. 'input1' from http://example.org#input1)
4445
_, name = urldefrag(uri)
46+
47+
if name is None and uri is None:
48+
raise ValueError('Both name and uri cannot both be None when constructing a FairVariable.')
49+
4550
self.name = name
4651
self.computational_type = computational_type
4752

53+
if uri:
54+
self._uri = rdflib.URIRef(uri)
55+
else:
56+
step_base_uri, _ = urldefrag(stepuri)
57+
self._uri = rdflib.Namespace(step_base_uri)['#' + name]
58+
4859
if semantic_types is None:
4960
self.semantic_types = []
5061
else:
@@ -53,14 +64,20 @@ def __init__(self, name: str = None, computational_type: str = None, semantic_ty
5364
else:
5465
self.semantic_types = [rdflib.URIRef(t) for t in semantic_types]
5566

67+
@property
68+
def uri(self):
69+
if self._uri:
70+
return self._uri
71+
5672
def __eq__(self, other):
5773
return self.name == other.name and self.computational_type == other.computational_type
5874

5975
def __hash__(self):
6076
return hash(str(self))
6177

6278
def __str__(self):
63-
return f'FairVariable {self.name} of computational type: {self.computational_type} and semantic types: {self.semantic_types}'
79+
return (f'FairVariable {self.name} of computational type: {self.computational_type},'
80+
f'semantic types: {self.semantic_types}.\nHas URI {self.uri}.\n')
6481

6582

6683
class FairStep(RdfWrapper):
@@ -237,6 +254,10 @@ def is_script_task(self, value: bool):
237254

238255
def _get_variable(self, var_ref: Union[rdflib.term.BNode, rdflib.URIRef]) -> FairVariable:
239256
"""Retrieve a specific FairVariable from the RDF triples."""
257+
258+
if var_ref is None:
259+
raise ValueError('Variable reference var_ref cannot be None.')
260+
240261
rdfs_comment_objs = list(self._rdf.objects(var_ref, RDFS.comment))
241262
computational_type = str(rdfs_comment_objs[0])
242263

@@ -245,9 +266,9 @@ def _get_variable(self, var_ref: Union[rdflib.term.BNode, rdflib.URIRef]) -> Fai
245266
sem_types = [sem_type for sem_type in sem_type_objs if sem_type != namespaces.PPLAN.Variable]
246267

247268
if isinstance(var_ref, rdflib.term.BNode):
248-
return FairVariable(name=str(var_ref), computational_type=computational_type, semantic_types=sem_types)
269+
return FairVariable(name=str(var_ref), computational_type=computational_type, semantic_types=sem_types, stepuri=self._uri)
249270
else:
250-
return FairVariable(uri=str(var_ref), computational_type=computational_type, semantic_types=sem_types)
271+
return FairVariable(uri=str(var_ref), computational_type=computational_type, semantic_types=sem_types, stepuri=self._uri)
251272

252273
def _add_variable(self, variable: FairVariable, relation_to_step):
253274
"""Add triples describing FairVariable to rdf."""
@@ -360,12 +381,15 @@ def publish_as_nanopub(self, use_test_server=False, **kwargs):
360381
newvalue=rdflib.URIRef(self.uri))
361382

362383
# Similarly replace old URIs for variable name bindings
384+
# in both this step and any workflow objects that use it.
363385
published_step_uri_defrag, _ = urldefrag(self.uri)
364386
for var_name in var_names:
365387
old_var_uri = old_uri + '#' + var_name
366388
new_var_uri = published_step_uri_defrag + '#' + var_name
367389
replace_in_rdf(self.rdf, oldvalue=rdflib.URIRef(old_var_uri),
368390
newvalue=rdflib.URIRef(new_var_uri))
391+
replace_in_rdf(workflow.rdf, oldvalue=rdflib.URIRef(old_var_uri),
392+
newvalue=rdflib.URIRef(new_var_uri))
369393
del workflow._steps[old_uri]
370394
workflow._steps[self.uri] = self
371395

@@ -442,8 +466,21 @@ def _modify_function(func):
442466
def _add_logging(func):
443467
@functools.wraps(func)
444468
def _wrapper(*func_args, **func_kwargs):
445-
prov_logger.add(StepRetroProv(step=fairstep))
446-
return func(*func_args, **func_kwargs)
469+
470+
# Get the arg label/value pairs as a dict (for both args and kwargs)
471+
func_args_dict = dict(zip(inspect.getfullargspec(func).args, func_args))
472+
all_args = {**func_args_dict, **func_kwargs}
473+
474+
# Execute step (with timing)
475+
t0 = datetime.now()
476+
execution_result = func(*func_args, **func_kwargs)
477+
t1 = datetime.now()
478+
479+
# Log step execution
480+
prov_logger.add(StepRetroProv(step=fairstep, step_args=all_args, output=execution_result, time_start=t0, time_end=t1))
481+
482+
return execution_result
483+
447484
return _wrapper
448485
func._fairstep = fairstep
449486
return noodles.schedule(_add_logging(func))

fairworkflows/prov.py

Lines changed: 97 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import threading
22
from datetime import datetime
3-
from typing import List, Iterator
3+
from typing import List, Iterator, Dict
44

55
import rdflib
66

@@ -29,46 +29,102 @@ def empty(self):
2929
prov_logger = ProvLogger()
3030

3131

32-
class RetroProv(RdfWrapper):
33-
def __init__(self):
34-
super().__init__(uri=None, ref_name='retroprov')
35-
self.timestamp = datetime.now()
36-
37-
38-
class StepRetroProv(RetroProv):
39-
def __init__(self, step):
40-
super().__init__()
32+
class StepRetroProv(RdfWrapper):
33+
def __init__(self, step=None, step_args:Dict = None, time_start:datetime = None, time_end:datetime = None, output=None):
34+
super().__init__(uri=None, ref_name='fairstepprov')
4135
self.set_attribute(rdflib.RDF.type, namespaces.PPLAN.Activity)
4236
self.step = step
4337
self.step_uri = step.uri
4438

39+
stepbase = rdflib.Namespace(step.uri)
40+
41+
# Bind inputs
42+
for inputvar in step.inputs:
43+
if inputvar.name in step_args:
44+
retrovar = rdflib.BNode(inputvar.name)
45+
self._rdf.add( (self.self_ref, namespaces.PROV.used, retrovar) )
46+
self._rdf.add( (retrovar, rdflib.RDF.type, namespaces.PPLAN.Entity) )
47+
self._rdf.add( (retrovar, rdflib.RDFS.label, rdflib.Literal(inputvar.name)) )
48+
self._rdf.add( (retrovar, rdflib.RDF.value, rdflib.Literal(step_args[inputvar.name])) )
49+
50+
if inputvar.uri:
51+
self._rdf.add( (retrovar, namespaces.PPLAN.correspondsToVariable, inputvar.uri) )
52+
53+
# Bind outputs
54+
num_outputs = len(list(step.outputs))
55+
if num_outputs == 1:
56+
outvardict = {'out1': output}
57+
else:
58+
outvardict = {('out' + str(i)): outval for i, outval in enumerate(output) }
59+
60+
for outputvar in step.outputs:
61+
retrovar = rdflib.BNode(outputvar.name)
62+
if outputvar.name in outvardict:
63+
self._rdf.add( (self.self_ref, namespaces.PROV.used, retrovar) )
64+
self._rdf.add( (retrovar, rdflib.RDF.type, namespaces.PPLAN.Entity) )
65+
self._rdf.add( (retrovar, rdflib.RDFS.label, rdflib.Literal(outputvar.name)) )
66+
self._rdf.add( (retrovar, rdflib.RDF.value, rdflib.Literal(outvardict[outputvar.name])) )
67+
68+
if outputvar.uri:
69+
self._rdf.add( (retrovar, namespaces.PPLAN.correspondsToVariable, outputvar.uri) )
70+
71+
# Add times to RDF (if available)
72+
if time_start:
73+
self.set_attribute(namespaces.PROV.startedAtTime, rdflib.Literal(time_start, datatype=rdflib.XSD.dateTime))
74+
if time_end:
75+
self.set_attribute(namespaces.PROV.endedAtTime, rdflib.Literal(time_end, datatype=rdflib.XSD.dateTime))
76+
4577
@property
4678
def step_uri(self):
4779
"""Refers to URI of step associated to this provenance.
4880
49-
Matches the predicate prov:wasDerivedFrom associated to this retrospective provenance
81+
Matches the predicate pplan:correspondsToStep associated to this retrospective provenance
5082
"""
51-
return self.get_attribute(namespaces.PROV.wasDerivedFrom)
83+
return self.get_attribute(namespaces.PPLAN.correspondsToStep)
5284

5385
@step_uri.setter
5486
def step_uri(self, value):
5587
self.set_attribute(namespaces.PPLAN.correspondsToStep, rdflib.URIRef(value), overwrite=True)
5688

89+
def publish_as_nanopub(self, use_test_server=False, **kwargs):
90+
"""
91+
Publish this rdf as a nanopublication.
92+
93+
Args:
94+
use_test_server (bool): Toggle using the test nanopub server.
95+
kwargs: Keyword arguments to be passed to [nanopub.Publication.from_assertion](
96+
https://nanopub.readthedocs.io/en/latest/reference/publication.html#
97+
nanopub.publication.Publication.from_assertion).
98+
This allows for more control over the nanopublication RDF.
99+
100+
Returns:
101+
a dictionary with publication info, including 'nanopub_uri', and 'concept_uri'
102+
"""
103+
return self._publish_as_nanopub(use_test_server=use_test_server, **kwargs)
104+
57105
def __str__(self):
58106
"""String representation."""
59107
s = f'Step retrospective provenance.\n'
60-
s += self._rdf.serialize(format='trig').decode('utf-8')
108+
s += self._rdf.serialize(format='turtle').decode('utf-8')
61109
return s
62110

63111

64-
class WorkflowRetroProv(RetroProv):
112+
class WorkflowRetroProv(RdfWrapper):
65113
def __init__(self, workflow, workflow_uri, step_provs: List[StepRetroProv]):
66-
super().__init__()
67-
self.set_attribute(rdflib.RDF.type, namespaces.PPLAN.Bundle)
114+
super().__init__(uri=None, ref_name='fairworkflowprov')
115+
self._rdf.add((self.self_ref, rdflib.RDF.type, namespaces.PPLAN.Bundle))
116+
self._rdf.add((self.self_ref, rdflib.RDF.type, namespaces.PROV.Collection))
68117
self.workflow = workflow
69118
self.workflow_uri = workflow_uri
70119
self._step_provs = step_provs
71120

121+
# Add the Entity links for now (dummy links, if unpublished)
122+
for stepprov in self._step_provs:
123+
if stepprov.uri:
124+
self._rdf.add((self.self_ref, namespaces.PROV.hasMember, rdflib.URIRef(stepprov.uri)))
125+
else:
126+
self._rdf.add((self.self_ref, namespaces.PROV.hasMember, rdflib.URIRef('http://www.example.org/unpublished-entity-' + str(hash(stepprov)))))
127+
72128
@property
73129
def workflow_uri(self):
74130
"""Refers to URI of step associated to this provenance.
@@ -88,8 +144,32 @@ def __iter__(self) -> Iterator[StepRetroProv]:
88144
def __len__(self) -> int:
89145
return len(self._step_provs)
90146

147+
def publish_as_nanopub(self, use_test_server=False, **kwargs):
148+
"""
149+
Publish this rdf as a nanopublication.
150+
151+
Args:
152+
use_test_server (bool): Toggle using the test nanopub server.
153+
kwargs: Keyword arguments to be passed to [nanopub.Publication.from_assertion](
154+
https://nanopub.readthedocs.io/en/latest/reference/publication.html#
155+
nanopub.publication.Publication.from_assertion).
156+
This allows for more control over the nanopublication RDF.
157+
158+
Returns:
159+
a dictionary with publication info, including 'nanopub_uri', and 'concept_uri'
160+
"""
161+
162+
# Clear existing members of this entity (to be replaced with newly published links)
163+
self.remove_attribute(namespaces.PROV.hasMember)
164+
165+
for stepprov in self._step_provs:
166+
stepprov.publish_as_nanopub(use_test_server=use_test_server, **kwargs)
167+
self._rdf.add((self.self_ref, namespaces.PROV.hasMember, rdflib.URIRef(stepprov.uri)))
168+
169+
return self._publish_as_nanopub(use_test_server=use_test_server, **kwargs)
170+
91171
def __str__(self):
92172
"""String representation."""
93173
s = f'Workflow retrospective provenance.\n'
94-
s += self._rdf.serialize(format='trig').decode('utf-8')
174+
s += self._rdf.serialize(format='turtle').decode('utf-8')
95175
return s

fairworkflows/rdf_wrapper.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ class RdfWrapper:
1919
def __init__(self, uri, ref_name='fairobject', derived_from: List[str] = None,
2020
language: LinguisticSystem = None ):
2121
self._rdf = rdflib.Graph()
22-
self._uri = str(uri)
22+
if uri:
23+
self._uri = str(uri)
24+
else:
25+
self._uri = None
2326
self.self_ref = rdflib.term.BNode(ref_name)
2427
self._is_modified = False
2528
self._is_published = False
@@ -30,7 +33,6 @@ def __init__(self, uri, ref_name='fairobject', derived_from: List[str] = None,
3033
# A blank node to which triples about the linguistic
3134
# system for this FAIR object can be added
3235
self.lingsys_ref = rdflib.BNode('LinguisticSystem')
33-
self._rdf.add((self.self_ref, DCTERMS.language, self.lingsys_ref))
3436

3537
if language is not None:
3638
self.language = language
@@ -42,6 +44,7 @@ def _bind_namespaces(self):
4244
"""
4345
self.rdf.bind("npx", namespaces.NPX)
4446
self.rdf.bind("pplan", namespaces.PPLAN)
47+
self.rdf.bind("prov", namespaces.PROV)
4548
self.rdf.bind("dul", namespaces.DUL)
4649
self.rdf.bind("bpmn", namespaces.BPMN)
4750
self.rdf.bind("pwo", namespaces.PWO)
@@ -163,16 +166,23 @@ def language(self):
163166
"""Returns the language for this fair objects's description (could be code).
164167
Returns a LinguisticSystem object.
165168
"""
166-
lingsys_rdf = rdflib.Graph()
167-
for t in self._rdf.triples((self.lingsys_ref, None, None)):
168-
lingsys_rdf.add(t)
169-
return LinguisticSystem.from_rdf(lingsys_rdf)
169+
if (None, DCTERMS.language, self.lingsys_ref) in self._rdf:
170+
lingsys_rdf = rdflib.Graph()
171+
for t in self._rdf.triples((self.lingsys_ref, None, None)):
172+
lingsys_rdf.add(t)
173+
return LinguisticSystem.from_rdf(lingsys_rdf)
174+
else:
175+
return None
170176

171177
@language.setter
172178
def language(self, value: LinguisticSystem):
173179
"""Sets the language for this fair object's code (takes a LinguisticSystem).
174180
Removes the existing linguistic system triples from the RDF decription
175181
and replaces them with the new linguistic system."""
182+
183+
if (None, DCTERMS.language, self.lingsys_ref) not in self._rdf:
184+
self._rdf.add((self.self_ref, DCTERMS.language, self.lingsys_ref))
185+
176186
lingsys_triples = list(self._rdf.triples( (self.lingsys_ref, None, None) ))
177187
if len(lingsys_triples) > 0:
178188
self._rdf.remove(lingsys_triples)
@@ -299,6 +309,7 @@ def _publish_as_nanopub(self, use_test_server=False, **kwargs):
299309
introduces_concept=self.self_ref,
300310
derived_from=self._derived_from,
301311
**kwargs)
312+
302313
client = NanopubClient(use_test_server=use_test_server)
303314
publication_info = client.publish(nanopub)
304315

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
nanopub==1.2.3
1+
nanopub==1.2.5
22
networkx~=2.5
33
pytest
44
pyyaml

tests/test_fairstep.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,8 @@ def test_shacl_does_not_validate(self):
246246

247247
assert len(step.rdf) == n_triples_before, 'shacl_validate mutated RDF'
248248

249-
250-
def test_is_fairstep_decorator():
249+
@patch('fairworkflows.rdf_wrapper.NanopubClient.publish')
250+
def test_is_fairstep_decorator(mock_publish):
251251
@is_fairstep(label='test_label')
252252
def add(a: int, b: int) -> int:
253253
"""
@@ -257,6 +257,9 @@ def add(a: int, b: int) -> int:
257257

258258
assert hasattr(add(1,2), '_fairstep')
259259

260+
add._fairstep.publish_as_nanopub()
261+
assert mock_publish.call_count == 1
262+
260263
def test_decorator_semantic_types():
261264
test_types_a = ['http://www.example.org/distance', 'http://www.example.org/number']
262265
test_type_output = 'http://www.example.org/walrus'

0 commit comments

Comments
 (0)