From 413c59896a9f0fc98d4add32c41bfab8d5ada726 Mon Sep 17 00:00:00 2001 From: Max Vorchakov Date: Wed, 31 Jul 2024 08:20:13 +0200 Subject: [PATCH] Changed on error suppress behavior; Fixed nested pipelines error handling (#60) * Changed on error suppress behavior * Fixed nested pipelines error handling * added more onError tests --- src/PowerPipe/Builder/PipelineBuilder.cs | 11 +++- src/PowerPipe/Builder/Steps/IfPipelineStep.cs | 9 ++- src/PowerPipe/Builder/Steps/InternalStep.cs | 39 +++++++++++-- src/PowerPipe/Builder/Steps/LazyStep.cs | 8 ++- src/PowerPipe/Builder/Steps/ParallelStep.cs | 9 ++- .../NestedPipelineExecutionException.cs | 18 ++++++ tests/PowerPipe.UnitTests/PipelineTests.cs | 56 +++++++++++++++++++ 7 files changed, 138 insertions(+), 12 deletions(-) create mode 100644 src/PowerPipe/Exceptions/NestedPipelineExecutionException.cs diff --git a/src/PowerPipe/Builder/PipelineBuilder.cs b/src/PowerPipe/Builder/PipelineBuilder.cs index 7314319..924b446 100644 --- a/src/PowerPipe/Builder/PipelineBuilder.cs +++ b/src/PowerPipe/Builder/PipelineBuilder.cs @@ -96,7 +96,9 @@ public PipelineBuilder If( { var internalBuilder = action(new PipelineBuilder(_pipelineStepFactory, _context)); - Steps.Add(new IfPipelineStep(predicate, internalBuilder)); + internalBuilder.MarkStepsAsNestedPipeline(); + + Steps.Add(new IfPipelineStep(predicate, internalBuilder, _loggerFactory)); return this; } @@ -112,7 +114,9 @@ public PipelineBuilder Parallel( { var internalBuilder = action(new PipelineBuilder(_pipelineStepFactory, _context)); - Steps.Add(new ParallelStep(maxDegreeOfParallelism, internalBuilder)); + internalBuilder.MarkStepsAsNestedPipeline(); + + Steps.Add(new ParallelStep(maxDegreeOfParallelism, internalBuilder, _loggerFactory)); return this; } @@ -156,4 +160,7 @@ public IPipeline Build() return new Pipeline(_context, Steps); } + + private void MarkStepsAsNestedPipeline() => + Steps.ForEach(step => step.MarkStepAsNested()); } diff --git a/src/PowerPipe/Builder/Steps/IfPipelineStep.cs b/src/PowerPipe/Builder/Steps/IfPipelineStep.cs index ca8a6a6..50df043 100644 --- a/src/PowerPipe/Builder/Steps/IfPipelineStep.cs +++ b/src/PowerPipe/Builder/Steps/IfPipelineStep.cs @@ -1,6 +1,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; namespace PowerPipe.Builder.Steps; @@ -21,10 +22,14 @@ internal class IfPipelineStep : InternalStep /// /// The predicate to determine whether to execute the sub-pipeline. /// The builder for the sub-pipeline to execute. - public IfPipelineStep(Predicate predicate, PipelineBuilder pipelineBuilder) + /// A logger factory + public IfPipelineStep( + Predicate predicate, PipelineBuilder pipelineBuilder, ILoggerFactory loggerFactory) { _predicate = predicate; _pipelineBuilder = pipelineBuilder; + + Logger = loggerFactory?.CreateLogger>(); } /// @@ -37,6 +42,8 @@ protected override async ValueTask ExecuteInternalAsync(TContext context, Cancel { if (_predicate(context)) { + Logger?.LogDebug("Executing internal pipeline."); + StepExecuted = true; await _pipelineBuilder.Build().RunAsync(cancellationToken, returnResult: false); diff --git a/src/PowerPipe/Builder/Steps/InternalStep.cs b/src/PowerPipe/Builder/Steps/InternalStep.cs index 2f0d1cb..97a6c4a 100644 --- a/src/PowerPipe/Builder/Steps/InternalStep.cs +++ b/src/PowerPipe/Builder/Steps/InternalStep.cs @@ -31,6 +31,11 @@ internal abstract class InternalStep : IPipelineStep, IPipel /// protected virtual bool StepExecuted { get; set; } + /// + /// Gets or sets a value indication where this step is in a nested pipeline. + /// + protected virtual bool IsNested { get; private set; } + /// /// Gets or sets the error handling behavior for this step. /// @@ -57,6 +62,14 @@ internal abstract class InternalStep : IPipelineStep, IPipel private bool AllowedToCompensate => StepExecuted && ErrorHandledSucceed == false && CompensationStep?.IsCompensated == false; + /// + /// Marks step as nested. + /// + public void MarkStepAsNested() + { + IsNested = true; + } + /// /// Configures error handling behavior for this step. /// @@ -87,20 +100,30 @@ public async ValueTask ExecuteAsync(TContext context, CancellationToken cancella await ExecuteInternalAsync(context, cancellationToken); } - catch (Exception e) + catch (Exception exception) { - if (e is PipelineExecutionException or OperationCanceledException) + if (!HandleException(exception)) { ErrorHandledSucceed = false; throw; } - Logger?.LogDebug("Exception thrown during execution. {exception}", e); + Logger?.LogError("Exception thrown during execution. {exception}", exception); ErrorHandledSucceed = await HandleExceptionAsync(context, cancellationToken); - if (!ErrorHandledSucceed.Value) - throw new PipelineExecutionException(e); + if (ErrorHandledSucceed.Value) + { + if (NextStep is not null) + await NextStep.ExecuteAsync(context, cancellationToken); + } + else + { + if (IsNested) + throw new NestedPipelineExecutionException(exception); + else + throw new PipelineExecutionException(exception); + } } finally { @@ -110,6 +133,12 @@ public async ValueTask ExecuteAsync(TContext context, CancellationToken cancella await CompensationStep.CompensateAsync(context, cancellationToken); } } + + return; + + bool HandleException(Exception exception) => + (!IsNested || exception is not NestedPipelineExecutionException) && + exception is not (PipelineExecutionException or OperationCanceledException); } /// diff --git a/src/PowerPipe/Builder/Steps/LazyStep.cs b/src/PowerPipe/Builder/Steps/LazyStep.cs index ffae4dc..0a97e17 100644 --- a/src/PowerPipe/Builder/Steps/LazyStep.cs +++ b/src/PowerPipe/Builder/Steps/LazyStep.cs @@ -28,7 +28,11 @@ internal LazyStep(Func> factory, ILoggerFactory loggerFactor if (instance is IPipelineStep step) step.NextStep = NextStep; - Logger = loggerFactory?.CreateLogger(instance.GetType()); + var instanceType = instance.GetType(); + + Logger = loggerFactory?.CreateLogger(instanceType); + + Logger?.LogDebug("{Step} executing", instanceType.FullName); return instance; }); @@ -45,7 +49,5 @@ protected override async ValueTask ExecuteInternalAsync(TContext context, Cancel StepExecuted = true; await _step.Value.ExecuteAsync(context, cancellationToken); - - Logger?.LogDebug("{Step} executed", _step.Value.GetType().FullName); } } diff --git a/src/PowerPipe/Builder/Steps/ParallelStep.cs b/src/PowerPipe/Builder/Steps/ParallelStep.cs index 3006b2d..07065df 100644 --- a/src/PowerPipe/Builder/Steps/ParallelStep.cs +++ b/src/PowerPipe/Builder/Steps/ParallelStep.cs @@ -1,5 +1,6 @@ using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; namespace PowerPipe.Builder.Steps; @@ -20,10 +21,14 @@ internal class ParallelStep : InternalStep /// /// The maximum degree of parallelism for the parallel execution. /// The builder for the sub-pipeline to execute in parallel. - public ParallelStep(int maxDegreeOfParallelism, PipelineBuilder pipelineBuilder) + /// A logger factory + public ParallelStep( + int maxDegreeOfParallelism, PipelineBuilder pipelineBuilder, ILoggerFactory loggerFactory) { _maxDegreeOfParallelism = maxDegreeOfParallelism; _pipelineBuilder = pipelineBuilder; + + Logger = loggerFactory?.CreateLogger>(); } /// @@ -34,6 +39,8 @@ public ParallelStep(int maxDegreeOfParallelism, PipelineBuilderA task representing the asynchronous operation. protected override async ValueTask ExecuteInternalAsync(TContext context, CancellationToken cancellationToken) { + Logger?.LogDebug("Executing parallel steps."); + StepExecuted = true; var parallelOptions = new ParallelOptions diff --git a/src/PowerPipe/Exceptions/NestedPipelineExecutionException.cs b/src/PowerPipe/Exceptions/NestedPipelineExecutionException.cs new file mode 100644 index 0000000..6831c3c --- /dev/null +++ b/src/PowerPipe/Exceptions/NestedPipelineExecutionException.cs @@ -0,0 +1,18 @@ +using System; + +namespace PowerPipe.Exceptions; + +/// +/// Represents an exception that occurs during nested pipeline execution. +/// +public class NestedPipelineExecutionException : Exception +{ + /// + /// Initializes a new instance of the class with a specified exception. + /// + /// The inner exception that caused the nested pipeline execution to fail. + public NestedPipelineExecutionException(Exception exception) + : base("Nested pipeline execution failed", exception) + { + } +} diff --git a/tests/PowerPipe.UnitTests/PipelineTests.cs b/tests/PowerPipe.UnitTests/PipelineTests.cs index 9157bf3..ea7a050 100644 --- a/tests/PowerPipe.UnitTests/PipelineTests.cs +++ b/tests/PowerPipe.UnitTests/PipelineTests.cs @@ -133,9 +133,12 @@ public async Task IfStep_Succeed(bool predicate) public async Task OnError_Succeed(PipelineStepErrorHandling errorHandlingBehaviour, bool applyErrorHandling) { var step = Substitute.For(); + var step2 = Substitute.For(); step.ExecuteAsync(Arg.Any(), Arg.Any()) .Returns(_ => throw new InvalidOperationException("Test message")); + step2.ExecuteAsync(Arg.Any(), Arg.Any()) + .Returns(ValueTask.CompletedTask); var context = new TestPipelineContext(); var cts = new CancellationTokenSource(); @@ -143,6 +146,7 @@ public async Task OnError_Succeed(PipelineStepErrorHandling errorHandlingBehavio var stepFactory = new PipelineStepFactory(new ServiceCollection() .AddTransient(_ => step) + .AddTransient(_ => step2) .BuildServiceProvider()); var retryCount = 3; @@ -153,6 +157,7 @@ public async Task OnError_Succeed(PipelineStepErrorHandling errorHandlingBehavio var pipeline = new PipelineBuilder(stepFactory, context) .Add() .OnError(errorHandlingBehaviour, maxRetryCount: isRetryBehaviour ? retryCount : default, predicate: ShouldApplyErrorHandling) + .Add() .Build(); var action = async () => await pipeline.RunAsync(cts.Token); @@ -170,6 +175,7 @@ public async Task OnError_Succeed(PipelineStepErrorHandling errorHandlingBehavio await action.Should().NotThrowAsync(); await step.Received(1).ExecuteAsync(Arg.Is(context), Arg.Is(cts.Token)); + await step2.Received(1).ExecuteAsync(Arg.Is(context), Arg.Is(cts.Token)); } } else @@ -178,6 +184,56 @@ public async Task OnError_Succeed(PipelineStepErrorHandling errorHandlingBehavio } } + [Theory] + [InlineData(PipelineStepErrorHandling.Suppress)] + [InlineData(PipelineStepErrorHandling.Retry)] + public async Task OnError_NestedPipeline_Succeed(PipelineStepErrorHandling errorHandlingBehaviour) + { + var step = Substitute.For(); + var step2 = Substitute.For(); + + step.ExecuteAsync(Arg.Any(), Arg.Any()) + .Returns(_ => throw new InvalidOperationException("Test message")); + step2.ExecuteAsync(Arg.Any(), Arg.Any()) + .Returns(ValueTask.CompletedTask); + + var context = new TestPipelineContext(); + var cts = new CancellationTokenSource(); + + var stepFactory = + new PipelineStepFactory(new ServiceCollection() + .AddTransient(_ => step) + .AddTransient(_ => step2) + .BuildServiceProvider()); + + var isRetryBehaviour = errorHandlingBehaviour is PipelineStepErrorHandling.Retry; + var maxRetryCount = isRetryBehaviour ? 3 : default; + + var pipeline = new PipelineBuilder(stepFactory, context) + .If(_ => true, b => b + .Add()) + .OnError(errorHandlingBehaviour, maxRetryCount: maxRetryCount) + .Add() + .Build(); + + var action = async () => await pipeline.RunAsync(cts.Token); + + if (isRetryBehaviour) + { + await action.Should().ThrowAsync(); + + await step.Received(1 + maxRetryCount).ExecuteAsync(Arg.Is(context), Arg.Is(cts.Token)); + await step2.Received(0).ExecuteAsync(Arg.Is(context), Arg.Is(cts.Token)); + } + else + { + await action.Should().NotThrowAsync(); + + await step.Received(1).ExecuteAsync(Arg.Is(context), Arg.Is(cts.Token)); + await step2.Received(1).ExecuteAsync(Arg.Is(context), Arg.Is(cts.Token)); + } + } + [Fact] public async Task CompensateWith_Succeed() {