BUG: RefCount with MinObservers > 1 Behaves Unexpectedly #2173

Open
mikeronayne opened this issue Oct 11, 2024 · 1 comment

Comments

@mikeronayne

Code

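using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Trailing comments give the number of live subscriptions,
// assuming every Subscribe call succeeds.
Subject<int> subj = new();
var refCount = subj.Publish().RefCount(minObservers: 2);

subj.OnNext(1); // 0 subscribers

var sub1 = refCount.Subscribe(c => Console.WriteLine($"sub1: {c}"));
subj.OnNext(2); // 1 subscriber (sub1)

var sub2 = refCount.Subscribe(c => Console.WriteLine($"sub2: {c}"));
subj.OnNext(3); // 2 subscribers (sub1, sub2)

sub1.Dispose();
subj.OnNext(4); // 1 subscriber (sub2)

var sub3 = refCount.Subscribe(c => Console.WriteLine($"sub3: {c}"));
subj.OnNext(5); // 2 subscribers (sub2, sub3)

var sub4 = refCount.Subscribe(c => Console.WriteLine($"sub4: {c}"));
subj.OnNext(6); // 3 subscribers (sub2, sub3, sub4)

sub2.Dispose();
sub3.Dispose();
sub4.Dispose();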

Expected Output

I would expect not to receive values until sub2 has subscribed. Then I would expect not to receive values after sub1 is disposed. Then I would expect to receive values again after sub3 has subscribed.

sub1: 3
sub2: 3
sub2: 5
sub3: 5
sub2: 6
sub3: 6
sub4: 6

Actual Output

  1. sub2 should not have received value 4 since we were below minObservers at that point.
  2. Definitely shouldn't have gotten an exception on sub3 subscribing.
sub1: 3
sub2: 3
sub2: 4

InvalidOperationException: Disposable has already been assigned.
at System.Reactive.Disposables.SingleAssignmentDisposableValue.set_Disposable(IDisposable value)
at System.Reactive.Linq.ObservableImpl.RefCount`1.Eager._.Run()
at System.Reactive.Linq.ObservableImpl.RefCount`1.Eager.Run(_ sink)
at System.Reactive.Producer`2.<>c.<SubscribeRaw>b__1_0(ValueTuple`2 tuple)
at System.Reactive.Concurrency.Scheduler.<>c__75`1.<ScheduleAction>b__75_0(IScheduler _, ValueTuple`2 tuple)
at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
at System.Reactive.Concurrency.LocalScheduler.Schedule[TState](TState state, Func`3 action)
at System.Reactive.Concurrency.Scheduler.ScheduleAction[TState](IScheduler scheduler, TState state, Action`1 action)
at System.Reactive.Producer`2.SubscribeRaw(IObserver`1 observer, Boolean enableSafeguard)
at System.Reactive.Producer`2.Subscribe(IObserver`1 observer)
at System.ObservableExtensions.Subscribe[T](IObservable`1 source, Action`1 onNext)
at UserQuery.Main(), line 21

If this IS expected and not a bug, would love an explanation of what I'm misunderstanding. Thank you!

@SlyCedix

SlyCedix commented Mar 20, 2025

The documentation for RefCount(IConnectableObservable source, int minObservers), like that of every other RefCount overload, states that the observable stays connected as long as there is at least one subscription to the source. That may be an error in the documentation, but the behavior is consistent with it. Given that, the expected output would be as follows:

sub1: 3
sub2: 3
sub2: 4
sub2: 5
sub3: 5
sub2: 6
sub3: 6
sub4: 6
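
To make the two readings of minObservers concrete, here is a small dependency-free model of the issue's subscribe/dispose sequence. It is only a sketch, not the Rx implementation: the class name `RefCountSemantics`, the `strictThreshold` flag, and the local helper functions are all hypothetical. With `strictThreshold: false` it follows the documented rule (connect once the count reaches minObservers, stay connected until the count is zero); with `strictThreshold: true` it follows the reading in the original report (deliver only while the count is at least minObservers).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the two interpretations; it does not use System.Reactive.
public static class RefCountSemantics
{
    public static List<string> Simulate(int minObservers, bool strictThreshold)
    {
        var output = new List<string>();
        var active = new List<string>(); // current subscribers, in subscription order
        bool connected = false;

        void Subscribe(string name)
        {
            active.Add(name);
            // Connect once the subscriber count reaches the threshold.
            if (active.Count >= minObservers) connected = true;
        }

        void Unsubscribe(string name)
        {
            active.Remove(name);
            // Documented rule: disconnect only when nobody is left.
            // Strict rule (assumption from the report): disconnect below the threshold.
            int floor = strictThreshold ? minObservers : 1;
            if (active.Count < floor) connected = false;
        }

        void Emit(int value)
        {
            if (!connected) return;
            foreach (var name in active) output.Add($"{name}: {value}");
        }

        // Same sequence of calls as the code in the issue.
        Emit(1);
        Subscribe("sub1"); Emit(2);
        Subscribe("sub2"); Emit(3);
        Unsubscribe("sub1"); Emit(4);
        Subscribe("sub3"); Emit(5);
        Subscribe("sub4"); Emit(6);
        return output;
    }

    public static void Main()
    {
        Console.WriteLine("documented: " + string.Join(", ", Simulate(2, strictThreshold: false)));
        Console.WriteLine("strict:     " + string.Join(", ", Simulate(2, strictThreshold: true)));
    }
}
```

Under the documented rule the model produces the eight lines listed above, including `sub2: 4`; under the strict rule it produces the seven lines from the original report, without `sub2: 4`.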

The exception looks like it's due to a bug here:

    doConnect = ++conn._count == _parent._minObservers;
    // save the current connection for this observer
    _targetConnection = conn;
}
// subscribe to the source first
Run(_parent._source);
// then connect the source if necessary
if (doConnect && !conn._disposable.IsDisposed)
{
    // this makes sure if the connection ends synchronously
    // only the currently known connection is affected
    // and a connection from a concurrent reconnection won't
    // interfere
    conn._disposable.Disposable = _parent._source.Connect();
}

The error happens as follows:

Subject<int> subj = new();
var refCount = subj.Publish().RefCount(minObservers: 2);

var sub1 = refCount.Subscribe(c => Console.WriteLine($"sub1: {c}")); // count == 1, doConnect = false
var sub2 = refCount.Subscribe(c => Console.WriteLine($"sub2: {c}")); // count == 2, doConnect = true, sets SingleAssignmentDisposableValue
sub1.Dispose(); // count == 1, no check to disconnect
var sub3 = refCount.Subscribe(c => Console.WriteLine($"sub3: {c}")); // count == 2, doConnect = true, sets already assigned SingleAssignmentDisposableValue, throwing an error

This doesn't happen when minObservers == 1 because the count can only fall below the threshold by reaching zero, and reaching zero is exactly what the disconnection check looks for:

// if the current connection is no longer the saved connection
// or the counter hasn't reached zero yet
if (targetConnection != _parent._connection
    || --targetConnection._count != 0)
{
    // nothing to do.
    return;
}
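
The interaction of the two checks can be reproduced without Rx by modelling only the counter and the single-assignment slot. This is a rough sketch under stated assumptions: `ConnectionModel`, `DoubleConnectDemo`, and the `_assigned` flag are hypothetical stand-ins, and resetting the slot when the count reaches zero is an assumption standing in for Rx creating a fresh connection object afterwards.

```csharp
using System;

// Hypothetical stand-in for the connection bookkeeping; not the Rx source.
public sealed class ConnectionModel
{
    private readonly int _minObservers;
    private int _count;
    private bool _assigned; // mimics the single-assignment disposable slot

    public ConnectionModel(int minObservers) => _minObservers = minObservers;

    public void Subscribe()
    {
        // Same comparison as the quoted snippet: connect when the count
        // becomes exactly equal to the threshold.
        bool doConnect = ++_count == _minObservers;
        if (!doConnect) return;
        if (_assigned)
            throw new InvalidOperationException("Disposable has already been assigned.");
        _assigned = true;
    }

    public void Unsubscribe()
    {
        // Same comparison as the quoted dispose check: only zero disconnects.
        if (--_count != 0) return;
        _assigned = false; // assumption: the next subscriber gets a fresh connection
    }
}

public static class DoubleConnectDemo
{
    // Replays subscribe, subscribe, dispose, subscribe and reports the outcome.
    public static string Run(int minObservers)
    {
        var model = new ConnectionModel(minObservers);
        try
        {
            model.Subscribe();   // sub1
            model.Subscribe();   // sub2
            model.Unsubscribe(); // sub1.Dispose()
            model.Subscribe();   // sub3
            return "ok";
        }
        catch (InvalidOperationException ex)
        {
            return ex.Message;
        }
    }

    public static void Main()
    {
        Console.WriteLine($"minObservers 2: {Run(2)}");
        Console.WriteLine($"minObservers 1: {Run(1)}");
    }
}
```

With minObservers 2 the count goes 1, 2, 1, 2, so the equality check fires a second time while the slot is still assigned and the model throws. With minObservers 1 the count goes 1, 2, 1, 2 as well, but it only equals the threshold on the first subscribe, so the run completes with "ok".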
