Skip to content

Commit

Permalink
Changed on error suppress behavior; Fixed nested pipelines error hand…
Browse files Browse the repository at this point in the history
…ling (#60)

* Changed on error suppress behavior

* Fixed nested pipelines error handling

* added more onError tests
  • Loading branch information
mvSapphire authored Jul 31, 2024
1 parent 2b75ff3 commit 413c598
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 12 deletions.
11 changes: 9 additions & 2 deletions src/PowerPipe/Builder/PipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public PipelineBuilder<TContext, TResult> If(
{
var internalBuilder = action(new PipelineBuilder<TContext, TResult>(_pipelineStepFactory, _context));

Steps.Add(new IfPipelineStep<TContext, TResult>(predicate, internalBuilder));
internalBuilder.MarkStepsAsNestedPipeline();

Steps.Add(new IfPipelineStep<TContext, TResult>(predicate, internalBuilder, _loggerFactory));

return this;
}
Expand All @@ -112,7 +114,9 @@ public PipelineBuilder<TContext, TResult> Parallel(
{
var internalBuilder = action(new PipelineBuilder<TContext, TResult>(_pipelineStepFactory, _context));

Steps.Add(new ParallelStep<TContext, TResult>(maxDegreeOfParallelism, internalBuilder));
internalBuilder.MarkStepsAsNestedPipeline();

Steps.Add(new ParallelStep<TContext, TResult>(maxDegreeOfParallelism, internalBuilder, _loggerFactory));

return this;
}
Expand Down Expand Up @@ -156,4 +160,7 @@ public IPipeline<TResult> Build()

return new Pipeline<TContext, TResult>(_context, Steps);
}

private void MarkStepsAsNestedPipeline() =>
Steps.ForEach(step => step.MarkStepAsNested());
}
9 changes: 8 additions & 1 deletion src/PowerPipe/Builder/Steps/IfPipelineStep.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace PowerPipe.Builder.Steps;

Expand All @@ -21,10 +22,14 @@ internal class IfPipelineStep<TContext, TResult> : InternalStep<TContext>
/// </summary>
/// <param name="predicate">The predicate to determine whether to execute the sub-pipeline.</param>
/// <param name="pipelineBuilder">The builder for the sub-pipeline to execute.</param>
public IfPipelineStep(Predicate<TContext> predicate, PipelineBuilder<TContext, TResult> pipelineBuilder)
/// <param name="loggerFactory">A logger factory</param>
public IfPipelineStep(
Predicate<TContext> predicate, PipelineBuilder<TContext, TResult> pipelineBuilder, ILoggerFactory loggerFactory)
{
_predicate = predicate;
_pipelineBuilder = pipelineBuilder;

Logger = loggerFactory?.CreateLogger<IfPipelineStep<TContext, TResult>>();
}

/// <summary>
Expand All @@ -37,6 +42,8 @@ protected override async ValueTask ExecuteInternalAsync(TContext context, Cancel
{
if (_predicate(context))
{
Logger?.LogDebug("Executing internal pipeline.");

StepExecuted = true;

await _pipelineBuilder.Build().RunAsync(cancellationToken, returnResult: false);
Expand Down
39 changes: 34 additions & 5 deletions src/PowerPipe/Builder/Steps/InternalStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ internal abstract class InternalStep<TContext> : IPipelineStep<TContext>, IPipel
/// </summary>
protected virtual bool StepExecuted { get; set; }

/// <summary>
/// Gets or sets a value indication where this step is in a nested pipeline.
/// </summary>
protected virtual bool IsNested { get; private set; }

/// <summary>
/// Gets or sets the error handling behavior for this step.
/// </summary>
Expand All @@ -57,6 +62,14 @@ internal abstract class InternalStep<TContext> : IPipelineStep<TContext>, IPipel

private bool AllowedToCompensate => StepExecuted && ErrorHandledSucceed == false && CompensationStep?.IsCompensated == false;

/// <summary>
/// Marks step as nested.
/// </summary>
public void MarkStepAsNested()
{
IsNested = true;
}

/// <summary>
/// Configures error handling behavior for this step.
/// </summary>
Expand Down Expand Up @@ -87,20 +100,30 @@ public async ValueTask ExecuteAsync(TContext context, CancellationToken cancella

await ExecuteInternalAsync(context, cancellationToken);
}
catch (Exception e)
catch (Exception exception)
{
if (e is PipelineExecutionException or OperationCanceledException)
if (!HandleException(exception))
{
ErrorHandledSucceed = false;
throw;
}

Logger?.LogDebug("Exception thrown during execution. {exception}", e);
Logger?.LogError("Exception thrown during execution. {exception}", exception);

ErrorHandledSucceed = await HandleExceptionAsync(context, cancellationToken);

if (!ErrorHandledSucceed.Value)
throw new PipelineExecutionException(e);
if (ErrorHandledSucceed.Value)
{
if (NextStep is not null)
await NextStep.ExecuteAsync(context, cancellationToken);
}
else
{
if (IsNested)
throw new NestedPipelineExecutionException(exception);
else
throw new PipelineExecutionException(exception);
}
}
finally
{
Expand All @@ -110,6 +133,12 @@ public async ValueTask ExecuteAsync(TContext context, CancellationToken cancella
await CompensationStep.CompensateAsync(context, cancellationToken);
}
}

return;

bool HandleException(Exception exception) =>
(!IsNested || exception is not NestedPipelineExecutionException) &&
exception is not (PipelineExecutionException or OperationCanceledException);
}

/// <summary>
Expand Down
8 changes: 5 additions & 3 deletions src/PowerPipe/Builder/Steps/LazyStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ internal LazyStep(Func<IStepBase<TContext>> factory, ILoggerFactory loggerFactor
if (instance is IPipelineStep<TContext> step)
step.NextStep = NextStep;

Logger = loggerFactory?.CreateLogger(instance.GetType());
var instanceType = instance.GetType();

Logger = loggerFactory?.CreateLogger(instanceType);

Logger?.LogDebug("{Step} executing", instanceType.FullName);

return instance;
});
Expand All @@ -45,7 +49,5 @@ protected override async ValueTask ExecuteInternalAsync(TContext context, Cancel
StepExecuted = true;

await _step.Value.ExecuteAsync(context, cancellationToken);

Logger?.LogDebug("{Step} executed", _step.Value.GetType().FullName);
}
}
9 changes: 8 additions & 1 deletion src/PowerPipe/Builder/Steps/ParallelStep.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace PowerPipe.Builder.Steps;

Expand All @@ -20,10 +21,14 @@ internal class ParallelStep<TContext, TResult> : InternalStep<TContext>
/// </summary>
/// <param name="maxDegreeOfParallelism">The maximum degree of parallelism for the parallel execution.</param>
/// <param name="pipelineBuilder">The builder for the sub-pipeline to execute in parallel.</param>
public ParallelStep(int maxDegreeOfParallelism, PipelineBuilder<TContext, TResult> pipelineBuilder)
/// <param name="loggerFactory">A logger factory</param>
public ParallelStep(
int maxDegreeOfParallelism, PipelineBuilder<TContext, TResult> pipelineBuilder, ILoggerFactory loggerFactory)
{
_maxDegreeOfParallelism = maxDegreeOfParallelism;
_pipelineBuilder = pipelineBuilder;

Logger = loggerFactory?.CreateLogger<ParallelStep<TContext, TResult>>();
}

/// <summary>
Expand All @@ -34,6 +39,8 @@ public ParallelStep(int maxDegreeOfParallelism, PipelineBuilder<TContext, TResul
/// <returns>A task representing the asynchronous operation.</returns>
protected override async ValueTask ExecuteInternalAsync(TContext context, CancellationToken cancellationToken)
{
Logger?.LogDebug("Executing parallel steps.");

StepExecuted = true;

var parallelOptions = new ParallelOptions
Expand Down
18 changes: 18 additions & 0 deletions src/PowerPipe/Exceptions/NestedPipelineExecutionException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;

namespace PowerPipe.Exceptions;

/// <summary>
/// Represents an exception that occurs during nested pipeline execution.
/// </summary>
public class NestedPipelineExecutionException : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="NestedPipelineExecutionException"/> class with a specified exception.
/// </summary>
/// <param name="exception">The inner exception that caused the nested pipeline execution to fail.</param>
public NestedPipelineExecutionException(Exception exception)
: base("Nested pipeline execution failed", exception)
{
}
}
56 changes: 56 additions & 0 deletions tests/PowerPipe.UnitTests/PipelineTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,20 @@ public async Task IfStep_Succeed(bool predicate)
public async Task OnError_Succeed(PipelineStepErrorHandling errorHandlingBehaviour, bool applyErrorHandling)
{
var step = Substitute.For<TestStep1>();
var step2 = Substitute.For<TestStep2>();

step.ExecuteAsync(Arg.Any<TestPipelineContext>(), Arg.Any<CancellationToken>())
.Returns(_ => throw new InvalidOperationException("Test message"));
step2.ExecuteAsync(Arg.Any<TestPipelineContext>(), Arg.Any<CancellationToken>())
.Returns(ValueTask.CompletedTask);

var context = new TestPipelineContext();
var cts = new CancellationTokenSource();

var stepFactory =
new PipelineStepFactory(new ServiceCollection()
.AddTransient(_ => step)
.AddTransient(_ => step2)
.BuildServiceProvider());

var retryCount = 3;
Expand All @@ -153,6 +157,7 @@ public async Task OnError_Succeed(PipelineStepErrorHandling errorHandlingBehavio
var pipeline = new PipelineBuilder<TestPipelineContext, TestPipelineResult>(stepFactory, context)
.Add<TestStep1>()
.OnError(errorHandlingBehaviour, maxRetryCount: isRetryBehaviour ? retryCount : default, predicate: ShouldApplyErrorHandling)
.Add<TestStep2>()
.Build();

var action = async () => await pipeline.RunAsync(cts.Token);
Expand All @@ -170,6 +175,7 @@ public async Task OnError_Succeed(PipelineStepErrorHandling errorHandlingBehavio
await action.Should().NotThrowAsync<PipelineExecutionException>();

await step.Received(1).ExecuteAsync(Arg.Is(context), Arg.Is(cts.Token));
await step2.Received(1).ExecuteAsync(Arg.Is(context), Arg.Is(cts.Token));
}
}
else
Expand All @@ -178,6 +184,56 @@ public async Task OnError_Succeed(PipelineStepErrorHandling errorHandlingBehavio
}
}

[Theory]
[InlineData(PipelineStepErrorHandling.Suppress)]
[InlineData(PipelineStepErrorHandling.Retry)]
public async Task OnError_NestedPipeline_Succeed(PipelineStepErrorHandling errorHandlingBehaviour)
{
var step = Substitute.For<TestStep1>();
var step2 = Substitute.For<TestStep2>();

step.ExecuteAsync(Arg.Any<TestPipelineContext>(), Arg.Any<CancellationToken>())
.Returns(_ => throw new InvalidOperationException("Test message"));
step2.ExecuteAsync(Arg.Any<TestPipelineContext>(), Arg.Any<CancellationToken>())
.Returns(ValueTask.CompletedTask);

var context = new TestPipelineContext();
var cts = new CancellationTokenSource();

var stepFactory =
new PipelineStepFactory(new ServiceCollection()
.AddTransient(_ => step)
.AddTransient(_ => step2)
.BuildServiceProvider());

var isRetryBehaviour = errorHandlingBehaviour is PipelineStepErrorHandling.Retry;
var maxRetryCount = isRetryBehaviour ? 3 : default;

var pipeline = new PipelineBuilder<TestPipelineContext, TestPipelineResult>(stepFactory, context)
.If(_ => true, b => b
.Add<TestStep1>())
.OnError(errorHandlingBehaviour, maxRetryCount: maxRetryCount)
.Add<TestStep2>()
.Build();

var action = async () => await pipeline.RunAsync(cts.Token);

if (isRetryBehaviour)
{
await action.Should().ThrowAsync<PipelineExecutionException>();

await step.Received(1 + maxRetryCount).ExecuteAsync(Arg.Is(context), Arg.Is(cts.Token));
await step2.Received(0).ExecuteAsync(Arg.Is(context), Arg.Is(cts.Token));
}
else
{
await action.Should().NotThrowAsync<PipelineExecutionException>();

await step.Received(1).ExecuteAsync(Arg.Is(context), Arg.Is(cts.Token));
await step2.Received(1).ExecuteAsync(Arg.Is(context), Arg.Is(cts.Token));
}
}

[Fact]
public async Task CompensateWith_Succeed()
{
Expand Down

0 comments on commit 413c598

Please sign in to comment.