diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index d879cc7ef..1edb7c281 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -1078,6 +1078,8 @@ def _get_input_dicts(self, with_items_values, prev_index=-1): ctx = {} index = prev_index + concurrency + if self.is_index_invalid(index): + return for k, v in with_items_values.items(): ctx.update({k: v[index]}) @@ -1098,6 +1100,19 @@ def _get_input_dicts(self, with_items_values, prev_index=-1): result.append((i, self._get_action_input(ctx))) return result + + def is_index_invalid(self, index): + with db_api.named_lock('with-items-%s' % self.task_ex.id): + + indices = db_api.get_accepted_executions_indexes( + id=self.task_ex.id, + workflow=self.task_ex.spec.get('workflow'), + accepted=True + ) + + finished = [idx[0] for idx in indices] + + return index in finished def _get_with_items_context(self): return self.task_ex.runtime_context.get(