@@ -108,48 +108,48 @@ def test_workflow_arguments(self):
108
108
109
109
def test_linear_workflow (self ):
110
110
in_task = InputTask ()
111
+ in_task_name = 'My input task'
111
112
inc_task = Inc ()
112
113
pow_task = Pow ()
113
- eow = LinearWorkflow ((in_task , 'task name' ), inc_task , inc_task , pow_task )
114
+ eow = LinearWorkflow ((in_task , in_task_name ), inc_task , inc_task , pow_task )
114
115
res = eow .execute ({
115
116
in_task : {'val' : 2 },
116
117
inc_task : {'d' : 2 }, # Note that this will assign value only to one instance of Inc task
117
118
pow_task : {'n' : 3 }
118
119
})
119
120
self .assertEqual (res [pow_task ], (2 + 2 + 1 ) ** 3 )
120
121
122
+ task_map = eow .get_tasks ()
123
+ self .assertTrue (in_task_name in task_map , "A task with name '{}' should be amongst tasks" .format (in_task_name ))
124
+ self .assertEqual (task_map [in_task_name ], in_task ,
125
+ "A task with name '{}' should map into {}" .format (in_task_name , in_task ))
126
+
121
127
def test_get_tasks (self ):
122
128
in_task = InputTask ()
123
129
inc_task = Inc ()
124
- pow_task = Pow ()
125
130
126
- task_names = ['InputTask' , 'Inc' , 'Pow' ]
127
- workflow_tasks = [in_task , inc_task , pow_task ]
128
- eow = LinearWorkflow (* workflow_tasks )
131
+ task_names = ['InputTask' , 'Inc' , 'Inc_1' , 'Inc_2' ]
132
+ eow = LinearWorkflow (in_task , inc_task , inc_task , inc_task )
129
133
130
134
returned_tasks = eow .get_tasks ()
131
135
132
136
# check if tasks are present
133
- for task_name in task_names :
134
- self .assertIn (task_name , returned_tasks .keys ())
137
+ self .assertEqual (sorted (task_names ), sorted (returned_tasks ))
135
138
136
139
# check if tasks still work
137
140
arguments_dict = {
138
141
in_task : {'val' : 2 },
139
- inc_task : {'d' : 2 },
140
- pow_task : {'n' : 3 }
142
+ inc_task : {'d' : 2 }
141
143
}
142
144
143
145
res_workflow = eow .execute (arguments_dict )
144
- res_workflow_value = [ res_workflow [ key ] for key in res_workflow . keys ()][ 0 ]
146
+ res_workflow_value = list ( res_workflow . values ())
145
147
146
- for idx , task in enumerate (workflow_tasks ):
147
- if idx == 0 :
148
- res_tasks_value = task .execute (** arguments_dict [task ])
149
- else :
150
- res_tasks_value = task .execute (res_tasks_value , ** arguments_dict [task ])
148
+ res_tasks_values = []
149
+ for idx , task in enumerate (returned_tasks .values ()):
150
+ res_tasks_values = [task .execute (* res_tasks_values , ** arguments_dict .get (task , {}))]
151
151
152
- self .assertEqual (res_workflow_value , res_tasks_value )
152
+ self .assertEqual (res_workflow_value , res_tasks_values )
153
153
154
154
def test_trivial_workflow (self ):
155
155
task = DummyTask ()
0 commit comments