public class OnSubscribeDeferAndWatch<T> extends Object implements Observable.OnSubscribe<T>
Defers the execution of a Subject and, in addition, watches for early unsubscription and cleans up buffers if the content is ReferenceCounted.
Implementation Details:
This implementation is very similar to Observable.defer(Func0) in that it takes a hot observable, such as a subject, and defers its execution until someone subscribes. The problem with vanilla defer is that if an early unsubscribe happens (for example, a downstream timeout firing), the message from the hot observable is no longer consumed, which can lead to buffer leaks if it contains a pooled resource.
To mitigate this, a second subscription is added to the hot observable. At the time of item emission, it checks whether the downstream subscription is still present. If it is not, the buffers are proactively cleaned up, so that no leak is left behind.
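The watching idea can be illustrated without RxJava. The following is a minimal plain-Java sketch under stated assumptions, not this class's actual implementation: `PooledBuffer`, `Subscription`, `HotSource`, and the `simulate` helper are hypothetical stand-ins for a ReferenceCounted item, the downstream subscription, and the hot observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class DeferAndWatchSketch {

    /** Hypothetical stand-in for a pooled, reference-counted buffer. */
    static final class PooledBuffer {
        private final AtomicInteger refCnt = new AtomicInteger(1);
        int refCnt() { return refCnt.get(); }
        void release() { refCnt.decrementAndGet(); }
    }

    /** Hypothetical stand-in for the downstream subscription. */
    static final class Subscription {
        private final AtomicBoolean unsubscribed = new AtomicBoolean();
        void unsubscribe() { unsubscribed.set(true); }
        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    /** Hypothetical hot source: emits to every registered listener. */
    static final class HotSource {
        private final List<Consumer<PooledBuffer>> listeners = new ArrayList<>();
        void subscribe(Consumer<PooledBuffer> listener) { listeners.add(listener); }
        void emit(PooledBuffer buffer) {
            for (Consumer<PooledBuffer> listener : listeners) {
                listener.accept(buffer);
            }
        }
    }

    /** Registers the real consumer plus a watcher that cleans up after an early unsubscribe. */
    static Subscription deferAndWatch(HotSource source, Consumer<PooledBuffer> downstream) {
        Subscription sub = new Subscription();
        // Normal path: forward the item only while downstream is still subscribed.
        source.subscribe(buf -> {
            if (!sub.isUnsubscribed()) {
                downstream.accept(buf);
            }
        });
        // Watcher: at emission time, release the buffer if nobody is left to consume it.
        source.subscribe(buf -> {
            if (sub.isUnsubscribed() && buf.refCnt() > 0) {
                buf.release();
            }
        });
        return sub;
    }

    /** Returns {number of items delivered downstream, remaining reference count}. */
    static int[] simulate(boolean unsubscribeEarly) {
        HotSource source = new HotSource();
        PooledBuffer buffer = new PooledBuffer();
        AtomicInteger delivered = new AtomicInteger();
        Subscription sub = deferAndWatch(source, buf -> {
            delivered.incrementAndGet();
            buf.release(); // a well-behaved consumer releases what it receives
        });
        if (unsubscribeEarly) {
            sub.unsubscribe(); // e.g. a downstream timeout fired before emission
        }
        source.emit(buffer);
        return new int[] { delivered.get(), buffer.refCnt() };
    }

    public static void main(String[] args) {
        int[] early = simulate(true);
        int[] normal = simulate(false);
        System.out.println("early:  delivered=" + early[0] + " refCnt=" + early[1]);
        System.out.println("normal: delivered=" + normal[0] + " refCnt=" + normal[1]);
    }
}
```

In this sketch the reference count ends at zero in both cases: on the normal path the consumer releases the buffer, and after an early unsubscribe the watcher does. Without the watcher, the early-unsubscribe case would leave the count at one, which is the leak described above.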
♬ We have so much to do ♬ We rush and scurry all around ♬ We have to hurry, this is taking far too long ♬ Honking won't help, you won't get your turn any sooner ♬ And no cursing either, don't complain, let's get going (from Skero, “Hudeln”)
Modifier and Type | Method and Description |
---|---|
void | call(Subscriber<? super T> s) |
static <T> Observable<T> | deferAndWatch(Func0<? extends Observable<? extends T>> observableFactory) Defer a hot observable and clean its buffers if needed on early unsubscribe. |
public static <T> Observable<T> deferAndWatch(Func0<? extends Observable<? extends T>> observableFactory)
Defer a hot observable and clean its buffers if needed on early unsubscribe. It currently only works if you are deferring an AsyncSubject.
observableFactory - the factory of the hot observable.
public void call(Subscriber<? super T> s)
call in interface Action1<Subscriber<? super T>>
Copyright © 2015 Couchbase, Inc.