Closed
Description
Version Information
Version of Akka.NET? 1.5.40
Which Akka.NET Modules? Streams
Describe the bug
Cancelled sinks are blocking other BroadcastHub
consumers. This is the same exact BroadcastHub
problem as described here: akka/akka#23205 (comment)
To Reproduce
[Fact]
public async Task BroadcastHub_must_handle_cancelled_Sink()
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var upstream = this.CreatePublisherProbe<int>();
var hubSource = Source.FromPublisher(upstream).RunWith(BroadcastHub.Sink<int>(4), Materializer);
var downstream = this.CreateSubscriberProbe<int>();
hubSource.RunWith(Sink.Cancelled<int>(), Materializer);
hubSource.RunWith(Sink.FromSubscriber(downstream), Materializer);
await downstream.EnsureSubscriptionAsync();
await downstream.RequestAsync(10);
await upstream.ExpectRequestAsync();
await upstream.SendNextAsync(1);
await downstream.ExpectNextAsync(1);
await upstream.SendNextAsync(2);
await downstream.ExpectNextAsync(2);
await upstream.SendNextAsync(3);
await downstream.ExpectNextAsync(3);
await upstream.SendNextAsync(4);
await downstream.ExpectNextAsync(4);
await upstream.SendNextAsync(5);
await downstream.ExpectNextAsync(5);
await upstream.SendCompleteAsync();
await downstream.ExpectCompleteAsync();
}, Materializer);
}
Expected behavior
downstream
should receive the 5th element
Actual behavior
Timeout while waiting for message exception on the await downstream.ExpectNextAsync(5);
line.