Skip to content

Commit

Permalink
fix: incorrect number of operationstartedcalls; make implementation t…
Browse files Browse the repository at this point in the history
…o AsyncEx closer for error trapping (#3452)
  • Loading branch information
iancooper authored Jan 2, 2025
1 parent d91839e commit 32c0605
Show file tree
Hide file tree
Showing 22 changed files with 383 additions and 312 deletions.
2 changes: 1 addition & 1 deletion src/Paramore.Brighter.DynamoDb/DynamoDbUnitOfWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void Close()
/// Commit a transaction, performing all associated write actions
/// Will block thread and use second thread for callback
/// </summary>
public void Commit() => BrighterSynchronizationHelper.Run(async () => await DynamoDb.TransactWriteItemsAsync(_tx));
public void Commit() => BrighterAsyncContext.Run(async () => await DynamoDb.TransactWriteItemsAsync(_tx));

/// <summary>
/// Commit a transaction, performing all associated write actions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public ChannelFactory(AWSMessagingGatewayConnection awsConnection)
/// <param name="subscription">An SqsSubscription, the subscription parameter to create the channel with.</param>
/// <returns>An instance of <see cref="IAmAChannelSync"/>.</returns>
/// <exception cref="ConfigurationException">Thrown when the subscription is not an SqsSubscription.</exception>
public IAmAChannelSync CreateSyncChannel(Subscription subscription) => BrighterSynchronizationHelper.Run(async () => await CreateSyncChannelAsync(subscription));
public IAmAChannelSync CreateSyncChannel(Subscription subscription) => BrighterAsyncContext.Run(async () => await CreateSyncChannelAsync(subscription));

/// <summary>
/// Creates the input channel.
Expand All @@ -88,7 +88,7 @@ public ChannelFactory(AWSMessagingGatewayConnection awsConnection)
/// <param name="subscription">An SqsSubscription, the subscription parameter to create the channel with.</param>
/// <returns>An instance of <see cref="IAmAChannelAsync"/>.</returns>
/// <exception cref="ConfigurationException">Thrown when the subscription is not an SqsSubscription.</exception>
public IAmAChannelAsync CreateAsyncChannel(Subscription subscription) => BrighterSynchronizationHelper.Run(async () => await CreateAsyncChannelAsync(subscription));
public IAmAChannelAsync CreateAsyncChannel(Subscription subscription) => BrighterAsyncContext.Run(async () => await CreateAsyncChannelAsync(subscription));

/// <summary>
/// Creates the input channel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public SqsMessageConsumer(AWSMessagingGatewayConnection awsConnection,
/// Sync over Async
/// </summary>
/// <param name="message">The message.</param>
public void Acknowledge(Message message) => BrighterSynchronizationHelper.Run(async () => await AcknowledgeAsync(message));
public void Acknowledge(Message message) => BrighterAsyncContext.Run(async () => await AcknowledgeAsync(message));

/// <summary>
/// Acknowledges the specified message.
Expand Down Expand Up @@ -112,7 +112,7 @@ public SqsMessageConsumer(AWSMessagingGatewayConnection awsConnection,
/// Sync over async
/// </summary>
/// <param name="message">The message.</param>
public void Reject(Message message) => BrighterSynchronizationHelper.Run(async () => await RejectAsync(message));
public void Reject(Message message) => BrighterAsyncContext.Run(async () => await RejectAsync(message));

/// <summary>
/// Rejects the specified message.
Expand Down Expand Up @@ -158,7 +158,7 @@ await client.ChangeMessageVisibilityAsync(
/// Purges the specified queue name.
/// Sync over Async
/// </summary>
public void Purge() => BrighterSynchronizationHelper.Run(async () => await PurgeAsync());
public void Purge() => BrighterAsyncContext.Run(async () => await PurgeAsync());

/// <summary>
/// Purges the specified queue name.
Expand Down Expand Up @@ -187,7 +187,7 @@ await client.ChangeMessageVisibilityAsync(
/// Sync over async
/// </summary>
/// <param name="timeOut">The timeout. AWS uses whole seconds. Anything greater than 0 uses long-polling. </param>
public Message[] Receive(TimeSpan? timeOut = null) => BrighterSynchronizationHelper.Run(async () => await ReceiveAsync(timeOut));
public Message[] Receive(TimeSpan? timeOut = null) => BrighterAsyncContext.Run(async () => await ReceiveAsync(timeOut));

/// <summary>
/// Receives the specified queue name.
Expand Down Expand Up @@ -255,7 +255,7 @@ await client.ChangeMessageVisibilityAsync(
}


public bool Requeue(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(async () => await RequeueAsync(message, delay));
public bool Requeue(Message message, TimeSpan? delay = null) => BrighterAsyncContext.Run(async () => await RequeueAsync(message, delay));

/// <summary>
/// Re-queues the specified message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public ValueTask DisposeAsync()
return new ValueTask(Task.CompletedTask);
}

public bool ConfirmTopicExists(string? topic = null) => BrighterSynchronizationHelper.Run(async () => await ConfirmTopicExistsAsync(topic));
public bool ConfirmTopicExists(string? topic = null) => BrighterAsyncContext.Run(async () => await ConfirmTopicExistsAsync(topic));

public async Task<bool> ConfirmTopicExistsAsync(string? topic = null, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -132,7 +132,7 @@ public async Task SendAsync(Message message, CancellationToken cancellationToken
/// Sync over Async
/// </summary>
/// <param name="message">The message.</param>
public void Send(Message message) => BrighterSynchronizationHelper.Run(async () => await SendAsync(message));
public void Send(Message message) => BrighterAsyncContext.Run(async () => await SendAsync(message));

/// <summary>
/// Sends the specified message, with a delay.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public async ValueTask DisposeAsync()
/// Acknowledges the specified message.
/// </summary>
/// <param name="message">The message.</param>
public void Acknowledge(Message message) => BrighterSynchronizationHelper.Run(async() => await AcknowledgeAsync(message));
public void Acknowledge(Message message) => BrighterAsyncContext.Run(async() => await AcknowledgeAsync(message));

/// <summary>
/// Acknowledges the specified message.
Expand Down Expand Up @@ -157,7 +157,7 @@ public async ValueTask DisposeAsync()
/// </summary>
/// <param name="timeOut">The timeout for a message being available. Defaults to 300ms.</param>
/// <returns>Message.</returns>
public Message[] Receive(TimeSpan? timeOut = null) => BrighterSynchronizationHelper.Run(async () => await ReceiveAsync(timeOut));
public Message[] Receive(TimeSpan? timeOut = null) => BrighterAsyncContext.Run(async () => await ReceiveAsync(timeOut));

/// <summary>
/// Receives the specified queue name.
Expand Down Expand Up @@ -230,7 +230,7 @@ public async ValueTask DisposeAsync()
/// Sync over Async
/// </summary>
/// <param name="message">The message.</param>
public void Reject(Message message) => BrighterSynchronizationHelper.Run(async () => await RejectAsync(message));
public void Reject(Message message) => BrighterAsyncContext.Run(async () => await RejectAsync(message));

/// <summary>
/// Rejects the specified message.
Expand Down Expand Up @@ -268,7 +268,7 @@ public async ValueTask DisposeAsync()
/// <param name="message"></param>
/// <param name="delay">Delay to the delivery of the message. 0 is no delay. Defaults to 0.</param>
/// <returns>True if the message should be acked, false otherwise</returns>
public bool Requeue(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(async () => await RequeueAsync(message, delay));
public bool Requeue(Message message, TimeSpan? delay = null) => BrighterAsyncContext.Run(async () => await RequeueAsync(message, delay));

/// <summary>
/// Requeues the specified message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ [EnumeratorCancellation] CancellationToken cancellationToken
/// </summary>
/// <param name="message">The message.</param>
/// <param name="delay">Delay to delivery of the message.</param>
public void SendWithDelay(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(async () => await SendWithDelayAsync(message, delay));
public void SendWithDelay(Message message, TimeSpan? delay = null) => BrighterAsyncContext.Run(async () => await SendWithDelayAsync(message, delay));

/// <summary>
/// Send the specified message with specified delay
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected override async Task GetMessageReceiverProviderAsync()
/// <summary>
/// Purges the specified queue name.
/// </summary>
public override void Purge() => BrighterSynchronizationHelper.Run(async () => await PurgeAsync());
public override void Purge() => BrighterAsyncContext.Run(async () => await PurgeAsync());

/// <summary>
/// Purges the specified queue name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public AzureServiceBusTopicConsumer(
/// <summary>
/// Purges the specified queue name.
/// </summary>
public override void Purge() => BrighterSynchronizationHelper.Run(async () => await PurgeAsync());
public override void Purge() => BrighterAsyncContext.Run(async () => await PurgeAsync());

/// <summary>
/// Purges the specified queue name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected void EnsureTopic()
throw new ChannelFailureException($"Topic: {Topic.Value} does not exist");

if (!exists && MakeChannels == OnMissingChannel.Create)
BrighterSynchronizationHelper.Run(async () => await MakeTopic());
BrighterAsyncContext.Run(async () => await MakeTopic());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private async Task ConnectAsync()
/// Sync over async
/// </summary>
/// <param name="message">The message.</param>
public void PublishMessage(Message message) => BrighterSynchronizationHelper.Run(async () => await PublishMessageAsync(message));
public void PublishMessage(Message message) => BrighterAsyncContext.Run(async () => await PublishMessageAsync(message));

/// <summary>
/// Sends the specified message asynchronously.
Expand Down
10 changes: 5 additions & 5 deletions src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public RmqMessageConsumer(
/// Acknowledges the specified message.
/// </summary>
/// <param name="message">The message.</param>
public void Acknowledge(Message message) => BrighterSynchronizationHelper.Run(async () =>await AcknowledgeAsync(message));
public void Acknowledge(Message message) => BrighterAsyncContext.Run(async () =>await AcknowledgeAsync(message));

public async Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -170,7 +170,7 @@ public async Task AcknowledgeAsync(Message message, CancellationToken cancellati
/// <summary>
/// Purges the specified queue name.
/// </summary>
public void Purge() => BrighterSynchronizationHelper.Run(async () => await PurgeAsync());
public void Purge() => BrighterAsyncContext.Run(async () => await PurgeAsync());

public async Task PurgeAsync(CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -215,7 +215,7 @@ public async Task PurgeAsync(CancellationToken cancellationToken = default)
/// <param name="timeOut">The timeout in milliseconds. We retry on timeout 5 ms intervals, with a min of 5ms
/// until the timeout value is reached. </param>
/// <returns>Message.</returns>
public Message[] Receive(TimeSpan? timeOut = null) => BrighterSynchronizationHelper.Run(async () => await ReceiveAsync(timeOut));
public Message[] Receive(TimeSpan? timeOut = null) => BrighterAsyncContext.Run(async () => await ReceiveAsync(timeOut));

/// <summary>
/// Receives the specified queue name.
Expand Down Expand Up @@ -295,7 +295,7 @@ exception is NotSupportedException ||
/// <param name="message"></param>
/// <param name="timeout">Time to delay delivery of the message.</param>
/// <returns>True if message deleted, false otherwise</returns>
public bool Requeue(Message message, TimeSpan? timeout = null) => BrighterSynchronizationHelper.Run(async () => await RequeueAsync(message, timeout));
public bool Requeue(Message message, TimeSpan? timeout = null) => BrighterAsyncContext.Run(async () => await RequeueAsync(message, timeout));

public async Task<bool> RequeueAsync(Message message, TimeSpan? timeout = null,
CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -346,7 +346,7 @@ public async Task<bool> RequeueAsync(Message message, TimeSpan? timeout = null,
/// Rejects the specified message.
/// </summary>
/// <param name="message">The message.</param>
public void Reject(Message message) => BrighterSynchronizationHelper.Run(async () => await RejectAsync(message));
public void Reject(Message message) => BrighterAsyncContext.Run(async () => await RejectAsync(message));

public async Task RejectAsync(Message message, CancellationToken cancellationToken = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class RmqMessageGatewayConnectionPool(string connectionName, ushort conne
/// </summary>
/// <param name="connectionFactory"></param>
/// <returns></returns>
public IConnection GetConnection(ConnectionFactory connectionFactory) => BrighterSynchronizationHelper.Run(() => GetConnectionAsync(connectionFactory));
public IConnection GetConnection(ConnectionFactory connectionFactory) => BrighterAsyncContext.Run(() => GetConnectionAsync(connectionFactory));

/// <summary>
/// Return matching RabbitMQ subscription if exist (match by amqp scheme)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public RmqMessageProducer(RmqMessagingGatewayConnection connection, RmqPublicati
/// <param name="message">The message.</param>
/// <param name="delay">Delay to delivery of the message.</param>
/// <returns>Task.</returns>
public void SendWithDelay(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(async () => await SendWithDelayAsync(message, delay));
public void SendWithDelay(Message message, TimeSpan? delay = null) => BrighterAsyncContext.Run(async () => await SendWithDelayAsync(message, delay));

/// <summary>
/// Sends the specified message
Expand Down
2 changes: 1 addition & 1 deletion src/Paramore.Brighter.ServiceActivator/Proactor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void Run()
{
//NOTE: Don't make this a method body, as opposed to an expression, unless you want it to
//break deep in AsyncTaskMethodBuilder for some hard to explain reasons
BrighterSynchronizationHelper.Run(async () => await EventLoop());
BrighterAsyncContext.Run(async () => await EventLoop());
}

private async Task Acknowledge(Message message)
Expand Down
Loading

0 comments on commit 32c0605

Please sign in to comment.