Recently I stumbled upon an interesting statement by Enigmativity about the Publish and RefCount operators:
You're using the dangerous .Publish().RefCount() operator pair which creates a sequence that can't be subscribed to after it completes.
This statement seems to oppose Lee Campbell's assessment about these operators. Quoting from his book Intro to Rx:
The Publish/RefCount pair is extremely useful for taking a cold observable and sharing it as a hot observable sequence for subsequent observers.
Initially I didn't believe that Enigmativity's statement was correct, so I tried to refute it. My experiments revealed that Publish().RefCount() can indeed be inconsistent. Subscribing a second time to a published sequence may or may not cause a new subscription to the source sequence, depending on whether the source sequence completed while it was connected. If it completed, it won't be resubscribed. If it did not complete, it will be resubscribed. Here is a demonstration of this behavior:
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

var observable = Observable
    .Create<int>(o =>
    {
        o.OnNext(13);
        o.OnCompleted(); // Commenting this line alters the observed behavior
        return Disposable.Empty;
    })
    .Do(x => Console.WriteLine($"Producer generated: {x}"))
    .Finally(() => Console.WriteLine("Producer finished"))
    .Publish()
    .RefCount()
    .Do(x => Console.WriteLine($"Consumer received #{x}"))
    .Finally(() => Console.WriteLine("Consumer finished"));

// Two consecutive subscriptions, each disposed immediately
observable.Subscribe().Dispose();
observable.Subscribe().Dispose();
In this example the observable is composed of three parts. First is the producing part, which generates a single value and then completes. Then follows the publishing mechanism (Publish + RefCount). Finally comes the consuming part, which observes the values emitted by the producer. The observable is subscribed twice. The expected behavior would be that each subscription receives one value. But this is not what happens! Here is the output:
Producer generated: 13
Consumer received #13
Producer finished
Consumer finished
Consumer finished
And here is the output if we comment out the o.OnCompleted(); line. This subtle change results in the expected and desirable behavior:
Producer generated: 13
Consumer received #13
Producer finished
Consumer finished
Producer generated: 13
Consumer received #13
Producer finished
Consumer finished
In the first case the cold producer (the part before the Publish().RefCount()) was subscribed only once. The first consumer received the emitted value, but the second consumer received nothing (except for an OnCompleted notification). In the second case the producer was subscribed twice. Each time it generated a value, and each consumer got one value.
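A possible explanation for the first case, offered as a sketch rather than a statement about the library's internals: assuming Publish() multicasts through a single Subject<int>, that subject remains completed once the source completes, and a completed subject hands late subscribers only an OnCompleted notification. The snippet below uses a bare Subject<int> (no Publish or RefCount involved) to show that behavior. It assumes the System.Reactive package and C# top-level statements.

```csharp
using System;
using System.Reactive.Subjects;

var subject = new Subject<int>();

// Subscribed before the subject emits anything.
subject.Subscribe(
    x => Console.WriteLine($"Early subscriber received {x}"),
    () => Console.WriteLine("Early subscriber completed"));

subject.OnNext(13);
subject.OnCompleted();

// Subscribed after completion: no value is replayed, only OnCompleted.
subject.Subscribe(
    x => Console.WriteLine($"Late subscriber received {x}"),
    () => Console.WriteLine("Late subscriber completed"));

// Output:
// Early subscriber received 13
// Early subscriber completed
// Late subscriber completed
```

The late subscriber here plays the role of the second consumer in the first output above: it is completed immediately and never sees the value 13.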
My question is: how can we fix this? How can we modify either the Publish operator, or the RefCount, or both, so that they always behave consistently and desirably? Below are the specifications of the desirable behavior:
- The published sequence should propagate to its subscribers all notifications coming directly from the source sequence, and nothing else.
- The published sequence should subscribe to the source sequence when its current number of subscribers increases from zero to one.
- The published sequence should stay connected to the source as long as it has at least one subscriber.
- The published sequence should unsubscribe from the source when its current number of subscribers becomes zero.
I am asking for either a custom PublishRefCount operator that offers the functionality described above, or for a way to achieve the desirable functionality using the built-in operators.
By the way, a similar question exists that asks why this happens. My question is about how to fix it.
Update: In retrospect, the above specification results in unstable behavior that makes race conditions unavoidable. There is no guarantee that two subscriptions to the published sequence will result in a single subscription to the source sequence. The source sequence may complete between the two subscriptions, which unsubscribes the first subscriber, which in turn makes the RefCount operator unsubscribe from the source, so the next subscriber triggers a new subscription to the source. The behavior of the built-in .Publish().RefCount() prevents this from happening.
The moral of the story is that the .Publish().RefCount() sequence is not broken, but it's not reusable. It cannot be used reliably for multiple connect/disconnect sessions. If you want a second session, you should create a new .Publish().RefCount() sequence.
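One way to follow this advice, as a minimal sketch: wrap the pipeline in a factory so that every session gets its own .Publish().RefCount() instance. The CreateSession name is a hypothetical helper introduced only for this illustration, and the producer is the same one used in the demonstration above. The snippet assumes the System.Reactive package and C# top-level statements.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical helper: builds a fresh published sequence for each session.
static IObservable<int> CreateSession() =>
    Observable
        .Create<int>(o =>
        {
            o.OnNext(13);
            o.OnCompleted();
            return Disposable.Empty;
        })
        .Do(x => Console.WriteLine($"Producer generated: {x}"))
        .Publish()
        .RefCount()
        .Do(x => Console.WriteLine($"Consumer received #{x}"));

// Each call yields an independent sequence, so each session starts clean.
CreateSession().Subscribe().Dispose();
CreateSession().Subscribe().Dispose();

// Output:
// Producer generated: 13
// Consumer received #13
// Producer generated: 13
// Consumer received #13
```

Subscribers that are meant to share a single subscription to the source still need to subscribe to the same instance returned by the factory. Calling the factory again starts a separate session.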