diff --git a/src/WorkflowCore.Testing/WorkflowCore.Testing.csproj b/src/WorkflowCore.Testing/WorkflowCore.Testing.csproj index 6102c9892..58ba7d4a9 100644 --- a/src/WorkflowCore.Testing/WorkflowCore.Testing.csproj +++ b/src/WorkflowCore.Testing/WorkflowCore.Testing.csproj @@ -12,6 +12,7 @@ + diff --git a/src/WorkflowCore/Interface/IWorkflowModifier.cs b/src/WorkflowCore/Interface/IWorkflowModifier.cs index 835855e97..5597b4f43 100644 --- a/src/WorkflowCore/Interface/IWorkflowModifier.cs +++ b/src/WorkflowCore/Interface/IWorkflowModifier.cs @@ -183,5 +183,15 @@ IStepBuilder Activity(string activityName, Expression IStepBuilder Activity(Expression> activityName, Expression> parameters = null, Expression> effectiveDate = null, Expression> cancelCondition = null); + + /// + /// Execute a sub-workflow + /// + /// Id of the sub-workflow to start + /// The data to pass to the sub-workflow + /// A condition that when true will cancel this sub-workflow + /// + IStepBuilder SubWorkflow(string subWorkflowId, Expression> parameters = null, + Expression> cancelCondition = null); } } \ No newline at end of file diff --git a/src/WorkflowCore/Models/ExecutionPointer.cs b/src/WorkflowCore/Models/ExecutionPointer.cs index 6c12afda7..01fe516d0 100644 --- a/src/WorkflowCore/Models/ExecutionPointer.cs +++ b/src/WorkflowCore/Models/ExecutionPointer.cs @@ -51,6 +51,10 @@ public IReadOnlyCollection Scope get => _scope; set => _scope = new List(value); } + + public bool IsComplete => Status == PointerStatus.Complete; + + public bool HasChildren => Children?.Count > 0; } public enum PointerStatus diff --git a/src/WorkflowCore/Models/LifeCycleEvents/SubWorkflowLifeCycleEvent.cs b/src/WorkflowCore/Models/LifeCycleEvents/SubWorkflowLifeCycleEvent.cs new file mode 100644 index 000000000..cbb7cc4e5 --- /dev/null +++ b/src/WorkflowCore/Models/LifeCycleEvents/SubWorkflowLifeCycleEvent.cs @@ -0,0 +1,7 @@ +namespace WorkflowCore.Models.LifeCycleEvents +{ + public class SubWorkflowLifeCycleEvent : LifeCycleEvent + { + + } +} \ No newline at end of file diff --git a/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs b/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs index 7d6cd4c9e..b2ac30a30 100644 --- a/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs +++ b/src/WorkflowCore/Primitives/SubWorkflowStepBody.cs @@ -1,15 +1,60 @@ using System; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; using WorkflowCore.Interface; using WorkflowCore.Models; +using WorkflowCore.Models.LifeCycleEvents; namespace WorkflowCore.Primitives { public class SubWorkflowStepBody : StepBody { + private readonly IScopeProvider _scopeProvider; + + public SubWorkflowStepBody(IScopeProvider scopeProvider) + { + _scopeProvider = scopeProvider; + } + public override ExecutionResult Run(IStepExecutionContext context) { - // TODO: What is this supposed to do? - throw new NotImplementedException(); + var scope = _scopeProvider.CreateScope(context); + var workflowController = scope.ServiceProvider.GetRequiredService(); + var logger = scope.ServiceProvider.GetRequiredService().CreateLogger( + typeof(SubWorkflowStepBody).Namespace + "." + nameof(SubWorkflowStepBody)); + + if (!context.ExecutionPointer.EventPublished) + { + var result = workflowController.StartWorkflow(SubWorkflowId, context.Workflow.Data, context.Workflow.Id).Result; + + logger.LogDebug("Started sub workflow {Name} with id='{SubId}' from workflow {WorkflowDefinitionId} ({Id})", + SubWorkflowId, result, context.Workflow.WorkflowDefinitionId, context.Workflow.Id); + + logger.LogDebug("Workflow {Name} ({SubId}) is waiting for event SubWorkflowLifeCycleEvent with key='{EventKey}'", + SubWorkflowId, result, result); + + var effectiveDate = DateTime.MinValue; + return ExecutionResult.WaitForEvent(nameof(SubWorkflowLifeCycleEvent), result, effectiveDate); + } + + logger.LogDebug("Sub workflow {Name} ({SubId}) completed", SubWorkflowId, + context.ExecutionPointer.EventKey); + + var persistenceProvider = scope.ServiceProvider.GetRequiredService(); + var workflowInstance = persistenceProvider.GetWorkflowInstance(context.ExecutionPointer.EventKey).Result; + if (workflowInstance.Status == WorkflowStatus.Terminated) + { + throw new NotImplementedException(workflowInstance.Status.ToString()); + } + + Result = workflowInstance.Data; + return ExecutionResult.Next(); } + + public string SubWorkflowId { get; set; } + + public object Parameters { get; set; } + + public object Result { get; set; } } } diff --git a/src/WorkflowCore/Primitives/WaitFor.cs b/src/WorkflowCore/Primitives/WaitFor.cs index 7f84be2cd..35065ff56 100644 --- a/src/WorkflowCore/Primitives/WaitFor.cs +++ b/src/WorkflowCore/Primitives/WaitFor.cs @@ -25,7 +25,8 @@ public override ExecutionResult Run(IStepExecutionContext context) effectiveDate = EffectiveDate; } - return ExecutionResult.WaitForEvent(EventName, EventKey, effectiveDate); + var eventKey = context.Workflow.Reference ?? EventKey; + return ExecutionResult.WaitForEvent(EventName, eventKey, effectiveDate); } EventData = context.ExecutionPointer.EventData; diff --git a/src/WorkflowCore/ServiceCollectionExtensions.cs b/src/WorkflowCore/ServiceCollectionExtensions.cs index f38f44a5b..18d710e1f 100644 --- a/src/WorkflowCore/ServiceCollectionExtensions.cs +++ b/src/WorkflowCore/ServiceCollectionExtensions.cs @@ -81,6 +81,7 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A services.AddTransient(); services.AddTransient(); + services.AddTransient(); return services; } diff --git a/src/WorkflowCore/Services/ExecutionResultProcessor.cs b/src/WorkflowCore/Services/ExecutionResultProcessor.cs index 3c684945f..c44f8e74d 100755 --- a/src/WorkflowCore/Services/ExecutionResultProcessor.cs +++ b/src/WorkflowCore/Services/ExecutionResultProcessor.cs @@ -29,12 +29,16 @@ public ExecutionResultProcessor(IExecutionPointerFactory pointerFactory, IDateTi public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, ExecutionResult result, WorkflowExecutorResult workflowResult) { + var stepInfo = $"{step.Name ?? step.BodyType.Name} ({step.Id})"; + pointer.PersistenceData = result.PersistenceData; pointer.Outcome = result.OutcomeValue; if (result.SleepFor.HasValue) { pointer.SleepUntil = _datetimeProvider.UtcNow.Add(result.SleepFor.Value); pointer.Status = PointerStatus.Sleeping; + _logger.LogDebug("Step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId}) will sleep for {SleepUntil}", + stepInfo, workflow.WorkflowDefinitionId, workflow.Id, result.SleepFor.Value); } if (!string.IsNullOrEmpty(result.EventName)) @@ -54,6 +58,9 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition SubscribeAsOf = result.EventAsOf, SubscriptionData = result.SubscriptionData }); + + _logger.LogDebug("Step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId}) waiting for event {EventName}", + stepInfo, workflow.WorkflowDefinitionId, workflow.Id, pointer.EventName); } if (result.Proceed) @@ -87,6 +94,9 @@ public void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition WorkflowDefinitionId = workflow.WorkflowDefinitionId, Version = workflow.Version }); + + _logger.LogDebug("Step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId}) completed", + stepInfo, workflow.WorkflowDefinitionId, workflow.Id); } else { diff --git a/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs b/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs index 9e41f15ca..2673cdbd8 100644 --- a/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs +++ b/src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs @@ -588,5 +588,34 @@ public IStepBuilder Activity(Expression SubWorkflow( + string subWorkflowId, + Expression> parameters = null, + Expression> cancelCondition = null) + { + var newStep = new WorkflowStep(); + newStep.CancelCondition = cancelCondition; + + WorkflowBuilder.AddStep(newStep); + var stepBuilder = new StepBuilder(WorkflowBuilder, newStep); + stepBuilder.Input((step) => step.SubWorkflowId, (data) => subWorkflowId); + + if (parameters != null) + stepBuilder.Input((step) => step.Parameters, parameters); + + // use the result of the sub workflow as an output + // merge it with parent workflow data + stepBuilder.Output((body, data) => + { + foreach (var prop in typeof(TData).GetProperties()) + { + prop.SetValue(data, prop.GetValue(body.Result)); + } + }); + + Step.Outcomes.Add(new ValueOutcome { NextStep = newStep.Id }); + return stepBuilder; + } } } diff --git a/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs b/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs index adc02f488..145e4205d 100644 --- a/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs +++ b/src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs @@ -296,10 +296,16 @@ public IStepBuilder Activity(string activityName, Expression Activity(Expression> activityName, Expression> parameters = null, Expression> effectiveDate = null, Expression> cancelCondition = null) { return Start().Activity(activityName, parameters, effectiveDate, cancelCondition); } + + public IStepBuilder SubWorkflow(string subWorkflowId, Expression> parameters = null, Expression> cancelCondition = null) + { + return Start().SubWorkflow(subWorkflowId, parameters, cancelCondition); + } private IStepBuilder Start() { diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs index da3e9cd85..205e30945 100755 --- a/src/WorkflowCore/Services/WorkflowExecutor.cs +++ b/src/WorkflowCore/Services/WorkflowExecutor.cs @@ -89,7 +89,7 @@ public async Task Execute(WorkflowInstance workflow, Can WorkflowId = workflow.Id, ExecutionPointerId = pointer.Id, ErrorTime = _datetimeProvider.UtcNow, - Message = ex.Message + Message = ex.ToString() }); _executionResultProcessor.HandleStepException(workflow, def, pointer, step, ex); @@ -156,9 +156,11 @@ private async Task ExecuteStep(WorkflowInstance workflow, WorkflowStep step, Exe CancellationToken = cancellationToken }; + var stepInfo = $"{step.Name ?? step.BodyType.Name} ({step.Id})"; + using (var scope = _scopeProvider.CreateScope(context)) { - _logger.LogDebug("Starting step {StepName} on workflow {WorkflowId}", step.Name, workflow.Id); + _logger.LogDebug("Starting step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId})", stepInfo, workflow.WorkflowDefinitionId, workflow.Id); IStepBody body = step.ConstructBody(scope.ServiceProvider); var stepExecutor = scope.ServiceProvider.GetRequiredService(); @@ -221,6 +223,13 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo if (workflow.Status == WorkflowStatus.Complete) { + await OnComplete(workflow, def); + return; + } + + if (workflow.Status == WorkflowStatus.Terminated) + { + await OnTerminated(workflow, def); return; } @@ -236,7 +245,7 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo workflow.NextExecution = Math.Min(pointerSleep, workflow.NextExecution ?? pointerSleep); } - foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && (x.Children ?? new List()).Count > 0)) + foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && x.HasChildren)) { if (!workflow.ExecutionPointers.FindByScope(pointer.Id).All(x => x.EndTime.HasValue)) continue; @@ -256,6 +265,11 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo return; } + await OnComplete(workflow, def); + } + + private async Task OnComplete(WorkflowInstance workflow, WorkflowDefinition def) + { workflow.Status = WorkflowStatus.Complete; workflow.CompleteTime = _datetimeProvider.UtcNow; @@ -264,6 +278,8 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo var middlewareRunner = scope.ServiceProvider.GetRequiredService(); await middlewareRunner.RunPostMiddleware(workflow, def); } + + _logger.LogDebug("Workflow {WorkflowDefinitionId} ({Id}) completed", workflow.WorkflowDefinitionId, workflow.Id); _publisher.PublishNotification(new WorkflowCompleted { @@ -274,5 +290,28 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo Version = workflow.Version }); } + + private async Task OnTerminated(WorkflowInstance workflow, WorkflowDefinition def) + { + workflow.Status = WorkflowStatus.Terminated; + workflow.CompleteTime = _datetimeProvider.UtcNow; + + using (var scope = _serviceProvider.CreateScope()) + { + var middlewareRunner = scope.ServiceProvider.GetRequiredService(); + await middlewareRunner.RunPostMiddleware(workflow, def); + } + + _logger.LogDebug("Workflow {WorkflowDefinitionId} ({Id}) terminated", workflow.WorkflowDefinitionId, workflow.Id); + + _publisher.PublishNotification(new WorkflowTerminated + { + EventTimeUtc = _datetimeProvider.UtcNow, + Reference = workflow.Reference, + WorkflowInstanceId = workflow.Id, + WorkflowDefinitionId = workflow.WorkflowDefinitionId, + Version = workflow.Version + }); + } } } \ No newline at end of file diff --git a/src/WorkflowCore/Services/WorkflowHost.cs b/src/WorkflowCore/Services/WorkflowHost.cs index 73c8850fa..6fc8caf69 100644 --- a/src/WorkflowCore/Services/WorkflowHost.cs +++ b/src/WorkflowCore/Services/WorkflowHost.cs @@ -165,6 +165,15 @@ public Task TerminateWorkflow(string workflowId) public void HandleLifeCycleEvent(LifeCycleEvent evt) { + switch (evt) + { + // publish the event as sub workflow lifecycle event + case WorkflowCompleted _: + case WorkflowTerminated _: + _workflowController.PublishEvent(nameof(SubWorkflowLifeCycleEvent), evt.WorkflowInstanceId, evt.Reference); + break; + } + OnLifeCycleEvent?.Invoke(evt); } diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs index 22ba680f5..b06f275b4 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs @@ -100,6 +100,10 @@ public async Task GetWorkflowInstance(string Id, CancellationT { using (var db = ConstructDbContext()) { + if (!Guid.TryParse(Id, out _)) + { + + } var uid = new Guid(Id); var raw = await db.Set() .Include(wf => wf.ExecutionPointers) diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/SubWorkflowScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/SubWorkflowScenario.cs new file mode 100644 index 000000000..dc87c60bd --- /dev/null +++ b/test/WorkflowCore.IntegrationTests/Scenarios/SubWorkflowScenario.cs @@ -0,0 +1,164 @@ +using System; +using FluentAssertions; +using Newtonsoft.Json.Linq; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using WorkflowCore.Testing; +using Xunit; +using Xunit.Abstractions; + +namespace WorkflowCore.IntegrationTests.Scenarios; + +public class SubWorkflowScenario : WorkflowTest +{ + public class ApprovalInput + { + public string Id { get; set; } + public bool Approved { get; set; } + public TimeSpan TimeSpan { get; set; } + public string Message { get; set; } + } + + public class ParentWorkflow : IWorkflow + { + public string Id => nameof(ParentWorkflow); + + public int Version => 1; + + public void Build(IWorkflowBuilder builder) + { + builder + .UseDefaultErrorBehavior(WorkflowErrorHandling.Terminate) + .StartWith(context => ExecutionResult.Next()) + .SubWorkflow(nameof(ChildWorkflow)) + .Output(i => i.Approved, step => ((ApprovalInput)step.Result).Approved) + /* + * this does throw an exception + .If(data => data.Approved) + .Do(then => + ExecutionResult.Outcome(1248))*/; + } + } + + public class ChildWorkflow : IWorkflow + { + public string Id => nameof(ChildWorkflow); + + public int Version => 1; + + public void Build(IWorkflowBuilder builder) + { + builder + .UseDefaultErrorBehavior(WorkflowErrorHandling.Terminate) + .StartWith(context => ExecutionResult.Next()) + .Parallel() + .Do(then + => then + .Delay(i => i.TimeSpan) + .Output(i => i.Approved, step => false) + .EndWorkflow() + ) + .Do(then + => then + .WaitFor("Approved", e => e.Id) + .Output((w, input) => + { + var j = JObject.FromObject(w.EventData); + input.Approved = j["Approved"].Value(); + input.Message= j["Message"].Value(); + }) + .EndWorkflow() + ) + .Join(); + } + } + + public SubWorkflowScenario() + { + Setup(); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public void Scenario(bool approved) + { + Host.Registry.RegisterWorkflow(new ChildWorkflow()); + + var eventKey = Guid.NewGuid().ToString(); + var workflowId = StartWorkflow(new ApprovalInput + { + Id = eventKey, + TimeSpan = TimeSpan.FromMinutes(10) + }); + + WaitForEventSubscription("Approved", workflowId, TimeSpan.FromSeconds(5)); + UnhandledStepErrors.Should().BeEmpty(); + + Host.PublishEvent("Approved", workflowId, new + { + Approved = approved, + Message = "message " + approved + }); + + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(10)); + + System.Threading.Thread.Sleep(2000); + + UnhandledStepErrors.Should().BeEmpty(); + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + GetData(workflowId).ShouldBeEquivalentTo(new ApprovalInput + { + Id = eventKey, + Approved = approved, + Message = "message " + approved, + TimeSpan = TimeSpan.FromMinutes(10) + }); + } + + [Fact] + public void Failure() + { + Host.Registry.RegisterWorkflow(new ChildWorkflow()); + + var eventKey = Guid.NewGuid().ToString(); + var workflowId = StartWorkflow(new ApprovalInput + { + Id = eventKey, + TimeSpan = TimeSpan.FromMinutes(10) + }); + + WaitForEventSubscription("Approved", workflowId, TimeSpan.FromSeconds(5)); + UnhandledStepErrors.Should().BeEmpty(); + + Host.PublishEvent("Approved", workflowId, new + { + Approved = "string" + }); + + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(20)); + + System.Threading.Thread.Sleep(2000); + + UnhandledStepErrors.Should().NotBeEmpty(); + GetStatus(workflowId).Should().Be(WorkflowStatus.Terminated); + } + + [Fact] + public void Timeout() + { + Host.Registry.RegisterWorkflow(new ChildWorkflow()); + + var workflowId = StartWorkflow(new ApprovalInput + { + Id = Guid.NewGuid().ToString(), + TimeSpan = TimeSpan.FromSeconds(5) + }); + WaitForEventSubscription("Approved", workflowId, TimeSpan.FromSeconds(2)); + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(10)); + + UnhandledStepErrors.Should().BeEmpty(); + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + GetData(workflowId).Approved.Should().BeFalse(); + } +} diff --git a/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerSubWorkflowScenario.cs b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerSubWorkflowScenario.cs new file mode 100644 index 000000000..6fc07c5a6 --- /dev/null +++ b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerSubWorkflowScenario.cs @@ -0,0 +1,16 @@ +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; +using Xunit.Abstractions; + +namespace WorkflowCore.Tests.SqlServer.Scenarios +{ + [Collection("SqlServer collection")] + public class SqlServerSubWorkflowScenario() : SubWorkflowScenario() + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseSqlServer(SqlDockerSetup.ScenarioConnectionString, true, true)); + } + } +} diff --git a/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteSubWorkflowScenario.cs b/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteSubWorkflowScenario.cs new file mode 100644 index 000000000..d1e2ae06a --- /dev/null +++ b/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteSubWorkflowScenario.cs @@ -0,0 +1,21 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; +using Xunit.Abstractions; + +namespace WorkflowCore.Tests.Sqlite.Scenarios +{ + [Collection("Sqlite collection")] + public class SqliteSubWorkflowScenario : SubWorkflowScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(cfg => + { + cfg.UseSqlite($"Data Source=wfc-tests-{DateTime.Now.Ticks}.db;", true); + cfg.UsePollInterval(TimeSpan.FromSeconds(2)); + }); + } + } +} \ No newline at end of file