Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

solve a potential deadlock #24

Merged
merged 3 commits into from
Jan 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions src/Interprocess.Tests/QueueTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -238,11 +239,47 @@ public void CanRejectLargeMessages()
p.TryEnqueue(ByteArray50).Should().BeFalse(); // failed here
}

// Verifies that a subscriber recovers when a publisher reserves space in the queue
// but never finishes writing the message (simulated by DeadlockCausingPublisher).
[Fact]
[TestBeforeAfter]
public void CanRecoverIfPublisherCrashes()
{
// This is a very complicated test that tries to replicate a crash scenario in which the publisher
// crashes after indicating that it is writing a message but before completing the operation.

// Simulate the crash: this publisher only advances the write offset and never writes the message itself.
using var dp = new DeadlockCausingPublisher(new("qn", fixture.Path, 1024), NullLoggerFactory.Instance);
dp.TryEnqueue(ByteArray3).Should().BeTrue();

// A healthy publisher enqueues a real message behind the half-written one.
using var p = CreatePublisher(1024);
p.TryEnqueue(ByteArray1).Should().BeTrue();
using var s = CreateSubscriber(1024);

// This call should take about 10 seconds to return (that is how long the timeout is set in the code).
// Once the 10 seconds expire, all other messages that were in the queue when the dequeue started
// are expected to be lost, so the call reports that nothing was dequeued.
s.TryDequeue(default, out _).Should().BeFalse();

// After that 10-second delay, the system should fully recover and continue with new messages.
p.TryEnqueue(ByteArray1).Should().BeTrue();
s.TryDequeue(default, out var message).Should().BeTrue();
message.ToArray().Should().BeEquivalentTo(ByteArray1);
}

/// <summary>
/// Creates a publisher for the test queue named "qn" stored under the fixture path.
/// </summary>
/// <param name="capacity">Capacity passed to the queue options.</param>
/// <returns>The publisher produced by the queue factory.</returns>
private IPublisher CreatePublisher(long capacity) =>
    queueFactory.CreatePublisher(new("qn", fixture.Path, capacity));

/// <summary>
/// Creates a subscriber for the test queue named "qn" stored under the fixture path.
/// </summary>
/// <param name="capacity">Capacity passed to the queue options.</param>
/// <returns>The subscriber produced by the queue factory.</returns>
private ISubscriber CreateSubscriber(long capacity) =>
    queueFactory.CreateSubscriber(new("qn", fixture.Path, capacity));

/// <summary>
/// Test-only publisher that imitates a crash in the middle of an enqueue: it moves the
/// queue's write offset forward but never writes the message itself.
/// </summary>
private sealed class DeadlockCausingPublisher(QueueOptions options, ILoggerFactory loggerFactory)
    : Queue(options, loggerFactory), IPublisher
{
    /// <summary>
    /// Advances the write offset by the padded length of <paramref name="message"/>
    /// without writing anything else, then reports success.
    /// </summary>
    /// <param name="message">Payload whose length determines how far the offset moves.</param>
    /// <returns>Always <c>true</c>.</returns>
    public unsafe bool TryEnqueue(ReadOnlySpan<byte> message)
    {
        var paddedLength = GetPaddedMessageLength(message.Length);

        // Work from a copy of the header, then publish only the advanced write offset.
        var snapshot = *Header;
        var advancedOffset = SafeIncrementMessageOffset(snapshot.WriteOffset, paddedLength);
        Header->WriteOffset = advancedOffset;

        return true;
    }
}
}
31 changes: 23 additions & 8 deletions src/Interprocess/Queue/Subscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ private unsafe bool TryDequeueImpl(
return false;

var readLockTimestamp = header.ReadLockTimestamp;
var now = DateTime.UtcNow.Ticks;
var start = DateTime.UtcNow.Ticks;

// is there already a read-lock or has the previous lock timed out meaning that a subscriber crashed?
if (now - readLockTimestamp < TicksForTenSeconds)
if (start - readLockTimestamp < TicksForTenSeconds)
return false;

// take a read-lock so no other thread can read a message
if (Interlocked.CompareExchange(ref Header->ReadLockTimestamp, now, readLockTimestamp) != readLockTimestamp)
if (Interlocked.CompareExchange(ref Header->ReadLockTimestamp, start, readLockTimestamp) != readLockTimestamp)
return false;

try
Expand All @@ -125,14 +125,29 @@ private unsafe bool TryDequeueImpl(

// now finally have a read-lock and the queue is not empty
var readOffset = Header->ReadOffset;
var writeOffset = Header->WriteOffset;
var messageHeader = (MessageHeader*)Buffer.GetPointer(readOffset);

// was this message fully written by the publisher? if not, wait for the publisher to finish writing it
while (Interlocked.CompareExchange(
ref messageHeader->State,
MessageHeader.LockedToBeConsumedState,
MessageHeader.ReadyToBeConsumedState) != MessageHeader.ReadyToBeConsumedState)
while (true)
{
// was this message fully written by the publisher? if not, wait for the publisher to finish writing it
var state = Interlocked.CompareExchange(
ref messageHeader->State,
MessageHeader.LockedToBeConsumedState,
MessageHeader.ReadyToBeConsumedState);

if (state == MessageHeader.ReadyToBeConsumedState)
break;

// but if the publisher crashed, we will never get the message, so we need to handle that case by timing out
if (DateTime.UtcNow.Ticks - start > TicksForTenSeconds)
{
// the publisher crashed and we will never get the message
// so we need to release the read-lock and advance the queue for everyone.
// some messages might be lost in this case but this is the best we can do.
Interlocked.Exchange(ref Header->ReadOffset, writeOffset);
return false;
}
Thread.Yield();
}

Expand Down
Loading