Class OnSubscribeDeferAndWatch<T>

  • All Implemented Interfaces:
    rx.functions.Action, rx.functions.Action1<rx.Subscriber<? super T>>, rx.functions.Function, rx.Observable.OnSubscribe<T>

    public class OnSubscribeDeferAndWatch<T>
    extends Object
    implements rx.Observable.OnSubscribe<T>
    Defers the execution of a Subject and, in addition, watches for early unsubscription and cleans up buffers if the content is ReferenceCounted.

    Implementation Details:

    This implementation is very similar to Observable.defer(Func0) in that it takes a hot observable, such as a subject, and defers its execution until someone subscribes. The problem with vanilla defer is that if an early unsubscribe happens (for example, a downstream timeout firing), the message from the hot observable is no longer consumed, which can lead to buffer leaks if it contains a pooled resource.

    To mitigate this, another subscription is added to the hot observable which checks, at the time of item emission, whether the downstream subscription is still present. If it is not, the buffers are proactively released, so that no leak is left behind.

    ♬ Wir hom so vü zum tuan ♬ We have so much to do
    ♬ Wir hudln und schurdln ummanond ♬ We are hurrying and botching around
    ♬ Müssen uns sputen des dauert vü zu lang ♬ We need to hurry, it all takes too long
    ♬ Da hüft ka hupen so kummst a ned schneller dran ♬ No need to honk, you’re not getting served quicker
    ♬ Und a ka Fluchen lametier ned gemmas on ♬ No swearing, no whining, let's get on with it
    -- from Skero - "Hudeln"
    Author:
    Michael Nitschinger
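
    The defer-and-watch idea in the class description can be modeled in plain Java without RxJava. The following is a minimal sketch, not the actual implementation: PooledBuffer, Downstream, HotSource and DeferAndWatchSketch are hypothetical stand-ins for a ReferenceCounted payload, an rx.Subscriber and an AsyncSubject-like hot source. It only illustrates the two subscriptions: one that delivers the item, and one that releases it if the downstream has already unsubscribed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

final class DeferAndWatchSketch {

    /** Hypothetical stand-in for a pooled, reference-counted buffer. */
    static final class PooledBuffer {
        private final AtomicInteger refCnt = new AtomicInteger(1);
        int refCnt() { return refCnt.get(); }
        void release() { refCnt.decrementAndGet(); }
    }

    /** Hypothetical stand-in for a downstream subscriber that may unsubscribe early. */
    static final class Downstream {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
        final List<PooledBuffer> received = new ArrayList<>();
        void unsubscribe() { unsubscribed.set(true); }
        boolean isUnsubscribed() { return unsubscribed.get(); }
        // Items arriving after unsubscription are silently dropped, as in vanilla defer.
        void onNext(PooledBuffer buffer) {
            if (!isUnsubscribed()) {
                received.add(buffer);
            }
        }
    }

    /** Hypothetical hot source that pushes each item to every listener. */
    static final class HotSource {
        private final List<Consumer<PooledBuffer>> listeners = new ArrayList<>();
        void subscribe(Consumer<PooledBuffer> listener) { listeners.add(listener); }
        void emit(PooledBuffer buffer) {
            for (Consumer<PooledBuffer> listener : listeners) {
                listener.accept(buffer);
            }
        }
    }

    /** Returns an on-subscribe action; the factory is only invoked when it is called. */
    static Consumer<Downstream> deferAndWatch(Function<Downstream, HotSource> factory) {
        return downstream -> {
            HotSource source = factory.apply(downstream);
            // Regular subscription: hands the item to the downstream.
            source.subscribe(downstream::onNext);
            // Watcher subscription: at emission time, release the buffer
            // if nobody is left downstream to consume it.
            source.subscribe(buffer -> {
                if (downstream.isUnsubscribed() && buffer.refCnt() > 0) {
                    buffer.release();
                }
            });
        };
    }

    /** Runs one emission and reports the buffer's reference count afterwards. */
    static int refCntAfterEmission(boolean unsubscribeEarly) {
        HotSource source = new HotSource();
        Downstream downstream = new Downstream();
        deferAndWatch(d -> source).accept(downstream);
        if (unsubscribeEarly) {
            downstream.unsubscribe();
        }
        PooledBuffer buffer = new PooledBuffer();
        source.emit(buffer);
        return buffer.refCnt();
    }

    public static void main(String[] args) {
        System.out.println(refCntAfterEmission(false)); // prints 1: downstream now owns the buffer
        System.out.println(refCntAfterEmission(true));  // prints 0: watcher released the buffer
    }
}
```

    In this model the watcher is what prevents the leak: without the second subscription, the early-unsubscribe case would leave the buffer with a reference count of 1 and no owner.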
    • Method Detail

      • deferAndWatch

        public static <T> rx.Observable<T> deferAndWatch(rx.functions.Func1<rx.Subscriber, ? extends rx.Observable<? extends T>> observableFactory)
        Defers a hot observable and cleans up its buffers, if needed, on early unsubscribe. It currently only works if you are deferring an AsyncSubject.
        Parameters:
        observableFactory - the factory of the hot observable.
        Returns:
        a deferred observable which handles cleanup of resources on early unsubscribe.
      • call

        public void call(rx.Subscriber<? super T> s)
        Specified by:
        call in interface rx.functions.Action1<rx.Subscriber<? super T>>