Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move scopes inside #3454

Merged
merged 8 commits into from
Jan 5, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# 23. Scoping dependencies inline with lifetime scope

Date: 2025-01-03

## Status

Proposed

## Context

As it stands dependency Lifetime Scopes are wrapped around the `Command Processor` meaning that all options performed by an instance of the Command processor will share the same scope, this becomes problematic when using the `Publish` method as this allows for multiple `Request Handlers` to be subscribed to a single Event, this will mean that all handlers share dependencies in the same scope which is unexpected behavior.

## Decision

When the Handler Factories are configured to not be a singleton Scopes will be created for each Lifetime, and a new lifetime will be given for each registered subscriber.

## Consequences

We will no longer require a `Command processor provider` as this was only created for scoping, and Handler factories will require the lifetime scope to be passed in to all methods so it can use this for managing scopes.
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ THE SOFTWARE. */
#endregion

using System;
using System.Collections.Generic;
using Microsoft.Extensions.DependencyInjection;

namespace Paramore.Brighter.Extensions.DependencyInjection
@@ -33,7 +34,8 @@ namespace Paramore.Brighter.Extensions.DependencyInjection
public class ServiceProviderHandlerFactory : IAmAHandlerFactorySync, IAmAHandlerFactoryAsync
{
private readonly IServiceProvider _serviceProvider;
private readonly bool _isTransient;
private readonly bool _isSingleton;
private readonly Dictionary<IAmALifetime, IServiceScope> _scopes = new Dictionary<IAmALifetime, IServiceScope>();

/// <summary>
/// Constructs a factory that uses the .NET IoC container as the factory
@@ -43,49 +45,76 @@ public ServiceProviderHandlerFactory(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
var options = (IBrighterOptions) serviceProvider.GetService(typeof(IBrighterOptions));
if (options == null) _isTransient = true; else _isTransient = options.HandlerLifetime == ServiceLifetime.Transient;
if (options == null) _isSingleton = false; else _isSingleton = options.HandlerLifetime == ServiceLifetime.Singleton;
}

/// <summary>
/// Creates an instance of the request handler
/// Lifetime is set during registration
/// </summary>
/// <param name="handlerType">The type of handler to request</param>
/// <param name="lifetime">The brighter Handler lifetime</param>
/// <returns>An instantiated request handler</returns>
IHandleRequests IAmAHandlerFactorySync.Create(Type handlerType)
IHandleRequests IAmAHandlerFactorySync.Create(Type handlerType, IAmALifetime lifetime)
{
return (IHandleRequests)_serviceProvider.GetService(handlerType);
if (_isSingleton)
return (IHandleRequests)_serviceProvider.GetService(handlerType);

if (!_scopes.ContainsKey(lifetime))
_scopes.Add(lifetime, _serviceProvider.CreateScope());

return (IHandleRequests)_scopes[lifetime].ServiceProvider.GetService(handlerType);
}

/// <summary>
/// Creates an instance of the request handler
/// Lifetime is set during registration
/// </summary>
/// <param name="handlerType">The type of handler to request</param>
/// <param name="lifetime">The brighter Handler lifetime</param>
/// <returns>An instantiated request handler</returns>
IHandleRequestsAsync IAmAHandlerFactoryAsync.Create(Type handlerType)
IHandleRequestsAsync IAmAHandlerFactoryAsync.Create(Type handlerType, IAmALifetime lifetime)
{
return (IHandleRequestsAsync)_serviceProvider.GetService(handlerType);
if (_isSingleton)
return (IHandleRequestsAsync)_serviceProvider.GetService(handlerType);

if (!_scopes.ContainsKey(lifetime))
_scopes.Add(lifetime, _serviceProvider.CreateScope());

return (IHandleRequestsAsync)_scopes[lifetime].ServiceProvider.GetService(handlerType);
}

/// <summary>
/// Release the request handler - actual behavior depends on lifetime, we only dispose if we are transient
/// </summary>
/// <param name="handler"></param>
public void Release(IHandleRequests handler)
/// <param name="lifetime">The brighter Handler lifetime</param>
public void Release(IHandleRequests handler, IAmALifetime lifetime)
{
if (!_isTransient) return;
if (_isSingleton) return;

var disposal = handler as IDisposable;
disposal?.Dispose();

ReleaseScope(lifetime);
}

public void Release(IHandleRequestsAsync handler)
public void Release(IHandleRequestsAsync handler, IAmALifetime lifetime)
{
if (!_isTransient) return;
if (_isSingleton) return;

var disposal = handler as IDisposable;
disposal?.Dispose();
ReleaseScope(lifetime);
}

private void ReleaseScope(IAmALifetime lifetime)
{
if(!_scopes.TryGetValue(lifetime, out IServiceScope scope))
return;

scope.Dispose();
_scopes.Remove(lifetime);
}
}
}
Original file line number Diff line number Diff line change
@@ -37,12 +37,9 @@ private void SendHeartbeat(object? state)
{
_logger.LogInformation("Sending Heartbeat");

var factory = ((Dispatcher)_dispatcher).CommandProcessorFactory.Invoke();
var commandProcessor = ((Dispatcher)_dispatcher).CommandProcessor;

factory.CreateScope();

HeartBeatSender.Send(factory.Get(), _dispatcher);
HeartBeatSender.Send(commandProcessor, _dispatcher);

factory.ReleaseScope();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -54,23 +54,13 @@ private static Dispatcher BuildDispatcher(IServiceProvider serviceProvider)

var options = serviceProvider.GetService<IServiceActivatorOptions>();

Func<IAmACommandProcessorProvider> providerFactory;

if (options.UseScoped)
{
providerFactory = () => new ScopedCommandProcessorProvider(serviceProvider);
}
else
{
var commandProcessor = serviceProvider.GetService<IAmACommandProcessor>();
providerFactory = () => new CommandProcessorProvider(commandProcessor);
}
var commandProcessor = serviceProvider.GetService<IAmACommandProcessor>();

var requestContextFactory = serviceProvider.GetService<IAmARequestContextFactory>();

var dispatcherBuilder = DispatchBuilder
.StartNew()
.CommandProcessorFactory(providerFactory, requestContextFactory);
.CommandProcessor(commandProcessor, requestContextFactory);

var messageMapperRegistry = ServiceCollectionExtensions.MessageMapperRegistry(serviceProvider);
var messageTransformFactory = ServiceCollectionExtensions.TransformFactory(serviceProvider);

This file was deleted.

14 changes: 7 additions & 7 deletions src/Paramore.Brighter.ServiceActivator/ConsumerFactory.cs
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ namespace Paramore.Brighter.ServiceActivator
{
internal class ConsumerFactory<TRequest> : IConsumerFactory where TRequest : class, IRequest
{
private readonly IAmACommandProcessorProvider _commandProcessorProvider;
private readonly IAmACommandProcessor _commandProcessor;
private readonly IAmAMessageMapperRegistry? _messageMapperRegistry;
private readonly Subscription _subscription;
private readonly IAmAMessageTransformerFactory? _messageTransformerFactory;
@@ -41,15 +41,15 @@ internal class ConsumerFactory<TRequest> : IConsumerFactory where TRequest : cla
private readonly IAmAMessageTransformerFactoryAsync? _messageTransformerFactoryAsync;

public ConsumerFactory(
IAmACommandProcessorProvider commandProcessorProvider,
IAmACommandProcessor commandProcessor,
Subscription subscription,
IAmAMessageMapperRegistry messageMapperRegistry,
IAmAMessageTransformerFactory? messageTransformerFactory,
IAmARequestContextFactory requestContextFactory,
IAmABrighterTracer tracer,
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All)
{
_commandProcessorProvider = commandProcessorProvider;
_commandProcessor = commandProcessor;
_messageMapperRegistry = messageMapperRegistry;
_subscription = subscription;
_messageTransformerFactory = messageTransformerFactory ?? new EmptyMessageTransformerFactory();
@@ -60,15 +60,15 @@ public ConsumerFactory(
}

public ConsumerFactory(
IAmACommandProcessorProvider commandProcessorProvider,
IAmACommandProcessor commandProcessor,
Subscription subscription,
IAmAMessageMapperRegistryAsync messageMapperRegistryAsync,
IAmAMessageTransformerFactoryAsync? messageTransformerFactoryAsync,
IAmARequestContextFactory requestContextFactory,
IAmABrighterTracer tracer,
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All)
{
_commandProcessorProvider = commandProcessorProvider;
_commandProcessor = commandProcessor;
_messageMapperRegistryAsync = messageMapperRegistryAsync;
_subscription = subscription;
_messageTransformerFactoryAsync = messageTransformerFactoryAsync ?? new EmptyMessageTransformerFactoryAsync();
@@ -95,7 +95,7 @@ private Consumer CreateReactor()
throw new ArgumentException("Subscription must have a Channel Factory in order to create a consumer.");

var channel = _subscription.ChannelFactory.CreateSyncChannel(_subscription);
var messagePump = new Reactor<TRequest>(_commandProcessorProvider, _messageMapperRegistry,
var messagePump = new Reactor<TRequest>(_commandProcessor, _messageMapperRegistry,
_messageTransformerFactory, _requestContextFactory, channel, _tracer, _instrumentationOptions)
{
Channel = channel,
@@ -117,7 +117,7 @@ private Consumer CreateProactor()
throw new ArgumentException("Subscription must have a Channel Factory in order to create a consumer.");

var channel = _subscription.ChannelFactory.CreateAsyncChannel(_subscription);
var messagePump = new Proactor<TRequest>(_commandProcessorProvider, _messageMapperRegistryAsync,
var messagePump = new Proactor<TRequest>(_commandProcessor, _messageMapperRegistryAsync,
_messageTransformerFactoryAsync, _requestContextFactory, channel, _tracer, _instrumentationOptions)
{
Channel = channel,
Original file line number Diff line number Diff line change
@@ -197,8 +197,7 @@ public Dispatcher Build(string hostName)
if (_channelFactory is null) throw new ArgumentException("Channel Factory must not be null");

return DispatchBuilder.StartNew()
.CommandProcessorFactory(() =>
new CommandProcessorProvider(commandProcessor), new InMemoryRequestContextFactory()
.CommandProcessor(commandProcessor, new InMemoryRequestContextFactory()
)
.MessageMappers(incomingMessageMapperRegistry, null, null, null)
.ChannelFactory(_channelFactory)
26 changes: 13 additions & 13 deletions src/Paramore.Brighter.ServiceActivator/DispatchBuilder.cs
Original file line number Diff line number Diff line change
@@ -35,9 +35,9 @@ namespace Paramore.Brighter.ServiceActivator
/// progressive interfaces to manage the requirements for a complete Dispatcher via Intellisense in the IDE. The intent is to make it easier to
/// recognize those dependencies that you need to configure
/// </summary>
public class DispatchBuilder : INeedACommandProcessorFactory, INeedAChannelFactory, INeedAMessageMapper, INeedAListOfSubcriptions, INeedObservability, IAmADispatchBuilder
public class DispatchBuilder : INeedACommandProcessor, INeedAChannelFactory, INeedAMessageMapper, INeedAListOfSubcriptions, INeedObservability, IAmADispatchBuilder
{
private Func<IAmACommandProcessorProvider>? _commandProcessorFactory;
private IAmACommandProcessor? _commandProcessor;
private IAmAMessageMapperRegistry? _messageMapperRegistry;
private IAmAMessageMapperRegistryAsync? _messageMapperRegistryAsync;
private IAmAChannelFactory? _defaultChannelFactory;
@@ -54,23 +54,23 @@ private DispatchBuilder() { }
/// Begins the fluent interface
/// </summary>
/// <returns>INeedALogger.</returns>
public static INeedACommandProcessorFactory StartNew()
public static INeedACommandProcessor StartNew()
{
return new DispatchBuilder();
}

/// <summary>
/// The command processor used to send and publish messages to handlers by the service activator.
/// </summary>
/// <param name="commandProcessorFactory">The command processor Factory.</param>
/// <param name="commandProcessor">The command processor.</param>
/// <param name="requestContextFactory">The factory used to create a request synchronizationHelper for a pipeline</param>
/// <returns>INeedAMessageMapper.</returns>
public INeedAMessageMapper CommandProcessorFactory(
Func<IAmACommandProcessorProvider> commandProcessorFactory,
public INeedAMessageMapper CommandProcessor(
IAmACommandProcessor commandProcessor,
IAmARequestContextFactory requestContextFactory
)
{
_commandProcessorFactory = commandProcessorFactory;
_commandProcessor = commandProcessor;
_requestContextFactory = requestContextFactory;
return this;
}
@@ -157,10 +157,10 @@ public INeedObservability Subscriptions(IEnumerable<Subscription> subscriptions)
/// <returns>Dispatcher.</returns>
public Dispatcher Build()
{
if (_commandProcessorFactory is null || _subscriptions is null)
if (_commandProcessor is null || _subscriptions is null)
throw new ArgumentException("Command Processor Factory and Subscription are required.");

return new Dispatcher(_commandProcessorFactory, _subscriptions, _messageMapperRegistry,
return new Dispatcher(_commandProcessor, _subscriptions, _messageMapperRegistry,
_messageMapperRegistryAsync, _messageTransformerFactory, _messageTransformerFactoryAsync,
_requestContextFactory, _tracer, _instrumentationOptions
);
@@ -174,16 +174,16 @@ public Dispatcher Build()
/// <summary>
/// Interface INeedACommandProcessor
/// </summary>
public interface INeedACommandProcessorFactory
public interface INeedACommandProcessor
{
/// <summary>
/// The command processor used to send and publish messages to handlers by the service activator.
/// </summary>
/// <param name="commandProcessorFactory">The command processor provider Factory.</param>
/// <param name="commandProcessor">The command processor.</param>
/// <param name="requestContextFactory">The factory used to create a request synchronizationHelper for a pipeline</param>
/// <returns>INeedAMessageMapper.</returns>
INeedAMessageMapper CommandProcessorFactory(
Func<IAmACommandProcessorProvider> commandProcessorFactory,
INeedAMessageMapper CommandProcessor(
IAmACommandProcessor commandProcessor,
IAmARequestContextFactory requestContextFactory
);
}
52 changes: 8 additions & 44 deletions src/Paramore.Brighter.ServiceActivator/Dispatcher.cs
Original file line number Diff line number Diff line change
@@ -62,12 +62,7 @@ public class Dispatcher : IDispatcher
/// Gets the command processor.
/// </summary>
/// <value>The command processor.</value>
public IAmACommandProcessor CommandProcessor { get => CommandProcessorFactory.Invoke().Get(); }

/// <summary>
///
/// </summary>
public Func<IAmACommandProcessorProvider> CommandProcessorFactory { get; }
public IAmACommandProcessor CommandProcessor { get; private set; }

/// <summary>
/// Gets the connections.
@@ -97,7 +92,7 @@ public class Dispatcher : IDispatcher
/// <summary>
/// Initializes a new instance of the <see cref="Dispatcher"/> class.
/// </summary>
/// <param name="commandProcessorFactory">The command processor Factory.</param>
/// <param name="commandProcessor">The command processor we should use with the dispatcher (prefer to use Command Processor Provider for IoC Scope control</param>
/// <param name="subscriptions">The subscriptions.</param>
/// <param name="messageMapperRegistry">The message mapper registry.</param>
/// <param name="messageMapperRegistryAsync">Async message mapper registry.</param>
@@ -108,7 +103,7 @@ public class Dispatcher : IDispatcher
/// <param name="instrumentationOptions">When creating a span for <see cref="CommandProcessor"/> operations how noisy should the attributes be</param>
/// throws <see cref="ConfigurationException">You must provide at least one type of message mapper registry</see>
public Dispatcher(
Func<IAmACommandProcessorProvider> commandProcessorFactory,
IAmACommandProcessor commandProcessor,
IEnumerable<Subscription> subscriptions,
IAmAMessageMapperRegistry? messageMapperRegistry = null,
IAmAMessageMapperRegistryAsync? messageMapperRegistryAsync = null,
@@ -118,7 +113,7 @@ public Dispatcher(
IAmABrighterTracer? tracer = null,
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All)
{
CommandProcessorFactory = commandProcessorFactory;
CommandProcessor = commandProcessor;

Subscriptions = subscriptions;
_messageMapperRegistry = messageMapperRegistry;
@@ -144,37 +139,6 @@ public Dispatcher(
State = DispatcherState.DS_AWAITING;
}

/// <summary>
/// Initializes a new instance of the <see cref="Dispatcher"/> class.
/// </summary>
/// <param name="commandProcessor">The command processor we should use with the dispatcher (prefer to use Command Processor Provider for IoC Scope control</param>
/// <param name="subscriptions">The subscriptions.</param>
/// <param name="messageMapperRegistry">The message mapper registry.</param>
/// <param name="messageMapperRegistryAsync">Async message mapper registry.</param>
/// <param name="messageTransformerFactory">Creates instances of Transforms</param>
/// <param name="messageTransformerFactoryAsync">Creates instances of Transforms async</param>
/// <param name="requestContextFactory">The factory used to make a request synchronizationHelper</param>
/// <param name="tracer">What is the <see cref="BrighterTracer"/> we will use for telemetry</param>
/// <param name="instrumentationOptions">When creating a span for <see cref="CommandProcessor"/> operations how noisy should the attributes be</param>
/// throws <see cref="ConfigurationException">You must provide at least one type of message mapper registry</see>
public Dispatcher(
IAmACommandProcessor commandProcessor,
IEnumerable<Subscription> subscriptions,
IAmAMessageMapperRegistry? messageMapperRegistry = null,
IAmAMessageMapperRegistryAsync? messageMapperRegistryAsync = null,
IAmAMessageTransformerFactory? messageTransformerFactory = null,
IAmAMessageTransformerFactoryAsync? messageTransformerFactoryAsync= null,
IAmARequestContextFactory? requestContextFactory = null,
IAmABrighterTracer? tracer = null,
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All)
: this(() =>
new CommandProcessorProvider(commandProcessor), subscriptions, messageMapperRegistry,
messageMapperRegistryAsync, messageTransformerFactory, messageTransformerFactoryAsync,
requestContextFactory, tracer, instrumentationOptions
)
{
}

/// <summary>
/// Stop listening to messages
/// </summary>
@@ -402,7 +366,7 @@ private Consumer CreateConsumer(Subscription subscription, int? consumerNumber)
{
var types = new[]
{
typeof(IAmACommandProcessorProvider), typeof(Subscription), typeof(IAmAMessageMapperRegistry),
typeof(IAmACommandProcessor), typeof(Subscription), typeof(IAmAMessageMapperRegistry),
typeof(IAmAMessageTransformerFactory), typeof(IAmARequestContextFactory), typeof(IAmABrighterTracer),
typeof(InstrumentationOptions)
};
@@ -414,7 +378,7 @@ private Consumer CreateConsumer(Subscription subscription, int? consumerNumber)

var consumerFactory = (IConsumerFactory)consumerFactoryCtor?.Invoke(new object?[]
{
CommandProcessorFactory.Invoke(), subscription, _messageMapperRegistry, _messageTransformerFactory,
CommandProcessor, subscription, _messageMapperRegistry, _messageTransformerFactory,
_requestContextFactory, _tracer, _instrumentationOptions

})!;
@@ -425,7 +389,7 @@ private Consumer CreateConsumer(Subscription subscription, int? consumerNumber)
{
var types = new[]
{
typeof(IAmACommandProcessorProvider),typeof(Subscription), typeof(IAmAMessageMapperRegistryAsync),
typeof(IAmACommandProcessor),typeof(Subscription), typeof(IAmAMessageMapperRegistryAsync),
typeof(IAmAMessageTransformerFactoryAsync), typeof(IAmARequestContextFactory), typeof(IAmABrighterTracer),
typeof(InstrumentationOptions)
};
@@ -437,7 +401,7 @@ private Consumer CreateConsumer(Subscription subscription, int? consumerNumber)

var consumerFactory = (IConsumerFactory)consumerFactoryCtor?.Invoke(new object?[]
{
CommandProcessorFactory.Invoke(), subscription, _messageMapperRegistryAsync, _messageTransformerFactoryAsync,
CommandProcessor, subscription, _messageMapperRegistryAsync, _messageTransformerFactoryAsync,
_requestContextFactory, _tracer, _instrumentationOptions
})!;

This file was deleted.

15 changes: 5 additions & 10 deletions src/Paramore.Brighter.ServiceActivator/MessagePump.cs
Original file line number Diff line number Diff line change
@@ -23,14 +23,9 @@ THE SOFTWARE. */
#endregion

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Paramore.Brighter.Actions;
using Paramore.Brighter.Logging;
using Paramore.Brighter.Observability;
using Polly.CircuitBreaker;

namespace Paramore.Brighter.ServiceActivator
{
@@ -52,7 +47,7 @@ public abstract class MessagePump<TRequest> where TRequest : class, IRequest
{
internal static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MessagePump<TRequest>>();

protected readonly IAmACommandProcessorProvider CommandProcessorProvider;
protected readonly IAmACommandProcessor CommandProcessor;
protected readonly IAmARequestContextFactory RequestContextFactory;
protected readonly IAmABrighterTracer? Tracer;
protected readonly InstrumentationOptions InstrumentationOptions;
@@ -101,18 +96,18 @@ public abstract class MessagePump<TRequest> where TRequest : class, IRequest
/// - Dispatches the message to waiting handlers
/// The message pump is a classic event loop and is intended to be run on a single-thread
/// </summary>
/// <param name="commandProcessorProvider">Provides a correctly scoped command processor </param>
/// <param name="commandProcessor">Provides a correctly scoped command processor </param>
/// <param name="requestContextFactory">Provides a request synchronizationHelper</param>
/// <param name="tracer">What is the <see cref="BrighterTracer"/> we will use for telemetry</param>
/// <param name="channel"></param>
/// <param name="instrumentationOptions">When creating a span for <see cref="CommandProcessor"/> operations how noisy should the attributes be</param>
/// <param name="instrumentationOptions">When creating a span for <see cref="Brighter.CommandProcessor"/> operations how noisy should the attributes be</param>
protected MessagePump(
IAmACommandProcessorProvider commandProcessorProvider,
IAmACommandProcessor commandProcessor,
IAmARequestContextFactory requestContextFactory,
IAmABrighterTracer? tracer,
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All)
{
CommandProcessorProvider = commandProcessorProvider;
CommandProcessor = commandProcessor;
RequestContextFactory = requestContextFactory;
Tracer = tracer;
InstrumentationOptions = instrumentationOptions;
Original file line number Diff line number Diff line change
@@ -18,8 +18,9 @@ public ControlBusHandlerFactorySync(IDispatcher worker, Func<IAmACommandProcesso
/// Creates the specified handler type.
/// </summary>
/// <param name="handlerType">Type of the handler.</param>
/// <param name="lifetime">The Brighter Handler Lifetime</param>
/// <returns>IHandleRequests.</returns>
public IHandleRequests Create(Type handlerType)
public IHandleRequests Create(Type handlerType, IAmALifetime lifetime)
{
if (handlerType == typeof(ConfigurationCommandHandler))
return new ConfigurationCommandHandler(_worker);
@@ -34,7 +35,8 @@ public IHandleRequests Create(Type handlerType)
/// Releases the specified handler.
/// </summary>
/// <param name="handler">The handler.</param>
public void Release(IHandleRequests handler)
/// <param name="lifetime">The Brighter Handler Lifetime</param>
public void Release(IHandleRequests handler, IAmALifetime lifetime)
{
}
}
15 changes: 5 additions & 10 deletions src/Paramore.Brighter.ServiceActivator/Proactor.cs
Original file line number Diff line number Diff line change
@@ -46,22 +46,22 @@ public class Proactor<TRequest> : MessagePump<TRequest>, IAmAMessagePump where T
/// <summary>
/// Constructs a message pump
/// </summary>
/// <param name="commandProcessorProvider">Provides a way to grab a command processor correctly scoped</param>
/// <param name="commandProcessor">Provides a way to grab a command processor correctly scoped</param>
/// <param name="messageMapperRegistry">The registry of mappers</param>
/// <param name="messageTransformerFactory">The factory that lets us create instances of transforms</param>
/// <param name="requestContextFactory">A factory to create instances of request synchronizationHelper, used to add synchronizationHelper to a pipeline</param>
/// <param name="channel">The channel to read messages from</param>
/// <param name="tracer">What is the tracer we will use for telemetry</param>
/// <param name="instrumentationOptions">When creating a span for <see cref="CommandProcessor"/> operations how noisy should the attributes be</param>
public Proactor(
IAmACommandProcessorProvider commandProcessorProvider,
IAmACommandProcessor commandProcessor,
IAmAMessageMapperRegistryAsync messageMapperRegistry,
IAmAMessageTransformerFactoryAsync messageTransformerFactory,
IAmARequestContextFactory requestContextFactory,
IAmAChannelAsync channel,
IAmABrighterTracer? tracer = null,
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All)
: base(commandProcessorProvider, requestContextFactory, tracer, instrumentationOptions)
: base(commandProcessor, requestContextFactory, tracer, instrumentationOptions)
{
var transformPipelineBuilder = new TransformPipelineBuilderAsync(messageMapperRegistry, messageTransformerFactory);
_unwrapPipeline = transformPipelineBuilder.BuildUnwrapPipeline<TRequest>();
@@ -115,16 +115,14 @@ private async Task DispatchRequest(MessageHeader messageHeader, TRequest request
{
case MessageType.MT_COMMAND:
{
await CommandProcessorProvider
.Get()
await CommandProcessor
.SendAsync(request,requestContext, continueOnCapturedContext: true, default);
break;
}
case MessageType.MT_DOCUMENT:
case MessageType.MT_EVENT:
{
await CommandProcessorProvider
.Get()
await CommandProcessor
.PublishAsync(request, requestContext, continueOnCapturedContext: true, default);
break;
}
@@ -221,8 +219,6 @@ private async Task EventLoop()

var request = await TranslateMessage(message, context);

CommandProcessorProvider.CreateScope();

await DispatchRequest(message.Header, request, context);

span?.SetStatus(ActivityStatusCode.Ok);
@@ -303,7 +299,6 @@ private async Task EventLoop()
finally
{
Tracer?.EndSpan(span);
CommandProcessorProvider.ReleaseScope();
}

await Acknowledge(message);
13 changes: 5 additions & 8 deletions src/Paramore.Brighter.ServiceActivator/Reactor.cs
Original file line number Diff line number Diff line change
@@ -48,22 +48,22 @@ public class Reactor<TRequest> : MessagePump<TRequest>, IAmAMessagePump where TR
/// <summary>
/// Constructs a message pump
/// </summary>
/// <param name="commandProcessorProvider">Provides a way to grab a command processor correctly scoped</param>
/// <param name="commandProcessor">Provides a way to grab a command processor correctly scoped</param>
/// <param name="messageMapperRegistry">The registry of mappers</param>
/// <param name="messageTransformerFactory">The factory that lets us create instances of transforms</param>
/// <param name="requestContextFactory">A factory to create instances of request synchronizationHelper, used to add synchronizationHelper to a pipeline</param>
/// <param name="channel">The channel from which to read messages</param>
/// <param name="tracer">What is the tracer we will use for telemetry</param>
/// <param name="instrumentationOptions">When creating a span for <see cref="CommandProcessor"/> operations how noisy should the attributes be</param>
public Reactor(
IAmACommandProcessorProvider commandProcessorProvider,
IAmACommandProcessor commandProcessor,
IAmAMessageMapperRegistry messageMapperRegistry,
IAmAMessageTransformerFactory messageTransformerFactory,
IAmARequestContextFactory requestContextFactory,
IAmAChannelSync channel,
IAmABrighterTracer? tracer = null,
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All)
: base(commandProcessorProvider, requestContextFactory, tracer, instrumentationOptions)
: base(commandProcessor, requestContextFactory, tracer, instrumentationOptions)
{
var transformPipelineBuilder = new TransformPipelineBuilder(messageMapperRegistry, messageTransformerFactory);
_unwrapPipeline = transformPipelineBuilder.BuildUnwrapPipeline<TRequest>();
@@ -177,8 +177,6 @@ public void Run()

var request = TranslateMessage(message, context);

CommandProcessorProvider.CreateScope();

DispatchRequest(message.Header, request, context);

span?.SetStatus(ActivityStatusCode.Ok);
@@ -259,7 +257,6 @@ public void Run()
finally
{
Tracer?.EndSpan(span);
CommandProcessorProvider.ReleaseScope();
}

AcknowledgeMessage(message);
@@ -295,13 +292,13 @@ private void DispatchRequest(MessageHeader messageHeader, TRequest request, Requ
{
case MessageType.MT_COMMAND:
{
CommandProcessorProvider.Get().Send(request, requestContext);
CommandProcessor.Send(request, requestContext);
break;
}
case MessageType.MT_DOCUMENT:
case MessageType.MT_EVENT:
{
CommandProcessorProvider.Get().Publish(request, requestContext);
CommandProcessor.Publish(request, requestContext);
break;
}
}
4 changes: 2 additions & 2 deletions src/Paramore.Brighter/AsyncHandlerFactory.cs
Original file line number Diff line number Diff line change
@@ -39,11 +39,11 @@ internal static class AsyncHandlerFactory
/// <param name="requestContext">The request context.</param>
/// <returns><see cref="IHandleRequestsAsync{TRequest}"/>.</returns>
/// <typeparam name="TRequest">The type of the <see cref="TRequest"/> request.</typeparam>
public static IHandleRequestsAsync<TRequest> CreateAsyncRequestHandler<TRequest>(this IAmAHandlerFactoryAsync factory, RequestHandlerAttribute attribute, IRequestContext requestContext)
public static IHandleRequestsAsync<TRequest> CreateAsyncRequestHandler<TRequest>(this IAmAHandlerFactoryAsync factory, RequestHandlerAttribute attribute, IRequestContext requestContext, IAmALifetime lifetime)
where TRequest : class, IRequest
{
var handlerType = attribute.GetHandlerType().MakeGenericType(typeof(TRequest));
var handler = (IHandleRequestsAsync<TRequest>)factory.Create(handlerType);
var handler = (IHandleRequestsAsync<TRequest>)factory.Create(handlerType, lifetime);

if (handler is null)
throw new ConfigurationException($"Could not create handler {handlerType} from {factory}");
14 changes: 9 additions & 5 deletions src/Paramore.Brighter/CommandProcessor.cs
Original file line number Diff line number Diff line change
@@ -292,7 +292,7 @@ public async Task SendAsync<T>(
var handlerChain = builder.BuildAsync(context, continueOnCapturedContext);

AssertValidSendPipeline(command, handlerChain.Count());

await handlerChain.First().HandleAsync(command, cancellationToken)
.ConfigureAwait(continueOnCapturedContext);
}
@@ -348,9 +348,11 @@ public void Publish<T>(T @event, RequestContext? requestContext = null) where T
{
var handlerName = handleRequests.Name.ToString();
handlerSpans[handlerName] = _tracer?.CreateSpan(CommandProcessorSpanOperation.Publish, @event, span, options: _instrumentationOptions)!;
context.Span = handlerSpans[handlerName];
if(handleRequests.Context is not null)
handleRequests.Context.Span = handlerSpans[handlerName];
handleRequests.Handle(@event);
context.Span = span;
if(handleRequests.Context is not null)
handleRequests.Context.Span = span;
}
catch (Exception e)
{
@@ -426,9 +428,11 @@ public async Task PublishAsync<T>(
foreach (var handleRequests in handlerChain)
{
handlerSpans[handleRequests.Name.ToString()] = _tracer?.CreateSpan(CommandProcessorSpanOperation.Publish, @event, span, options: _instrumentationOptions)!;
context.Span =handlerSpans[handleRequests.Name.ToString()];
if(handleRequests.Context is not null)
handleRequests.Context.Span = handlerSpans[handleRequests.Name.ToString()];
tasks.Add(handleRequests.HandleAsync(@event, cancellationToken));
context.Span = span;
if(handleRequests.Context is not null)
handleRequests.Context.Span = span;
}

await Task.WhenAll(tasks).ConfigureAwait(continueOnCapturedContext);
4 changes: 2 additions & 2 deletions src/Paramore.Brighter/HandlerFactory.cs
Original file line number Diff line number Diff line change
@@ -41,10 +41,10 @@ public HandlerFactory(RequestHandlerAttribute attribute, IAmAHandlerFactorySync
_messageType = typeof(TRequest);
}

public IHandleRequests<TRequest> CreateRequestHandler()
public IHandleRequests<TRequest> CreateRequestHandler(IAmALifetime lifetime)
{
var handlerType = _attribute.GetHandlerType().MakeGenericType(_messageType);
var handler = (IHandleRequests<TRequest>)_factorySync.Create(handlerType);
var handler = (IHandleRequests<TRequest>)_factorySync.Create(handlerType, lifetime);

if (handler is null)
throw new ConfigurationException($"Could not create handler {handlerType} from {_factorySync}");
4 changes: 2 additions & 2 deletions src/Paramore.Brighter/HandlerLifetimeScope.cs
Original file line number Diff line number Diff line change
@@ -76,14 +76,14 @@ public void Dispose()
_trackedObjects.Each((trackedItem) =>
{
//free disposable items
_handlerFactorySync?.Release(trackedItem);
_handlerFactorySync?.Release(trackedItem, this);
s_logger.LogDebug("Releasing handler instance {InstanceHashCode} of type {HandlerType}", trackedItem.GetHashCode(), trackedItem.GetType());
});

_trackedAsyncObjects.Each(trackedItem =>
{
//free disposable items
_asyncHandlerFactory?.Release(trackedItem);
_asyncHandlerFactory?.Release(trackedItem, this);
s_logger.LogDebug("Releasing async handler instance {InstanceHashCode} of type {HandlerType}", trackedItem.GetHashCode(), trackedItem.GetType());
});

7 changes: 5 additions & 2 deletions src/Paramore.Brighter/IAmAHandlerFactoryAsync.cs
Original file line number Diff line number Diff line change
@@ -39,12 +39,15 @@ public interface IAmAHandlerFactoryAsync : IAmAHandlerFactory
/// Creates the specified async handler type.
/// </summary>
/// <param name="handlerType">Type of the handler.</param>
/// <param name="lifetime">The brighter Handler lifetime</param>
/// <returns>IHandleRequestsAsync.</returns>
IHandleRequestsAsync Create(Type handlerType);
IHandleRequestsAsync Create(Type handlerType, IAmALifetime lifetime);

/// <summary>
/// Releases the specified async handler.
/// </summary>
/// <param name="handler">The handler.</param>
void Release(IHandleRequestsAsync? handler);
/// <param name="lifetime">The brighter Handler lifetime.</param>
void Release(IHandleRequestsAsync? handler, IAmALifetime lifetime);
}
}
8 changes: 6 additions & 2 deletions src/Paramore.Brighter/IAmAHandlerFactorySync.cs
Original file line number Diff line number Diff line change
@@ -39,12 +39,16 @@ public interface IAmAHandlerFactorySync : IAmAHandlerFactory
/// Creates the specified handler type.
/// </summary>
/// <param name="handlerType">Type of the handler.</param>
/// <param name="lifetime">The Brighter handler Lifetime</param>
/// <returns>IHandleRequests.</returns>
IHandleRequests Create(Type handlerType);
IHandleRequests Create(Type handlerType, IAmALifetime lifetime);

/// <summary>
/// Releases the specified handler.
/// </summary>
/// <param name="handler">The handler.</param>
void Release(IHandleRequests handler);
/// <param name="lifetime">The Brighter handler Lifetime</param>
void Release(IHandleRequests handler, IAmALifetime lifetime);

}
}
2 changes: 1 addition & 1 deletion src/Paramore.Brighter/IAmAPipelineBuilder.cs
Original file line number Diff line number Diff line change
@@ -42,4 +42,4 @@ internal interface IAmAPipelineBuilder<TRequest> : IDisposable where TRequest :
/// <returns>Pipelines&lt;TRequest&gt;.</returns>
Pipelines<TRequest> Build(IRequestContext requestContext);
}
}
}
6 changes: 6 additions & 0 deletions src/Paramore.Brighter/IRequestContext.cs
Original file line number Diff line number Diff line change
@@ -58,5 +58,11 @@ public interface IRequestContext
/// Gets the Feature Switches
/// </summary>
IAmAFeatureSwitchRegistry? FeatureSwitches { get; }

/// <summary>
/// Create a new copy of the Request Context
/// </summary>
/// <returns>a new copy of the request context</returns>
IRequestContext CreateCopy();
}
}
67 changes: 0 additions & 67 deletions src/Paramore.Brighter/Interpreter.cs

This file was deleted.

153 changes: 107 additions & 46 deletions src/Paramore.Brighter/PipelineBuilder.cs

Large diffs are not rendered by default.

22 changes: 21 additions & 1 deletion src/Paramore.Brighter/RequestContext.cs
Original file line number Diff line number Diff line change
@@ -41,6 +41,13 @@ public class RequestContext : IRequestContext
{
private readonly ConcurrentDictionary<int, Activity> _spans = new();

public RequestContext() { }

private RequestContext(ConcurrentDictionary<string, object> bag)
{
Bag = bag;
}

/// <summary>
/// Gets the bag.
/// </summary>
@@ -51,7 +58,20 @@ public class RequestContext : IRequestContext
/// Gets the Feature Switches
/// </summary>
public IAmAFeatureSwitchRegistry? FeatureSwitches { get; set; }


/// <summary>
/// Create a new instance of the Request Context
/// </summary>
/// <returns>New Instance of the message</returns>
public IRequestContext CreateCopy()
=> new RequestContext(Bag)
{
Span = Span,
Policies = Policies,
FeatureSwitches = FeatureSwitches,
OriginatingMessage = OriginatingMessage
};

/// <summary>
/// When we pass a requestContext through a receiver pipeline, we may want to pass the original message that started the pipeline.
/// This is primarily useful for debugging - how did we get to this request?. But it is also useful for some request metadata that we
6 changes: 4 additions & 2 deletions src/Paramore.Brighter/SimpleHandlerFactoryAsync.cs
Original file line number Diff line number Diff line change
@@ -36,8 +36,9 @@ public class SimpleHandlerFactoryAsync(Func<Type, IHandleRequestsAsync> factoryM
/// Create a handler for a given request type
/// </summary>
/// <param name="handlerType">The type of request</param>
/// <param name="lifetime">The Brighter Handler Lifetime</param>
/// <returns></returns>
public IHandleRequestsAsync Create(Type handlerType)
public IHandleRequestsAsync Create(Type handlerType, IAmALifetime lifetime)
{
return factoryMethod(handlerType);
}
@@ -46,7 +47,8 @@ public IHandleRequestsAsync Create(Type handlerType)
/// Dispose of the handler
/// </summary>
/// <param name="handler">The handler to dispose</param>
public void Release(IHandleRequestsAsync? handler)
/// <param name="lifetime">The Brighter Handler Lifetime</param>
public void Release(IHandleRequestsAsync? handler, IAmALifetime lifetime)
{
if (handler is IDisposable disposable)
{
4 changes: 2 additions & 2 deletions src/Paramore.Brighter/SimpleHandlerFactorySync.cs
Original file line number Diff line number Diff line change
@@ -32,12 +32,12 @@ namespace Paramore.Brighter
/// </summary>
public class SimpleHandlerFactorySync(Func<Type, IHandleRequests> factoryMethod) : IAmAHandlerFactorySync
{
public IHandleRequests Create(Type handlerType)
public IHandleRequests Create(Type handlerType, IAmALifetime lifetime)
{
return factoryMethod(handlerType);
}

public void Release(IHandleRequests handler)
public void Release(IHandleRequests handler, IAmALifetime lifetime)
{
var disposable = handler as IDisposable;
disposable?.Dispose();
Original file line number Diff line number Diff line change
@@ -98,7 +98,6 @@ public SnsReDrivePolicySDlqTests()
requestContextFactory: new InMemoryRequestContextFactory(),
policyRegistry: new PolicyRegistry()
);
var provider = new CommandProcessorProvider(commandProcessor);

var messageMapperRegistry = new MessageMapperRegistry(
new SimpleMessageMapperFactory(_ => new MyDeferredCommandMessageMapper()),
@@ -107,7 +106,7 @@ public SnsReDrivePolicySDlqTests()
messageMapperRegistry.Register<MyDeferredCommand, MyDeferredCommandMessageMapper>();

//pump messages from a channel to a handler - in essence we are building our own dispatcher in this test
_messagePump = new Reactor<MyDeferredCommand>(provider, messageMapperRegistry,
_messagePump = new Reactor<MyDeferredCommand>(commandProcessor, messageMapperRegistry,
null, new InMemoryRequestContextFactory(), _channel)
{
Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 3
Original file line number Diff line number Diff line change
@@ -84,15 +84,14 @@ public SnsReDrivePolicySDlqTestsAsync()
requestContextFactory: new InMemoryRequestContextFactory(),
policyRegistry: new PolicyRegistry()
);
var provider = new CommandProcessorProvider(commandProcessor);

var messageMapperRegistry = new MessageMapperRegistry(
new SimpleMessageMapperFactory(_ => new MyDeferredCommandMessageMapper()),
null
);
messageMapperRegistry.Register<MyDeferredCommand, MyDeferredCommandMessageMapper>();

_messagePump = new Proactor<MyDeferredCommand>(provider, messageMapperRegistry,
_messagePump = new Proactor<MyDeferredCommand>(commandProcessor, messageMapperRegistry,
new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), _channel)
{
Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 3
Original file line number Diff line number Diff line change
@@ -10,11 +10,11 @@ public QuickHandlerFactory(Func<IHandleRequests> handlerAction)
{
_handlerAction = handlerAction;
}
public IHandleRequests Create(Type handlerType)
public IHandleRequests Create(Type handlerType, IAmALifetime lifetime)
{
return _handlerAction();
}

public void Release(IHandleRequests handler) { }
public void Release(IHandleRequests handler, IAmALifetime lifetime) { }
}
}
Original file line number Diff line number Diff line change
@@ -10,12 +10,12 @@ public QuickHandlerFactoryAsync(Func<IHandleRequestsAsync> handlerFactory)
_handlerFactory = handlerFactory;
}

public IHandleRequestsAsync Create(Type handlerType)
public IHandleRequestsAsync Create(Type handlerType, IAmALifetime lifetime)
{
return _handlerFactory();
}

public void Release(IHandleRequestsAsync handler)
public void Release(IHandleRequestsAsync handler, IAmALifetime lifetime)
{
// Implement any necessary cleanup logic here
}
Original file line number Diff line number Diff line change
@@ -105,14 +105,12 @@ public CommandProcessorCallTests()
public void When_Calling_A_Server_Via_The_Command_Processor()
{
//start a message pump on a new thread, to recieve the Call message
var provider = new CommandProcessorProvider(_commandProcessor);

Channel channel = new(
new("MyChannel"), _routingKey,
new InMemoryMessageConsumer(_routingKey, _bus, TimeProvider.System, TimeSpan.FromMilliseconds(1000))
);

var messagePump = new Reactor<MyRequest>(provider, _messageMapperRegistry,
var messagePump = new Reactor<MyRequest>(_commandProcessor, _messageMapperRegistry,
new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), channel)
{ Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000) };

Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ public void Dispose()

internal class CheapHandlerFactorySync : IAmAHandlerFactorySync
{
public IHandleRequests Create(Type handlerType)
public IHandleRequests Create(Type handlerType, IAmALifetime lifetime)
{
if (handlerType == typeof(MyPreAndPostDecoratedHandler))
{
@@ -65,7 +65,7 @@ public IHandleRequests Create(Type handlerType)
return null;
}

public void Release(IHandleRequests handler)
public void Release(IHandleRequests handler, IAmALifetime lifetime)
{
var disposable = handler as IDisposable;
disposable?.Dispose();
Original file line number Diff line number Diff line change
@@ -124,12 +124,12 @@ public void Dispose()

internal class EmptyHandlerFactorySync : IAmAHandlerFactorySync
{
public IHandleRequests Create(Type handlerType)
public IHandleRequests Create(Type handlerType, IAmALifetime lifetime)
{
return null;
}

public void Release(IHandleRequests handler) {}
public void Release(IHandleRequests handler, IAmALifetime lifetime) {}
}
}
}
Original file line number Diff line number Diff line change
@@ -109,12 +109,12 @@ public void Dispose()

internal class EmptyHandlerFactorySync : IAmAHandlerFactorySync
{
public IHandleRequests Create(Type handlerType)
public IHandleRequests Create(Type handlerType, IAmALifetime lifetime)
{
return null;
}

public void Release(IHandleRequests handler) {}
public void Release(IHandleRequests handler, IAmALifetime lifetime) {}
}
}
}
Original file line number Diff line number Diff line change
@@ -86,7 +86,7 @@ public void When_No_Request_Context_Is_Provided_On_A_Publish()
var commandProcessor = new CommandProcessor(registry, handlerFactory, _requestContextFactory, new PolicyRegistry());

//act
commandProcessor.Send(myEvent);
commandProcessor.Publish(myEvent);

//assert
_requestContextFactory.CreateWasCalled.Should().BeTrue();
@@ -108,7 +108,7 @@ public async Task When_No_Request_Context_Is_Provided_On_A_Publish_Async()
var commandProcessor = new CommandProcessor(registry, handlerFactory, _requestContextFactory, new PolicyRegistry());

//act
await commandProcessor.SendAsync(myEvent);
await commandProcessor.PublishAsync(myEvent);

//assert
_requestContextFactory.CreateWasCalled.Should().BeTrue();
Original file line number Diff line number Diff line change
@@ -46,7 +46,6 @@ public class MessagePumpRetryCommandOnConnectionFailureTestsAsync
public MessagePumpRetryCommandOnConnectionFailureTestsAsync()
{
_commandProcessor = new SpyCommandProcessor();
var provider = new CommandProcessorProvider(_commandProcessor);
var channel = new FailingChannelAsync(
new ChannelName(ChannelName), _routingKey,
new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)),
@@ -58,7 +57,7 @@ public MessagePumpRetryCommandOnConnectionFailureTestsAsync()
null,
new SimpleMessageMapperFactoryAsync(_ => new MyCommandMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyCommand, MyCommandMessageMapperAsync>();
_messagePump = new Proactor<MyCommand>(provider, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), channel)
_messagePump = new Proactor<MyCommand>(_commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), channel)
{
Channel = channel, TimeOut = TimeSpan.FromMilliseconds(500), RequeueCount = -1
};
Original file line number Diff line number Diff line change
@@ -45,7 +45,6 @@ public class MessagePumpRetryEventConnectionFailureTestsAsync
public MessagePumpRetryEventConnectionFailureTestsAsync()
{
_commandProcessor = new SpyCommandProcessor();
var provider = new CommandProcessorProvider(_commandProcessor);
var channel = new FailingChannelAsync(
new ChannelName("myChannel"), _routingKey,
new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)),
@@ -59,7 +58,7 @@ public MessagePumpRetryEventConnectionFailureTestsAsync()
new SimpleMessageMapperFactoryAsync(_ => new MyEventMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyEvent, MyEventMessageMapperAsync>();

_messagePump = new Proactor<MyEvent>(provider, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), channel)
_messagePump = new Proactor<MyEvent>(_commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), channel)
{
Channel = channel, TimeOut = TimeSpan.FromMilliseconds(500), RequeueCount = -1
};
Original file line number Diff line number Diff line change
@@ -46,7 +46,6 @@ public class MessagePumpCommandProcessingDeferMessageActionTestsAsync
public MessagePumpCommandProcessingDeferMessageActionTestsAsync()
{
SpyRequeueCommandProcessor commandProcessor = new();
var commandProcessorProvider = new CommandProcessorProvider(commandProcessor);

_channel = new ChannelAsync(new(ChannelName),_routingKey, new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)));

@@ -55,7 +54,7 @@ public MessagePumpCommandProcessingDeferMessageActionTestsAsync()
new SimpleMessageMapperFactoryAsync(_ => new MyCommandMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyCommand, MyCommandMessageMapperAsync>();

_messagePump = new Proactor<MyCommand>(commandProcessorProvider, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
_messagePump = new Proactor<MyCommand>(commandProcessor, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
{
Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = _requeueCount
};
Original file line number Diff line number Diff line change
@@ -46,7 +46,6 @@ public class MessagePumpCommandProcessingExceptionTestsAsync
public MessagePumpCommandProcessingExceptionTestsAsync()
{
SpyExceptionCommandProcessor commandProcessor = new();
var commandProcessorProvider = new CommandProcessorProvider(commandProcessor);

InternalBus bus = new();

@@ -57,7 +56,7 @@ public MessagePumpCommandProcessingExceptionTestsAsync()
new SimpleMessageMapperFactoryAsync(_ => new MyCommandMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyCommand, MyCommandMessageMapperAsync>();

_messagePump = new Proactor<MyCommand>(commandProcessorProvider, messageMapperRegistry,
_messagePump = new Proactor<MyCommand>(commandProcessor, messageMapperRegistry,
null, new InMemoryRequestContextFactory(), _channel
)
{
Original file line number Diff line number Diff line change
@@ -42,7 +42,6 @@ public class MessagePumpUnacceptableMessageLimitTestsAsync
public MessagePumpUnacceptableMessageLimitTestsAsync()
{
SpyRequeueCommandProcessor commandProcessor = new();
var provider = new CommandProcessorProvider(commandProcessor);
var channel = new ChannelAsync(
new (Channel), _routingKey,
new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)),
@@ -53,7 +52,7 @@ public MessagePumpUnacceptableMessageLimitTestsAsync()
new SimpleMessageMapperFactoryAsync(_ => new FailingEventMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyFailingMapperEvent, FailingEventMessageMapperAsync>();

_messagePump = new Proactor<MyFailingMapperEvent>(provider, messageMapperRegistry, null, new InMemoryRequestContextFactory(), channel)
_messagePump = new Proactor<MyFailingMapperEvent>(commandProcessor, messageMapperRegistry, null, new InMemoryRequestContextFactory(), channel)
{
Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 3, UnacceptableMessageLimit = 3
};
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@ public class MessagePumpFailingMessageTranslationTestsAsync
public MessagePumpFailingMessageTranslationTestsAsync()
{
SpyRequeueCommandProcessor commandProcessor = new();
var provider = new CommandProcessorProvider(commandProcessor);
_channel = new ChannelAsync(
new(ChannelName), _routingKey,
new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000))
@@ -30,7 +29,7 @@ public MessagePumpFailingMessageTranslationTestsAsync()
new SimpleMessageMapperFactoryAsync(_ => new FailingEventMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyFailingMapperEvent, FailingEventMessageMapperAsync>();

_messagePump = new Proactor<MyFailingMapperEvent>(provider, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
_messagePump = new Proactor<MyFailingMapperEvent>(commandProcessor, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
{
Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 3, UnacceptableMessageLimit = 3
};
Original file line number Diff line number Diff line change
@@ -29,8 +29,6 @@ public MessagePumpDispatchAsyncTests()
handlerFactory,
new InMemoryRequestContextFactory(),
new PolicyRegistry());

var commandProcessorProvider = new CommandProcessorProvider(commandProcessor);

PipelineBuilder<MyEvent>.ClearPipelineCache();

@@ -40,7 +38,7 @@ public MessagePumpDispatchAsyncTests()
new SimpleMessageMapperFactoryAsync(_ => new MyEventMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyEvent, MyEventMessageMapperAsync>();

_messagePump = new Proactor<MyEvent>(commandProcessorProvider, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), channel)
_messagePump = new Proactor<MyEvent>(commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), channel)
{ Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000) };

var message = new Message(new MessageHeader(Guid.NewGuid().ToString(), _routingKey, MessageType.MT_EVENT), new MessageBody(JsonSerializer.Serialize(_myEvent)));
Original file line number Diff line number Diff line change
@@ -47,15 +47,14 @@ public class MessagePumpCommandRequeueCountThresholdTestsAsync
public MessagePumpCommandRequeueCountThresholdTestsAsync()
{
_commandProcessor = new SpyRequeueCommandProcessor();
var provider = new CommandProcessorProvider(_commandProcessor);
_channel = new ChannelAsync(new(Channel) ,_routingKey, new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)));

var messageMapperRegistry = new MessageMapperRegistry(
null,
new SimpleMessageMapperFactoryAsync(_ => new MyCommandMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyCommand, MyCommandMessageMapperAsync>();

_messagePump = new Proactor<MyCommand>(provider, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), _channel)
_messagePump = new Proactor<MyCommand>(_commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), _channel)
{ Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 3 };

var message1 = new Message(new MessageHeader(Guid.NewGuid().ToString(), _routingKey, MessageType.MT_COMMAND),
Original file line number Diff line number Diff line change
@@ -47,15 +47,14 @@ public class MessagePumpEventRequeueCountThresholdTestsAsync
public MessagePumpEventRequeueCountThresholdTestsAsync()
{
_commandProcessor = new SpyRequeueCommandProcessor();
var provider = new CommandProcessorProvider(_commandProcessor);
_channel = new ChannelAsync(new(Channel), _routingKey, new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)));

var messageMapperRegistry = new MessageMapperRegistry(
null,
new SimpleMessageMapperFactoryAsync(_ => new MyEventMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyEvent, MyEventMessageMapperAsync>();

_messagePump = new Proactor<MyEvent>(provider, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), _channel)
_messagePump = new Proactor<MyEvent>(_commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), _channel)
{ Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 3 };

var message1 = new Message(
Original file line number Diff line number Diff line change
@@ -47,15 +47,14 @@ public class MessagePumpCommandRequeueTestsAsync
public MessagePumpCommandRequeueTestsAsync()
{
_commandProcessor = new SpyRequeueCommandProcessor();
var provider = new CommandProcessorProvider(_commandProcessor);
ChannelAsync channel = new(new(Channel), _routingKey, new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)), 2);

var messageMapperRegistry = new MessageMapperRegistry(
null,
new SimpleMessageMapperFactoryAsync(_ => new MyCommandMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyCommand, MyCommandMessageMapperAsync>();

_messagePump = new Proactor<MyCommand>(provider, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), channel)
_messagePump = new Proactor<MyCommand>(_commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), channel)
{ Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = -1 };

var message1 = new Message(
Original file line number Diff line number Diff line change
@@ -46,7 +46,6 @@ public class MessagePumpEventRequeueTestsAsync
public MessagePumpEventRequeueTestsAsync()
{
_commandProcessor = new SpyRequeueCommandProcessor();
var provider = new CommandProcessorProvider(_commandProcessor);
ChannelAsync channel = new(
new(Channel), _routingKey,
new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)),
@@ -58,7 +57,7 @@ public MessagePumpEventRequeueTestsAsync()
new SimpleMessageMapperFactoryAsync(_ => new MyEventMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyEvent, MyEventMessageMapperAsync>();

_messagePump = new Proactor<MyEvent>(provider, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), channel)
_messagePump = new Proactor<MyEvent>(_commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), channel)
{ Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = -1 };

var message1 = new Message(
Original file line number Diff line number Diff line change
@@ -46,7 +46,6 @@ public class MessagePumpEventProcessingDeferMessageActionTestsAsync
public MessagePumpEventProcessingDeferMessageActionTestsAsync()
{
SpyRequeueCommandProcessor commandProcessor = new();
var commandProcessorProvider = new CommandProcessorProvider(commandProcessor);

_bus = new InternalBus();
_channel = new ChannelAsync(new (Channel), _routingKey, new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)));
@@ -56,7 +55,7 @@ public MessagePumpEventProcessingDeferMessageActionTestsAsync()
new SimpleMessageMapperFactoryAsync(_ => new MyEventMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyEvent, MyEventMessageMapperAsync>();

_messagePump = new Proactor<MyEvent>(commandProcessorProvider, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), _channel)
_messagePump = new Proactor<MyEvent>(commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), _channel)
{ Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = _requeueCount };

var msg = new TransformPipelineBuilderAsync(messageMapperRegistry, null)
Original file line number Diff line number Diff line change
@@ -48,7 +48,6 @@ public class MessagePumpEventProcessingExceptionTestsAsync
public MessagePumpEventProcessingExceptionTestsAsync()
{
SpyExceptionCommandProcessor commandProcessor = new();
var commandProcessorProvider = new CommandProcessorProvider(commandProcessor);

var bus = new InternalBus();

@@ -59,7 +58,7 @@ public MessagePumpEventProcessingExceptionTestsAsync()
new SimpleMessageMapperFactoryAsync(_ => new MyEventMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyEvent, MyEventMessageMapperAsync>();

_messagePump = new Proactor<MyEvent>(commandProcessorProvider, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
_messagePump = new Proactor<MyEvent>(commandProcessor, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
{
Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = _requeueCount
};
Original file line number Diff line number Diff line change
@@ -45,7 +45,6 @@ public class AsyncMessagePumpUnacceptableMessageTests
public AsyncMessagePumpUnacceptableMessageTests()
{
SpyRequeueCommandProcessor commandProcessor = new();
var provider = new CommandProcessorProvider(commandProcessor);

_bus = new InternalBus();

@@ -59,7 +58,7 @@ public AsyncMessagePumpUnacceptableMessageTests()
new SimpleMessageMapperFactoryAsync(_ => new MyEventMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyEvent, MyEventMessageMapperAsync>();

_messagePump = new Proactor<MyEvent>(provider, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
_messagePump = new Proactor<MyEvent>(commandProcessor, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
{
Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 3
};
Original file line number Diff line number Diff line change
@@ -42,7 +42,6 @@ public class MessagePumpUnacceptableMessageLimitBreachedAsyncTests
public MessagePumpUnacceptableMessageLimitBreachedAsyncTests()
{
SpyRequeueCommandProcessor commandProcessor = new();
var provider = new CommandProcessorProvider(commandProcessor);

var channel = new ChannelAsync(new("MyChannel"), _routingKey, new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)), 3);

@@ -51,7 +50,7 @@ public MessagePumpUnacceptableMessageLimitBreachedAsyncTests()
new SimpleMessageMapperFactoryAsync(_ => new MyEventMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyEvent, MyEventMessageMapperAsync>();

_messagePump = new Proactor<MyEvent>(provider, messageMapperRegistry, null, new InMemoryRequestContextFactory(), channel)
_messagePump = new Proactor<MyEvent>(commandProcessor, messageMapperRegistry, null, new InMemoryRequestContextFactory(), channel)
{
Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 3, UnacceptableMessageLimit = 3
};
Original file line number Diff line number Diff line change
@@ -46,7 +46,6 @@ public class MessagePumpToCommandProcessorTestsAsync
public MessagePumpToCommandProcessorTestsAsync()
{
_commandProcessor = new SpyCommandProcessor();
var provider = new CommandProcessorProvider(_commandProcessor);
ChannelAsync channel = new(
new(Channel), _routingKey,
new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000))
@@ -56,7 +55,7 @@ public MessagePumpToCommandProcessorTestsAsync()
new SimpleMessageMapperFactoryAsync(_ => new MyEventMessageMapperAsync()));
messagerMapperRegistry.RegisterAsync<MyEvent, MyEventMessageMapperAsync>();

_messagePump = new Proactor<MyEvent>(provider, messagerMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), channel)
_messagePump = new Proactor<MyEvent>(_commandProcessor, messagerMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), channel)
{ Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000) };

_event = new MyEvent();
Original file line number Diff line number Diff line change
@@ -45,7 +45,6 @@ public class PerformerCanStopTestsAsync
public PerformerCanStopTestsAsync()
{
SpyCommandProcessor commandProcessor = new();
var provider = new CommandProcessorProvider(commandProcessor);
ChannelAsync channel = new(
new(Channel), _routingKey,
new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000))
@@ -56,7 +55,7 @@ public PerformerCanStopTestsAsync()
new SimpleMessageMapperFactoryAsync(_ => new MyEventMessageMapperAsync()));
messageMapperRegistry.RegisterAsync<MyEvent, MyEventMessageMapperAsync>();

var messagePump = new Proactor<MyEvent>(provider, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), channel);
var messagePump = new Proactor<MyEvent>(commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), channel);
messagePump.Channel = channel;
messagePump.TimeOut = TimeSpan.FromMilliseconds(5000);

Original file line number Diff line number Diff line change
@@ -46,7 +46,6 @@ public class MessagePumpRetryCommandOnConnectionFailureTests
public MessagePumpRetryCommandOnConnectionFailureTests()
{
_commandProcessor = new SpyCommandProcessor();
var provider = new CommandProcessorProvider(_commandProcessor);
var channel = new FailingChannel(
new ChannelName(ChannelName), _routingKey,
new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)),
@@ -58,7 +57,7 @@ public MessagePumpRetryCommandOnConnectionFailureTests()
new SimpleMessageMapperFactory(_ => new MyCommandMessageMapper()),
null);
messageMapperRegistry.Register<MyCommand, MyCommandMessageMapper>();
_messagePump = new Reactor<MyCommand>(provider, messageMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), channel)
_messagePump = new Reactor<MyCommand>(_commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), channel)
{
Channel = channel, TimeOut = TimeSpan.FromMilliseconds(500), RequeueCount = -1
};
Original file line number Diff line number Diff line change
@@ -45,7 +45,6 @@ public class MessagePumpRetryEventConnectionFailureTests
public MessagePumpRetryEventConnectionFailureTests()
{
_commandProcessor = new SpyCommandProcessor();
var provider = new CommandProcessorProvider(_commandProcessor);
var channel = new FailingChannel(
new ChannelName("myChannel"), _routingKey,
new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)),
@@ -59,7 +58,7 @@ public MessagePumpRetryEventConnectionFailureTests()
null);
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

_messagePump = new Reactor<MyEvent>(provider, messageMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), channel)
_messagePump = new Reactor<MyEvent>(_commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), channel)
{
Channel = channel, TimeOut = TimeSpan.FromMilliseconds(500), RequeueCount = -1
};
Original file line number Diff line number Diff line change
@@ -44,7 +44,6 @@ public class MessagePumpCommandProcessingDeferMessageActionTests
public MessagePumpCommandProcessingDeferMessageActionTests()
{
SpyRequeueCommandProcessor commandProcessor = new();
var provider = new CommandProcessorProvider(commandProcessor);

_channel = new Channel(new("myChannel"), _routingKey, new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)));

@@ -53,7 +52,7 @@ public MessagePumpCommandProcessingDeferMessageActionTests()
null);
messageMapperRegistry.Register<MyCommand, MyCommandMessageMapper>();

_messagePump = new Reactor<MyCommand>(provider, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
_messagePump = new Reactor<MyCommand>(commandProcessor, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
{
Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = _requeueCount
};
Original file line number Diff line number Diff line change
@@ -46,7 +46,6 @@ public class MessagePumpCommandProcessingExceptionTests
public MessagePumpCommandProcessingExceptionTests()
{
SpyExceptionCommandProcessor commandProcessor = new();
var provider = new CommandProcessorProvider(commandProcessor);

InternalBus bus = new();
_channel = new Channel(new("myChannel"),_routingKey, new InMemoryMessageConsumer(_routingKey, bus, _timeProvider, TimeSpan.FromMilliseconds(1000)));
@@ -56,7 +55,7 @@ public MessagePumpCommandProcessingExceptionTests()
null);
messageMapperRegistry.Register<MyCommand, MyCommandMessageMapper>();

_messagePump = new Reactor<MyCommand>(provider, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
_messagePump = new Reactor<MyCommand>(commandProcessor, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
{
Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = _requeueCount
};
Original file line number Diff line number Diff line change
@@ -20,15 +20,14 @@ public class MessagePumpFailingMessageTranslationTests
public MessagePumpFailingMessageTranslationTests()
{
SpyRequeueCommandProcessor commandProcessor = new();
var provider = new CommandProcessorProvider(commandProcessor);
_channel = new Channel(new(Channel), _routingKey, new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)));
var messageMapperRegistry = new MessageMapperRegistry(
new SimpleMessageMapperFactory(_ => new FailingEventMessageMapper()),
null);
messageMapperRegistry.Register<MyFailingMapperEvent, FailingEventMessageMapper>();
var messageTransformerFactory = new SimpleMessageTransformerFactory(_ => throw new NotImplementedException());

_messagePump = new Reactor<MyFailingMapperEvent>(provider, messageMapperRegistry, messageTransformerFactory, new InMemoryRequestContextFactory(), _channel)
_messagePump = new Reactor<MyFailingMapperEvent>(commandProcessor, messageMapperRegistry, messageTransformerFactory, new InMemoryRequestContextFactory(), _channel)
{
Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 3, UnacceptableMessageLimit = 3
};
Original file line number Diff line number Diff line change
@@ -42,15 +42,14 @@ public class MessagePumpUnacceptableMessageLimitTests
public MessagePumpUnacceptableMessageLimitTests()
{
SpyRequeueCommandProcessor commandProcessor = new();
var provider = new CommandProcessorProvider(commandProcessor);
_timeProvider = new FakeTimeProvider();
Channel channel = new(new (Channel), _routingKey, new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)));
var messageMapperRegistry = new MessageMapperRegistry(
new SimpleMessageMapperFactory(_ => new FailingEventMessageMapper()),
null);
messageMapperRegistry.Register<MyFailingMapperEvent, FailingEventMessageMapper>();

_messagePump = new Reactor<MyFailingMapperEvent>(provider, messageMapperRegistry, null, new InMemoryRequestContextFactory(), channel)
_messagePump = new Reactor<MyFailingMapperEvent>(commandProcessor, messageMapperRegistry, null, new InMemoryRequestContextFactory(), channel)
{
Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 3, UnacceptableMessageLimit = 3
};
Original file line number Diff line number Diff line change
@@ -55,8 +55,6 @@ public MessagePumpDispatchTests()
handlerFactory,
new InMemoryRequestContextFactory(),
new PolicyRegistry());

var provider = new CommandProcessorProvider(commandProcessor);

PipelineBuilder<MyEvent>.ClearPipelineCache();

@@ -70,7 +68,7 @@ public MessagePumpDispatchTests()
null);
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

_messagePump = new Reactor<MyEvent>(provider, messageMapperRegistry, null, new InMemoryRequestContextFactory(), channel)
_messagePump = new Reactor<MyEvent>(commandProcessor, messageMapperRegistry, null, new InMemoryRequestContextFactory(), channel)
{
Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000)
};
Original file line number Diff line number Diff line change
@@ -47,13 +47,12 @@ public class MessagePumpCommandRequeueCountThresholdTests
public MessagePumpCommandRequeueCountThresholdTests()
{
_commandProcessor = new SpyRequeueCommandProcessor();
var provider = new CommandProcessorProvider(_commandProcessor);
_channel = new Channel(new(Channel) ,_routingKey, new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)));
var messageMapperRegistry = new MessageMapperRegistry(
new SimpleMessageMapperFactory(_ => new MyCommandMessageMapper()),
null);
messageMapperRegistry.Register<MyCommand, MyCommandMessageMapper>();
_messagePump = new Reactor<MyCommand>(provider, messageMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), _channel)
_messagePump = new Reactor<MyCommand>(_commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), _channel)
{ Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 3 };

var message1 = new Message(new MessageHeader(Guid.NewGuid().ToString(), _routingKey, MessageType.MT_COMMAND),
Original file line number Diff line number Diff line change
@@ -47,14 +47,13 @@ public class MessagePumpEventRequeueCountThresholdTests
public MessagePumpEventRequeueCountThresholdTests()
{
_commandProcessor = new SpyRequeueCommandProcessor();
var provider = new CommandProcessorProvider(_commandProcessor);
_channel = new Channel(new(Channel), _routingKey, new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)));
var messageMapperRegistry = new MessageMapperRegistry(
new SimpleMessageMapperFactory(_ => new MyEventMessageMapper()),
null);
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

_messagePump = new Reactor<MyEvent>(provider, messageMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), _channel)
_messagePump = new Reactor<MyEvent>(_commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), _channel)
{ Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 3 };

var message1 = new Message(
Original file line number Diff line number Diff line change
@@ -47,13 +47,12 @@ public class MessagePumpCommandRequeueTests
public MessagePumpCommandRequeueTests()
{
_commandProcessor = new SpyRequeueCommandProcessor();
var provider = new CommandProcessorProvider(_commandProcessor);
Channel channel = new(new(Channel), _routingKey, new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)), 2);
var messageMapperRegistry = new MessageMapperRegistry(
new SimpleMessageMapperFactory(_ => new MyCommandMessageMapper()),
null);
messageMapperRegistry.Register<MyCommand, MyCommandMessageMapper>();
_messagePump = new Reactor<MyCommand>(provider, messageMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), channel)
_messagePump = new Reactor<MyCommand>(_commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), channel)
{ Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = -1 };

var message1 = new Message(
Original file line number Diff line number Diff line change
@@ -46,7 +46,6 @@ public class MessagePumpEventRequeueTests
public MessagePumpEventRequeueTests()
{
_commandProcessor = new SpyRequeueCommandProcessor();
var provider = new CommandProcessorProvider(_commandProcessor);
Channel channel = new(
new(Channel), _routingKey,
new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000)),
@@ -57,7 +56,7 @@ public MessagePumpEventRequeueTests()
null);
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

_messagePump = new Reactor<MyEvent>(provider, messageMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), channel)
_messagePump = new Reactor<MyEvent>(_commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), channel)
{ Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = -1 };

var message1 = new Message(
Original file line number Diff line number Diff line change
@@ -48,7 +48,6 @@ public MessagePumpEventProcessingDeferMessageActionTests()
_routingKey = new RoutingKey(Topic);

SpyRequeueCommandProcessor commandProcessor = new();
var provider = new CommandProcessorProvider(commandProcessor);

_bus = new InternalBus();

@@ -62,7 +61,7 @@ public MessagePumpEventProcessingDeferMessageActionTests()
null);
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

_messagePump = new Reactor<MyEvent>(provider, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
_messagePump = new Reactor<MyEvent>(commandProcessor, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
{
Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = _requeueCount
};
Original file line number Diff line number Diff line change
@@ -49,7 +49,6 @@ public class MessagePumpEventProcessingExceptionTests
public MessagePumpEventProcessingExceptionTests()
{
SpyExceptionCommandProcessor commandProcessor = new();
var provider = new CommandProcessorProvider(commandProcessor);

_bus = new InternalBus();

@@ -63,7 +62,7 @@ public MessagePumpEventProcessingExceptionTests()
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

var requestContextFactory = new InMemoryRequestContextFactory();
_messagePump = new Reactor<MyEvent>(provider, messageMapperRegistry, null, requestContextFactory, _channel)
_messagePump = new Reactor<MyEvent>(commandProcessor, messageMapperRegistry, null, requestContextFactory, _channel)
{
Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = _requeueCount
};
Original file line number Diff line number Diff line change
@@ -45,7 +45,6 @@ public class MessagePumpUnacceptableMessageTests
public MessagePumpUnacceptableMessageTests()
{
SpyRequeueCommandProcessor commandProcessor = new();
var provider = new CommandProcessorProvider(commandProcessor);

_bus = new InternalBus();

@@ -56,7 +55,7 @@ public MessagePumpUnacceptableMessageTests()
null);
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

_messagePump = new Reactor<MyEvent>(provider, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
_messagePump = new Reactor<MyEvent>(commandProcessor, messageMapperRegistry, null, new InMemoryRequestContextFactory(), _channel)
{
Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 3
};
Original file line number Diff line number Diff line change
@@ -44,7 +44,6 @@ public class MessagePumpUnacceptableMessageLimitBreachedTests
public MessagePumpUnacceptableMessageLimitBreachedTests()
{
SpyRequeueCommandProcessor commandProcessor = new();
var provider = new CommandProcessorProvider(commandProcessor);

_bus = new InternalBus();

@@ -54,7 +53,7 @@ public MessagePumpUnacceptableMessageLimitBreachedTests()
null);
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

_messagePump = new Reactor<MyEvent>(provider, messageMapperRegistry, null, new InMemoryRequestContextFactory(), channel)
_messagePump = new Reactor<MyEvent>(commandProcessor, messageMapperRegistry, null, new InMemoryRequestContextFactory(), channel)
{
Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 3, UnacceptableMessageLimit = 3
};
Original file line number Diff line number Diff line change
@@ -46,7 +46,6 @@ public class MessagePumpToCommandProcessorTests
public MessagePumpToCommandProcessorTests()
{
_commandProcessor = new SpyCommandProcessor();
var provider = new CommandProcessorProvider(_commandProcessor);
Channel channel = new(
new(Channel), _routingKey,
new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000))
@@ -57,7 +56,7 @@ public MessagePumpToCommandProcessorTests()
null);
messagerMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

_messagePump = new Reactor<MyEvent>(provider, messagerMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), channel)
_messagePump = new Reactor<MyEvent>(_commandProcessor, messagerMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), channel)
{ Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000) };

_event = new MyEvent();
Original file line number Diff line number Diff line change
@@ -46,7 +46,6 @@ public class PerformerCanStopTests
public PerformerCanStopTests()
{
SpyCommandProcessor commandProcessor = new();
var provider = new CommandProcessorProvider(commandProcessor);
Channel channel = new(
new(Channel), _routingKey,
new InMemoryMessageConsumer(_routingKey, _bus, _timeProvider, TimeSpan.FromMilliseconds(1000))
@@ -57,7 +56,7 @@ public PerformerCanStopTests()
null);
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

var messagePump = new Reactor<MyEvent>(provider, messageMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), channel);
var messagePump = new Reactor<MyEvent>(commandProcessor, messageMapperRegistry, new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), channel);
messagePump.Channel = channel;
messagePump.TimeOut = TimeSpan.FromMilliseconds(5000);

Original file line number Diff line number Diff line change
@@ -82,8 +82,6 @@ public MessagePumpDispatchObservabilityTests()
new PolicyRegistry(),
tracer: tracer,
instrumentationOptions: instrumentationOptions);

var provider = new CommandProcessorProvider(commandProcessor);

PipelineBuilder<MyEvent>.ClearPipelineCache();

@@ -98,7 +96,7 @@ public MessagePumpDispatchObservabilityTests()
null);
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

_messagePump = new Reactor<MyEvent>(provider, messageMapperRegistry, null,
_messagePump = new Reactor<MyEvent>(commandProcessor, messageMapperRegistry, null,
new InMemoryRequestContextFactory(), channel, tracer, instrumentationOptions)
{
Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000)
Original file line number Diff line number Diff line change
@@ -57,8 +57,6 @@ public MessagePumpEmptyQueueOberservabilityTests()
new PolicyRegistry(),
tracer: tracer,
instrumentationOptions: instrumentationOptions);

var provider = new CommandProcessorProvider(commandProcessor);

PipelineBuilder<MyEvent>.ClearPipelineCache();

@@ -69,7 +67,7 @@ public MessagePumpEmptyQueueOberservabilityTests()
null);
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

_messagePump = new Reactor<MyEvent>(provider, messageMapperRegistry, null,
_messagePump = new Reactor<MyEvent>(commandProcessor, messageMapperRegistry, null,
new InMemoryRequestContextFactory(), channel, tracer, instrumentationOptions)
{
Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000), EmptyChannelDelay = TimeSpan.FromMilliseconds(1000)
Original file line number Diff line number Diff line change
@@ -59,8 +59,6 @@ public MessagePumpBrokenCircuitChannelFailureOberservabilityTests()
new PolicyRegistry(),
tracer: tracer,
instrumentationOptions: instrumentationOptions);

var provider = new CommandProcessorProvider(commandProcessor);

PipelineBuilder<MyEvent>.ClearPipelineCache();

@@ -76,7 +74,7 @@ public MessagePumpBrokenCircuitChannelFailureOberservabilityTests()
null);
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

_messagePump = new Reactor<MyEvent>(provider, messageMapperRegistry, null,
_messagePump = new Reactor<MyEvent>(commandProcessor, messageMapperRegistry, null,
new InMemoryRequestContextFactory(), channel, tracer, instrumentationOptions)
{
Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000), EmptyChannelDelay = TimeSpan.FromMilliseconds(1000)
Original file line number Diff line number Diff line change
@@ -59,8 +59,6 @@ public MessagePumpChannelFailureOberservabilityTests()
new PolicyRegistry(),
tracer: tracer,
instrumentationOptions: instrumentationOptions);

var provider = new CommandProcessorProvider(commandProcessor);

PipelineBuilder<MyEvent>.ClearPipelineCache();

@@ -75,7 +73,7 @@ public MessagePumpChannelFailureOberservabilityTests()
null);
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

_messagePump = new Reactor<MyEvent>(provider, messageMapperRegistry, null,
_messagePump = new Reactor<MyEvent>(commandProcessor, messageMapperRegistry, null,
new InMemoryRequestContextFactory(), channel, tracer, instrumentationOptions)
{
Channel = channel, TimeOut = TimeSpan.FromMilliseconds(5000), EmptyChannelDelay = TimeSpan.FromMilliseconds(1000)
Original file line number Diff line number Diff line change
@@ -56,8 +56,6 @@ public MessagePumpQuitOberservabilityTests()
new PolicyRegistry(),
tracer: tracer,
instrumentationOptions: instrumentationOptions);

var provider = new CommandProcessorProvider(commandProcessor);

PipelineBuilder<MyEvent>.ClearPipelineCache();

@@ -71,7 +69,7 @@ public MessagePumpQuitOberservabilityTests()
null);
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

_messagePump = new Reactor<MyEvent>(provider, messageMapperRegistry, null,
_messagePump = new Reactor<MyEvent>(commandProcessor, messageMapperRegistry, null,
new InMemoryRequestContextFactory(), channel, tracer, instrumentationOptions)
{
Channel = channel, TimeOut= TimeSpan.FromMilliseconds(5000), EmptyChannelDelay = TimeSpan.FromMilliseconds(1000)
Original file line number Diff line number Diff line change
@@ -58,8 +58,6 @@ public MessagePumpUnacceptableMessageOberservabilityTests()
new PolicyRegistry(),
tracer: tracer,
instrumentationOptions: instrumentationOptions);

var provider = new CommandProcessorProvider(commandProcessor);

PipelineBuilder<MyEvent>.ClearPipelineCache();

@@ -70,7 +68,7 @@ public MessagePumpUnacceptableMessageOberservabilityTests()
null);
messageMapperRegistry.Register<MyEvent, MyEventMessageMapper>();

_messagePump = new Reactor<MyEvent>(provider, messageMapperRegistry, null,
_messagePump = new Reactor<MyEvent>(commandProcessor, messageMapperRegistry, null,
new InMemoryRequestContextFactory(), _channel, tracer, instrumentationOptions)
{
Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), EmptyChannelDelay = TimeSpan.FromMilliseconds(1000)
Original file line number Diff line number Diff line change
@@ -65,8 +65,7 @@ public DispatchBuilderTests()
.Build();

_builder = DispatchBuilder.StartNew()
.CommandProcessorFactory(() =>
new CommandProcessorProvider(commandProcessor),
.CommandProcessor(commandProcessor,
new InMemoryRequestContextFactory()
)
.MessageMappers(messageMapperRegistry, null, null, null)
Original file line number Diff line number Diff line change
@@ -65,8 +65,7 @@ public DispatchBuilderTestsAsync()
.Build();

_builder = DispatchBuilder.StartNew()
.CommandProcessorFactory(() =>
new CommandProcessorProvider(commandProcessor),
.CommandProcessor(commandProcessor,
new InMemoryRequestContextFactory()
)
.MessageMappers(null, messageMapperRegistry, null, new EmptyMessageTransformerFactoryAsync())
Original file line number Diff line number Diff line change
@@ -60,8 +60,7 @@ public DispatchBuilderWithNamedGateway()
.Build();

_builder = DispatchBuilder.StartNew()
.CommandProcessorFactory(() =>
new CommandProcessorProvider(commandProcessor),
.CommandProcessor(commandProcessor,
new InMemoryRequestContextFactory()
)
.MessageMappers(messageMapperRegistry, null, new EmptyMessageTransformerFactory(), null)
Original file line number Diff line number Diff line change
@@ -61,8 +61,7 @@ public DispatchBuilderWithNamedGatewayAsync()
.Build();

_builder = DispatchBuilder.StartNew()
.CommandProcessorFactory(() =>
new CommandProcessorProvider(commandProcessor),
.CommandProcessor(commandProcessor,
new InMemoryRequestContextFactory()
)
.MessageMappers(messageMapperRegistry, null, null, null)
Original file line number Diff line number Diff line change
@@ -89,7 +89,6 @@ public RMQMessageConsumerRetryDLQTests()
requestContextFactory: new InMemoryRequestContextFactory(),
policyRegistry: new PolicyRegistry()
);
var provider = new CommandProcessorProvider(commandProcessor);

//pump messages from a channel to a handler - in essence we are building our own dispatcher in this test
var messageMapperRegistry = new MessageMapperRegistry(
@@ -99,7 +98,7 @@ public RMQMessageConsumerRetryDLQTests()

messageMapperRegistry.Register<MyDeferredCommand, MyDeferredCommandMessageMapper>();

_messagePump = new Reactor<MyDeferredCommand>(provider, messageMapperRegistry,
_messagePump = new Reactor<MyDeferredCommand>(commandProcessor, messageMapperRegistry,
new EmptyMessageTransformerFactory(), new InMemoryRequestContextFactory(), _channel)
{
Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 0
Original file line number Diff line number Diff line change
@@ -88,7 +88,6 @@ public RMQMessageConsumerRetryDLQTestsAsync()
requestContextFactory: new InMemoryRequestContextFactory(),
policyRegistry: new PolicyRegistry()
);
var provider = new CommandProcessorProvider(commandProcessor);

//pump messages from a channel to a handler - in essence we are building our own dispatcher in this test
var messageMapperRegistry = new MessageMapperRegistry(
@@ -98,7 +97,7 @@ public RMQMessageConsumerRetryDLQTestsAsync()

messageMapperRegistry.RegisterAsync<MyDeferredCommand, MyDeferredCommandMessageMapperAsync>();

_messagePump = new Proactor<MyDeferredCommand>(provider, messageMapperRegistry,
_messagePump = new Proactor<MyDeferredCommand>(commandProcessor, messageMapperRegistry,
new EmptyMessageTransformerFactoryAsync(), new InMemoryRequestContextFactory(), _channel)
{
Channel = _channel, TimeOut = TimeSpan.FromMilliseconds(5000), RequeueCount = 3
Original file line number Diff line number Diff line change
@@ -4,10 +4,10 @@ namespace Paramore.Brighter.RMQ.Tests.TestDoubles;

internal class QuickHandlerFactory(Func<IHandleRequests> handlerAction) : IAmAHandlerFactorySync
{
public IHandleRequests Create(Type handlerType)
public IHandleRequests Create(Type handlerType, IAmALifetime lifetime)
{
return handlerAction();
}

public void Release(IHandleRequests handler) { }
public void Release(IHandleRequests handler, IAmALifetime lifetime) { }
}
Original file line number Diff line number Diff line change
@@ -4,10 +4,10 @@ namespace Paramore.Brighter.RMQ.Tests.TestDoubles;

internal class QuickHandlerFactoryAsync(Func<IHandleRequestsAsync> handlerAction) : IAmAHandlerFactoryAsync
{
public IHandleRequestsAsync Create(Type handlerType)
public IHandleRequestsAsync Create(Type handlerType, IAmALifetime lifetime)
{
return handlerAction();
}

public void Release(IHandleRequestsAsync handler) { }
public void Release(IHandleRequestsAsync handler, IAmALifetime lifetime) { }
}

Unchanged files with check annotations Beta

public class MyEventMessageMapper : IAmAMessageMapper<MyEvent>
{
public IRequestContext Context { get; set; }

Check warning on line 8 in tests/Paramore.Brighter.InMemory.Tests/TestDoubles/MyEventMessageMapper.cs

GitHub Actions / build

Nullability of reference types in type of parameter 'value' of 'void MyEventMessageMapper.Context.set' doesn't match implicitly implemented member 'void IAmAMessageMapper<MyEvent>.Context.set' (possibly because of nullability attributes).
public Message MapToMessage(MyEvent request, Publication publication)
{
var header = new MessageHeader(messageId: request.Id, topic: publication.Topic, source: publication.Source,
public class MyEventMessageMapperAsync : IAmAMessageMapperAsync<MyEvent>
{
public IRequestContext Context { get; set; }

Check warning on line 10 in tests/Paramore.Brighter.InMemory.Tests/TestDoubles/MyEventMessageMapperAsync.cs

GitHub Actions / build

Nullability of reference types in type of parameter 'value' of 'void MyEventMessageMapperAsync.Context.set' doesn't match implicitly implemented member 'void IAmAMessageMapperAsync<MyEvent>.Context.set' (possibly because of nullability attributes).

Check warning on line 10 in tests/Paramore.Brighter.InMemory.Tests/TestDoubles/MyEventMessageMapperAsync.cs

GitHub Actions / build

Non-nullable property 'Context' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.
public Task<Message> MapToMessageAsync(MyEvent request, Publication publication, CancellationToken cancellationToken = default)
{
var header = new MessageHeader(messageId: request.Id, topic: publication.Topic, messageType: request.RequestToMessageType(),

Check warning on line 13 in tests/Paramore.Brighter.InMemory.Tests/TestDoubles/MyEventMessageMapperAsync.cs

GitHub Actions / build

Possible null reference argument for parameter 'topic' in 'MessageHeader.MessageHeader(string messageId, RoutingKey topic, MessageType messageType, Uri? source = null, string? type = null, DateTimeOffset? timeStamp = null, string? correlationId = null, RoutingKey? replyTo = null, string contentType = "text/plain", string partitionKey = "", Uri? dataSchema = null, string? subject = null, int handledCount = 0, TimeSpan? delayed = null)'.
source: publication.Source, type: publication.Type, subject: publication.Subject);
var body = new MessageBody(JsonSerializer.Serialize(request, JsonSerialisationOptions.Options));
var message = new Message(header, body);
public Task<MyEvent> MapToRequestAsync(Message message, CancellationToken cancellationToken = default)
{
return Task.FromResult(JsonSerializer.Deserialize<MyEvent>(message.Body.Value, JsonSerialisationOptions.Options));

Check warning on line 22 in tests/Paramore.Brighter.InMemory.Tests/TestDoubles/MyEventMessageMapperAsync.cs

GitHub Actions / build

Nullability of reference types in value of type 'Task<MyEvent?>' doesn't match target type 'Task<MyEvent>'.
}
}
{
public static Exception Exception(Action action)
{
Exception exception = null;

Check warning on line 37 in tests/Paramore.Brighter.InMemory.Tests/Catch.cs

GitHub Actions / build

Converting null literal or possible null value to non-nullable type.
try
{
exception = e;
}
return exception;

Check warning on line 48 in tests/Paramore.Brighter.InMemory.Tests/Catch.cs

GitHub Actions / build

Possible null reference return.
}
public static async Task<Exception> ExceptionAsync(Func<Task> action)
{
Exception exception = null;

Check warning on line 52 in tests/Paramore.Brighter.InMemory.Tests/Catch.cs

GitHub Actions / build

Converting null literal or possible null value to non-nullable type.
try
{
{
public class MessageSpecification
{
public MessageHeader Header { get; set; }

Check warning on line 7 in tests/Paramore.Brighter.InMemory.Tests/Builders/MessageTestDataBuilder.cs

GitHub Actions / build

Non-nullable property 'Header' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.
public MessageBody Body { get; set; }

Check warning on line 8 in tests/Paramore.Brighter.InMemory.Tests/Builders/MessageTestDataBuilder.cs

GitHub Actions / build

Non-nullable property 'Body' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.
}
public class MessageTestDataBuilder