Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update method signatures, and minor fixes #3448

Merged
merged 1 commit into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Paramore.Brighter.DynamoDb/DynamoDbUnitOfWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void Close()
/// Commit a transaction, performing all associated write actions
/// Will block thread and use second thread for callback
/// </summary>
public void Commit() => BrighterSynchronizationHelper.Run(() => DynamoDb.TransactWriteItemsAsync(_tx));
public void Commit() => BrighterSynchronizationHelper.Run(async () => await DynamoDb.TransactWriteItemsAsync(_tx));

/// <summary>
/// Commit a transaction, performing all associated write actions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public SqsMessageConsumer(AWSMessagingGatewayConnection awsConnection,
/// Sync over Async
/// </summary>
/// <param name="message">The message.</param>
public void Acknowledge(Message message) => BrighterSynchronizationHelper.Run(() => AcknowledgeAsync(message));
public void Acknowledge(Message message) => BrighterSynchronizationHelper.Run(async () => await AcknowledgeAsync(message));

/// <summary>
/// Acknowledges the specified message.
Expand Down Expand Up @@ -112,7 +112,7 @@ public SqsMessageConsumer(AWSMessagingGatewayConnection awsConnection,
/// Sync over async
/// </summary>
/// <param name="message">The message.</param>
public void Reject(Message message) => BrighterSynchronizationHelper.Run(() => RejectAsync(message));
public void Reject(Message message) => BrighterSynchronizationHelper.Run(async () => await RejectAsync(message));

/// <summary>
/// Rejects the specified message.
Expand Down Expand Up @@ -158,7 +158,7 @@ await client.ChangeMessageVisibilityAsync(
/// Purges the specified queue name.
/// Sync over Async
/// </summary>
public void Purge() => BrighterSynchronizationHelper.Run(() => PurgeAsync());
public void Purge() => BrighterSynchronizationHelper.Run(async () => await PurgeAsync());

/// <summary>
/// Purges the specified queue name.
Expand Down Expand Up @@ -187,7 +187,7 @@ await client.ChangeMessageVisibilityAsync(
/// Sync over async
/// </summary>
/// <param name="timeOut">The timeout. AWS uses whole seconds. Anything greater than 0 uses long-polling. </param>
public Message[] Receive(TimeSpan? timeOut = null) => BrighterSynchronizationHelper.Run(() => ReceiveAsync(timeOut));
public Message[] Receive(TimeSpan? timeOut = null) => BrighterSynchronizationHelper.Run(async () => await ReceiveAsync(timeOut));

/// <summary>
/// Receives the specified queue name.
Expand Down Expand Up @@ -255,7 +255,7 @@ await client.ChangeMessageVisibilityAsync(
}


public bool Requeue(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(() => RequeueAsync(message, delay));
public bool Requeue(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(async () => await RequeueAsync(message, delay));

/// <summary>
/// Re-queues the specified message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public async Task SendAsync(Message message, CancellationToken cancellationToken
/// Sync over Async
/// </summary>
/// <param name="message">The message.</param>
public void Send(Message message) => BrighterSynchronizationHelper.Run(() => SendAsync(message));
public void Send(Message message) => BrighterSynchronizationHelper.Run(async () => await SendAsync(message));

/// <summary>
/// Sends the specified message, with a delay.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public async ValueTask DisposeAsync()
/// </summary>
/// <param name="timeOut">The timeout for a message being available. Defaults to 300ms.</param>
/// <returns>Message.</returns>
public Message[] Receive(TimeSpan? timeOut = null) => BrighterSynchronizationHelper.Run(() => ReceiveAsync(timeOut));
public Message[] Receive(TimeSpan? timeOut = null) => BrighterSynchronizationHelper.Run(async () => await ReceiveAsync(timeOut));

/// <summary>
/// Receives the specified queue name.
Expand Down Expand Up @@ -230,7 +230,7 @@ public async ValueTask DisposeAsync()
/// Sync over Async
/// </summary>
/// <param name="message">The message.</param>
public void Reject(Message message) => BrighterSynchronizationHelper.Run(() => RejectAsync(message));
public void Reject(Message message) => BrighterSynchronizationHelper.Run(async () => await RejectAsync(message));

/// <summary>
/// Rejects the specified message.
Expand Down Expand Up @@ -268,7 +268,7 @@ public async ValueTask DisposeAsync()
/// <param name="message"></param>
/// <param name="delay">Delay to the delivery of the message. 0 is no delay. Defaults to 0.</param>
/// <returns>True if the message should be acked, false otherwise</returns>
public bool Requeue(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(() => RequeueAsync(message, delay));
public bool Requeue(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(async () => await RequeueAsync(message, delay));

/// <summary>
/// Requeues the specified message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private async Task ConnectAsync()
/// Sync over async
/// </summary>
/// <param name="message">The message.</param>
public void PublishMessage(Message message) => BrighterSynchronizationHelper.Run(() => PublishMessageAsync(message));
public void PublishMessage(Message message) => BrighterSynchronizationHelper.Run(async () => await PublishMessageAsync(message));

/// <summary>
/// Sends the specified message asynchronously.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public RmqMessageConsumer(
/// Acknowledges the specified message.
/// </summary>
/// <param name="message">The message.</param>
public void Acknowledge(Message message) => BrighterSynchronizationHelper.Run(() =>AcknowledgeAsync(message));
public void Acknowledge(Message message) => BrighterSynchronizationHelper.Run(async () =>await AcknowledgeAsync(message));

public async Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -170,7 +170,7 @@ public async Task AcknowledgeAsync(Message message, CancellationToken cancellati
/// <summary>
/// Purges the specified queue name.
/// </summary>
public void Purge() => BrighterSynchronizationHelper.Run(() => PurgeAsync());
public void Purge() => BrighterSynchronizationHelper.Run(async () => await PurgeAsync());

public async Task PurgeAsync(CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -295,7 +295,7 @@ exception is NotSupportedException ||
/// <param name="message"></param>
/// <param name="timeout">Time to delay delivery of the message.</param>
/// <returns>True if message deleted, false otherwise</returns>
public bool Requeue(Message message, TimeSpan? timeout = null) => BrighterSynchronizationHelper.Run(() => RequeueAsync(message, timeout));
public bool Requeue(Message message, TimeSpan? timeout = null) => BrighterSynchronizationHelper.Run(async () => await RequeueAsync(message, timeout));

public async Task<bool> RequeueAsync(Message message, TimeSpan? timeout = null,
CancellationToken cancellationToken = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public RmqMessageProducer(RmqMessagingGatewayConnection connection, RmqPublicati
/// <param name="message">The message.</param>
/// <param name="delay">Delay to delivery of the message.</param>
/// <returns>Task.</returns>
public void SendWithDelay(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(() => SendWithDelayAsync(message, delay));
public void SendWithDelay(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(async () => await SendWithDelayAsync(message, delay));

/// <summary>
/// Sends the specified message
Expand Down
51 changes: 32 additions & 19 deletions src/Paramore.Brighter/Tasks/BrighterSynchronizationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,28 +140,11 @@ public override void Post(SendOrPostCallback callback, object? state)
//NOTE: if we got here, something went wrong, we should have been able to queue the message
//mostly this seems to be a problem with the task we are running completing, but work is still being queued to the
//synchronization context.
// If the execution context can help, we might be able to redirect; if not just run immediately on this thread

var contextCallback = new ContextCallback(callback);
if (ctxt != null && ctxt != _executionContext)
{
Debug.WriteLine(string.Empty);
Debug.IndentLevel = 1;
Debug.WriteLine($"BrighterSynchronizationContext: Post Failed to queue {callback.Method.Name} on thread {Thread.CurrentThread.ManagedThreadId}");
Debug.WriteLine($"BrighterSynchronizationContext: Parent Task {ParentTaskId}");
Debug.IndentLevel = 0;
SynchronizationHelper.ExecuteOnContext(ctxt, contextCallback, state);
}
ExecuteOnCallersContext(contextCallback, state, ctxt);
else
{
Debug.WriteLine(string.Empty);
Debug.IndentLevel = 1;
Debug.WriteLine($"BrighterSynchronizationContext: Post Failed to queue {callback.Method.Name} on thread {Thread.CurrentThread.ManagedThreadId}");
Debug.WriteLine($"BrighterSynchronizationContext: Parent Task {ParentTaskId}");
Debug.IndentLevel = 0;
//just execute inline
SynchronizationHelper.ExecuteImmediately(contextCallback, state);
}
ExecuteImmediately(contextCallback, state);
Debug.WriteLine(string.Empty);

}
Expand Down Expand Up @@ -192,5 +175,35 @@ public override void Send(SendOrPostCallback callback, object? state)
throw new TimeoutException("BrighterSynchronizationContext: Send operation timed out.");
}
}

private void ExecuteImmediately(ContextCallback contextCallback, object? state)
{
Debug.WriteLine(string.Empty);
Debug.IndentLevel = 1;
Debug.Fail("BrighterSynchronizationContext: ExecuteImmediately. We should never get here");
Debug.WriteLine($"BrighterSynchronizationContext: Post Failed to queue {contextCallback.Method.Name} on thread {Thread.CurrentThread.ManagedThreadId}");
Debug.WriteLine($"BrighterSynchronizationContext: Parent Task {ParentTaskId}");
Debug.IndentLevel = 0;
//just execute inline
SynchronizationHelper.ExecuteImmediately(contextCallback, state);
}

/// <summary>
/// We should never get here as we should not be called from the wrong context
/// </summary>
/// <param name="contextCallback"></param>
/// <param name="state">Any state to pass to the callback</param>
/// <param name="ctxt"></param>
/// <param name="callback">The callback to execute</param>
private void ExecuteOnCallersContext(ContextCallback contextCallback, object? state, ExecutionContext ctxt)
{
Debug.WriteLine(string.Empty);
Debug.IndentLevel = 1;
Debug.Fail("BrighterSynchronizationContext: ExecuteOnCallersContext. We should never get here");
Debug.WriteLine($"BrighterSynchronizationContext: Post Failed to queue {contextCallback.Method.Name} on thread {Thread.CurrentThread.ManagedThreadId}");
Debug.WriteLine($"BrighterSynchronizationContext: Parent Task {ParentTaskId}");
Debug.IndentLevel = 0;
SynchronizationHelper.ExecuteOnContext(ctxt, contextCallback, state);
}
Comment on lines +198 to +207

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Code Duplication
The module contains 2 functions with similar structure: ExecuteImmediately,ExecuteOnCallersContext

Suppress

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,11 @@ public void Execute(Task parentTask)
foreach (var (task, propagateExceptions) in _taskQueue.GetConsumingEnumerable())
{
_taskScheduler.DoTryExecuteTask(task);
_activeTasks.TryRemove(task, out _);

if (!propagateExceptions) continue;

task.GetAwaiter().GetResult();
_activeTasks.TryRemove(task, out _);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,25 +306,27 @@ public void Task_AfterExecute_NeverRuns()

context.Execute(task);

try
{
var taskTwo = context.Factory.StartNew(
() => { value = 2; },
context.Factory.CancellationToken,
context.Factory.CreationOptions | TaskCreationOptions.DenyChildAttach,
context.TaskScheduler);
Comment on lines -309 to -315

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ Getting worse: Code Duplication
introduced similar code in: Task_AfterExecute_NeverRuns,Task_AfterExecute_Runs_On_ThreadPool

Suppress

var taskTwo = context.Factory.StartNew(
() => { value = 2; },
context.Factory.CancellationToken,
context.Factory.CreationOptions | TaskCreationOptions.DenyChildAttach,
context.TaskScheduler);

taskTwo.ContinueWith(_ => { throw new Exception("Should not run"); }, TaskScheduler.Default);
taskTwo.ContinueWith(_ => { throw new Exception("Should not run"); }, TaskScheduler.Default);

bool exceptionRan = false;
try
{
context.Execute(taskTwo);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
exceptionRan = true;
}
//there should be no pending work

value.Should().Be(1);
exceptionRan.Should().BeFalse();
}

[Fact]
Expand Down
Loading