Asynchronous Programming Using the Java SDK with Couchbase Server

The Couchbase Java SDK has a complete asynchronous API based in part on RxJava. This section explains how to use the asynchronous API, how it works with Java platform and RxJava features, and the kinds of error handling you need to consider in application development.

Java 8, Lambdas and Anonymous Classes

Before jumping into the details, one thing warrants clarification: RxJava and, therefore, the Java SDK, fully support Java 8. This support brings some great improvements, most prominently support for lambdas and method references.

Because the Java SDK also supports Java 6 and 7, most of the examples shown in the documentation use anonymous classes instead of lambdas. You are free, and even encouraged, to use lambdas if you can, but Java 8 may not be an option for everyone.

To whet your appetite for why you may want to strongly consider these features, compare the same code written for Java 6 and for Java 8:

// Loads 3 documents in parallel
Observable
    .just("doc1", "doc2", "doc3")
    .flatMap(new Func1<String, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(String id) {
            return bucket.get(id);
        }
    }).forEach(new Action1<JsonDocument>() {
        @Override
        public void call(JsonDocument document) {
            System.out.println("Got: " + document);
        }
    });

Versus:

// Loads 3 documents in parallel
Observable
    .just("doc1", "doc2", "doc3")
    .flatMap(bucket::get)
    .forEach(document -> System.out.println("Got: " + document));

RxJava has additional support for other languages such as Scala, Groovy, Kotlin or Clojure through language adapters. If you are using one of those, refer to the RxJava documentation on how to use the adapters.

Understanding Observables

You can think of an Observable as the push-based, asynchronous cousin ("dual") of the pull-based, synchronous Iterable. The contract of an Observable is that zero to N data events can happen, followed by a complete event. An error event can also happen at any time, and it terminates the Observable.

Table 1. The duality of Iterable and Observable

Event            Iterable (Pull)     Observable (Push)
retrieve data    T next()            onNext(T)
discover error   throws Exception    onError(Exception)
complete         returns             onCompleted()

You can convert an Observable into a BlockingObservable, which then behaves very much like the Iterable.
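The duality in Table 1 can be sketched in plain Java without RxJava. This is a minimal illustration only: the class PullVersusPush and its methods are hypothetical names invented for the example, and the push side uses ordinary callbacks rather than a real Observable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of RxJava or the Java SDK.
final class PullVersusPush {

    // Pull: the consumer drives the flow by asking for each value with next().
    static List<String> pull(Iterable<String> source) {
        List<String> seen = new ArrayList<>();
        Iterator<String> it = source.iterator();
        while (it.hasNext()) {
            seen.add("next:" + it.next());
        }
        seen.add("returned"); // completion is simply the loop ending
        return seen;
    }

    // Push: the producer drives the flow by calling back for each value,
    // then signals completion explicitly.
    static void push(List<String> values, Consumer<String> onNext, Runnable onCompleted) {
        for (String value : values) {
            onNext.accept(value);
        }
        onCompleted.run();
    }

    // Records what a push-style consumer observes.
    static List<String> collectPushed(List<String> values) {
        List<String> seen = new ArrayList<>();
        push(values, v -> seen.add("onNext:" + v), () -> seen.add("onCompleted"));
        return seen;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b");
        System.out.println(pull(data));          // [next:a, next:b, returned]
        System.out.println(collectPushed(data)); // [onNext:a, onNext:b, onCompleted]
    }
}
```

In the pull variant the caller decides when the next value is read; in the push variant the source decides, which is what makes asynchronous delivery possible.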

The key element to take away is that an Observable<T> can emit 0 to N events, which is very different from a Future<T> that only contains one value. After you start to work on streams instead of single values, you will very much appreciate this fact.

By definition, an Observable does not imply that the underlying code is executed asynchronously. As a consumer of an Observable, you leave the actual implementation to the supplier, who can change it later on without you having to adapt your code. Imagine you are consuming this API:

public interface FooService {
    Observable<String> load();
}

It could be that when load() is called, the String value is fetched right out of a Map in memory (or even a hard-coded value). In this case, there is no need to move the execution away from the caller thread, because the value will be returned instantaneously. If later the implementation needs to be changed so that the String is loaded through a web service (introducing latency and other semantics), the API doesn’t need to be changed because the underlying implementation is free to move it to a Scheduler.

Consuming Observables

The first thing you want to do when working with Observables is to consume them. Consuming an Observable means subscribing to it. Here is an example that subscribes and prints out all the items emitted:

Observable
    .just(1, 2, 3)
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onCompleted() {
            System.out.println("Completed Observable.");
        }

        @Override
        public void onError(Throwable throwable) {
            System.err.println("Whoops: " + throwable.getMessage());
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("Got: " + integer);
        }
    });

The example prints the following lines:

Got: 1
Got: 2
Got: 3
Completed Observable.

You can see that our Subscriber gets notified of every event and also receives the completed event.

A well-formed Observable invokes its subscriber’s onNext method zero or more times and then invokes either the onCompleted or onError method exactly once.
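To make this rule concrete, the following plain-Java sketch checks a recorded list of event names against it. It is an illustration only: ObservableContract and isWellFormed are hypothetical names, the events are plain strings rather than real RxJava notifications, and the check assumes a stream that has already finished.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; not part of RxJava.
final class ObservableContract {

    // Returns true if the recorded events follow: onNext* (onCompleted | onError)
    static boolean isWellFormed(List<String> events) {
        if (events.isEmpty()) {
            return false; // a finished stream must end with a terminal event
        }
        int last = events.size() - 1;
        for (int i = 0; i < last; i++) {
            if (!events.get(i).equals("onNext")) {
                return false; // only data events may precede the terminal event
            }
        }
        String terminal = events.get(last);
        return terminal.equals("onCompleted") || terminal.equals("onError");
    }

    public static void main(String[] args) {
        System.out.println(isWellFormed(Arrays.asList("onNext", "onNext", "onCompleted"))); // true
        System.out.println(isWellFormed(Arrays.asList("onError")));                          // true
        System.out.println(isWellFormed(Arrays.asList("onNext", "onError", "onNext")));      // false
        System.out.println(isWellFormed(Arrays.asList("onCompleted", "onError")));           // false
    }
}
```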

You can also test the error case by throwing an artificial exception when the value 2 is emitted:

Observable
    .just(1, 2, 3)
    .doOnNext(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            if (integer.equals(2)) {
                throw new RuntimeException("I don't like 2");
            }
        }
    })
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onCompleted() {
            System.out.println("Completed Observable.");
        }

        @Override
        public void onError(Throwable throwable) {
            System.err.println("Whoops: " + throwable.getMessage());
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("Got: " + integer);
        }
    });

The example prints:

Got: 1
Whoops: I don't like 2

The first value gets through without problems; the second value throws an exception and, therefore, terminates the Observable. No subsequent values are allowed to be emitted after an error event.

The subscribe method also returns a Subscription that you can use to unsubscribe and not receive further events.

Even if you don’t unsubscribe explicitly, operations like take do that for you implicitly. The following code only takes the first five values and then unsubscribes:

Observable
    .just("The", "Dave", "Brubeck", "Quartet", "Time", "Out")
    .take(5)
    .subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
            System.out.println("Completed Observable.");
        }

        @Override
        public void onError(Throwable throwable) {
            System.err.println("Whoops: " + throwable.getMessage());
        }

        @Override
        public void onNext(String name) {
            System.out.println("Got: " + name);
        }
    });

This prints:

Got: The
Got: Dave
Got: Brubeck
Got: Quartet
Got: Time
Completed Observable.

You do not need to implement the full subscriber every time. If you are only interested in the data events, you can subscribe like this:

Observable
    .just(1, 2, 3)
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            System.out.println("Got: " + integer);
        }
    });

Be aware though that if an error happens, the following exception will be propagated:

Exception in thread "main" rx.exceptions.OnErrorNotImplementedException
	at rx.Observable$36.onError(Observable.java:8412)
	at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:128)
	at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:97)
	at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:67)
	at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:78)
	at rx.internal.operators.OnSubscribeFromIterable$IterableProducer
          .request(OnSubscribeFromIterable.java:76)
	...

It is recommended that you always implement an error handler right from the beginning, since things can and will go wrong at some point. The shorthand form can still come in handy if you just want to try things out quickly or for illustrative purposes.

From Asynchronous to Synchronous

As long as your Observable works on the same thread all the time, there is no need for communication between threads since only one is executing. When your Observable flow gets executed on a different thread, you need to take some extra care to make sure you are not missing values. This is not specific to Observables: every time you need to deal with parallel threads you need to think about synchronization and communication.

Most of the snippets in this documentation only call subscribe(). When you replicate a snippet in your own main class, make sure that your program doesn’t terminate before onCompleted() is called (for example, by using toBlocking() or a CountDownLatch).

You should never perform long-running blocking operations inside of an asynchronous stream (for example, inside the functions passed to map() or flatMap()).

The following code emits an increasing value every second, which is done on a different thread:

public static void main(String... args) {
    Observable
        .interval(1, TimeUnit.SECONDS)
        .subscribe(new Action1<Long>() {
            @Override
            public void call(Long counter) {
                System.out.println("Got: " + counter);
            }
        });
}

It works perfectly fine; the only problem is that you probably won’t see anything printed out, because your main thread exits before the background thread has had a chance to run and emit values.

A common way to deal with such a situation is to add a CountDownLatch, which allows you to synchronize between different threads. One thread counts down the latch; the other one waits until it is counted down:

final CountDownLatch latch = new CountDownLatch(5);
Observable
    .interval(1, TimeUnit.SECONDS)
    .subscribe(new Action1<Long>() {
        @Override
        public void call(Long counter) {
            latch.countDown();
            System.out.println("Got: " + counter);
        }
    });

latch.await();

The example prints the following lines and then exits:

Got: 0
Got: 1
Got: 2
Got: 3
Got: 4

One common mistake is to use Thread.sleep() instead of a latch to synchronize the execution between threads. This is a bad idea because it synchronizes nothing and just keeps one thread alive for a specific amount of time. If the actual calls take less time, you are wasting time, and if they take longer, you won’t get the desired effect. If you do this in unit tests, be prepared for a good amount of non-determinism and randomly failing tests.

Always use a CountDownLatch!
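One refinement worth considering is to wait on the latch with a timeout, so that a stalled producer fails fast instead of hanging the caller. The following sketch uses only the JDK: a ScheduledExecutorService stands in for Observable.interval(), and the class and method names (LatchWithTimeout, collectTicks) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration class; a JDK scheduler stands in for interval().
final class LatchWithTimeout {

    // Collects the first `expected` ticks produced on a background thread,
    // or fails if they do not all arrive within the timeout.
    static List<Long> collectTicks(int expected, long periodMillis, long timeoutMillis) {
        final List<Long> received = new CopyOnWriteArrayList<>();
        final CountDownLatch latch = new CountDownLatch(expected);
        final AtomicLong counter = new AtomicLong();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.scheduleAtFixedRate(() -> {
                if (latch.getCount() > 0) {
                    received.add(counter.getAndIncrement()); // record first,
                    latch.countDown();                       // then signal
                }
            }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

            // Block the caller until all ticks arrived or the timeout elapsed.
            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Timed out; got only " + received);
            }
            return new ArrayList<>(received);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", ex);
        } finally {
            scheduler.shutdownNow(); // stop the background thread in every case
        }
    }

    public static void main(String[] args) {
        System.out.println(collectTicks(5, 10, 5000)); // [0, 1, 2, 3, 4]
    }
}
```

Unlike Thread.sleep(), the caller resumes as soon as the fifth value has arrived, and the timeout only matters when something goes wrong.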

A technique unique to Observables is to convert them into a BlockingObservable to achieve the same effect. In simple terms, this converts an Observable into an Iterable and makes it execute on the caller thread, blocking it until one or more values arrive. This technique is used extensively in the documentation to show concepts, while not having to deal with count-down latches all the time. It can also be used if, for some reason, you are not able to use asynchronous computations.

The conversion itself doesn’t do any blocking in the first place, only subsequent calls will:

// This does not block.
BlockingObservable<Long> observable = Observable
    .interval(1, TimeUnit.SECONDS)
    .toBlocking();

// This blocks and is called for every emitted item.
observable.forEach(new Action1<Long>() {
    @Override
    public void call(Long counter) {
        System.out.println("Got: " + counter);
    }
});

This example runs forever because interval() never completes. You are free to chain any asynchronous computations before the toBlocking() call; thus, you can build an asynchronous workflow and then block at the very end. The following code is equivalent to the CountDownLatch example above:

Observable
    .interval(1, TimeUnit.SECONDS)
    .take(5)
    .toBlocking()
    .forEach(new Action1<Long>() {
        @Override
        public void call(Long counter) {
            System.out.println("Got: " + counter);
        }
    });

If you know that only a single value is ever returned, you can use the single() method:

int value = Observable
    .just(1)
    .toBlocking()
    .single();

Be aware though that if more items get emitted, you get an exception:

Exception in thread "main" java.lang.IllegalArgumentException: Sequence contains too many elements
	at rx.internal.operators.OperatorSingle$1.onNext(OperatorSingle.java:58)
	at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:76)
	at rx.Subscriber.setProducer(Subscriber.java:148)
	at rx.Subscriber.setProducer(Subscriber.java:144)
	....

Similarly, you get an exception if no value gets emitted:

Exception in thread "main" java.util.NoSuchElementException: Sequence contains no elements
	at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
	at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
	at rx.Subscriber.setProducer(Subscriber.java:148)
	at rx.Subscriber.setProducer(Subscriber.java:144)
	at rx.Subscriber.setProducer(Subscriber.java:144)
	at rx.Subscriber.setProducer(Subscriber.java:144)
	....

As an alternative, you can use singleOrDefault() so that a fallback value gets returned.

You can use this technique with the Java SDK if you are loading a document and it does not exist:

JsonDocument doc = bucket.get("id").toBlocking().singleOrDefault(null);
if (doc == null) {
    System.err.println("Document not found!");
} else {
    System.out.println(doc);
}

If you check out the API documentation of the BlockingObservable, you will discover many more possibilities, including iterators or grabbing the first and last values.

One last thing that comes in handy with blocking calls: sometimes you want to collect all emitted values into a list. You can combine the blocking calls with the toList() operator to achieve something like this:

List<Integer> list = Observable
    .just(1, 2, 3)
    .toList()
    .toBlocking()
    .single();

// Prints: [1, 2, 3]
System.out.println(list);

Creating Observables

There are many ways to create Observables, and you’ve already seen just() and interval(). There are many more such convenience methods available on the Observable class, but they all boil down to the create() method. You can simulate the example from before with this:

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        try {
            if (!subscriber.isUnsubscribed()) {
                for (int i = 0; i < 5; i++) {
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        } catch (Exception ex) {
            subscriber.onError(ex);
        }
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        System.out.println("Got: " + integer);
    }
});

Every time a Subscriber subscribes, the call() method is executed. You can then call onNext, onCompleted and onError as you wish, but keep in mind that only one of onCompleted and onError may be called, and only once, and that no subsequent onNext is allowed to follow so that the contract is met.

This example shows why it is crucial to call subscribe() on the Observable, because only such a call triggers the actual execution of the pipeline. This is a little different with subjects, which are covered later in this guide. Nevertheless, always call subscribe() on your observables.

Transforming Observables

Observables can transform their values in various ways. One of the most basic ones is map(), which converts the incoming value into a different one. You surely like division, so here is the FizzBuzz game:

Observable
    .interval(10, TimeUnit.MILLISECONDS)
    .take(20)
    .map(new Func1<Long, String>() {
        @Override
        public String call(Long input) {
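            // Check the combined case first so that multiples of 15 are handled.
            if (input % 15 == 0) {
                return "FizzBuzz";
            } else if (input % 3 == 0) {
                return "Fizz";
            } else if (input % 5 == 0) {
                return "Buzz";
            }
            return Long.toString(input);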
        }
    })
    .toBlocking()
    .forEach(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println(s);
        }
    });

The map function is used to convert the input number into a string and do some checks to satisfy the FizzBuzz game. As a more practical example, consider loading a document from the Java SDK and only extracting the first name of a user before passing it on:

bucket
    .get("id")
    .map(new Func1<JsonDocument, String>() {
        @Override
        public String call(JsonDocument document) {
            return document.content().getString("firstname");
        }
    }).subscribe();

A variation of map() is called flatMap(), which allows you to do those transformations with asynchronous calls. Taking the example from above, we want to map from String (the document ID) to a JsonDocument (the loaded document). With a normal map() call, you would either need to block on the Observable or at some point deal with an Observable<Observable<JsonDocument>>.

Thankfully, flatMap() flattens the resulting values for us and returns them into the original flow:

// Loads 3 documents in parallel
Observable
    .just("doc1", "doc2", "doc3")
    .flatMap(new Func1<String, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(String id) {
            return bucket.get(id);
        }
    }).subscribe(new Action1<JsonDocument>() {
        @Override
        public void call(JsonDocument document) {
            System.out.println("Got: " + document);
        }
    });

You can see that the function passed to flatMap() returns an Observable<T>, whereas the function passed to a normal map() just returns a T. You will use flatMap() a lot when dealing with flows like this, so keep it in mind.

Another helpful transformation is scan(). It applies a function to each value emitted by an Observable, sequentially, and emits each successive value. We can use it to aggregate values like this:

Observable
    .just(1, 2, 3, 4, 5)
    .scan(new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer sum, Integer value) {
            return sum + value;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            System.out.println("Sum: " + integer);
        }
    });

This prints:

Sum: 1
Sum: 3
Sum: 6
Sum: 10
Sum: 15

Finally, groupBy() comes in handy: it emits one Observable per group, where the group is determined by a function. The following example emits two Observables, one for even and one for odd values:

Observable
    .just(1, 2, 3, 4, 5)
    .groupBy(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            return integer % 2 == 0;
        }
    }).subscribe(new Action1<GroupedObservable<Boolean, Integer>>() {
        @Override
        public void call(final GroupedObservable<Boolean, Integer> grouped) {
            grouped.toList().subscribe(new Action1<List<Integer>>() {
                @Override
                public void call(List<Integer> integers) {
                    System.out.println(integers + " (Even: " + grouped.getKey() + ")");
                }
            });
        }
    });

The example prints:

[1, 3, 5] (Even: false)
[2, 4] (Even: true)

Combined with the Java SDK, this technique can be used to separate returned Documents based on their content. The following example uses a view to load all documents from the beer-sample bucket, groups them by type and counts the number of occurrences:

bucket
    .async()
    .query(ViewQuery.from("my_design_doc", "my_view"))
    .flatMap(AsyncViewResult::rows)
    .flatMap(AsyncViewRow::document)
    .groupBy(document -> document.content().getString("type"))
    .subscribe(observable ->
        observable.count().subscribe(integer ->
            System.out.println(observable.getKey() + ": " + integer)
        )
    );

This code queries the view, extracts all rows, loads the full document for each row, groups the documents by the type property in the JSON document and then uses the count() operator to count the number of documents emitted by each grouped Observable. This prints something like the following:

brewery: 1412
beer: 5891

Filtering Observables

In addition to transforming observables, you can also filter them. Filtering doesn’t change the emitted values themselves, but rather how many of them are emitted and at which point (if at all).

For example, you can filter based on some criteria:

// This will only let 3 and 4 pass.
Observable
    .just(1, 2, 3, 4)
    .filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            return integer > 2;
        }
    }).subscribe();

Or take only the first N values emitted and then unsubscribe:

// Only 1 and 2 will pass.
Observable
    .just(1, 2, 3, 4)
    .take(2)
    .subscribe();

Or use only the first or last value emitted:

// Only 1 will pass
Observable
    .just(1, 2, 3, 4)
    .first()
    .subscribe();

// Only 4 will pass
Observable
    .just(1, 2, 3, 4)
    .last()
    .subscribe();

Finally, you can use distinct() to suppress duplicate values:

// 1, 2, 3, 4 will be emitted
Observable
    .just(1, 2, 1, 3, 4, 2)
    .distinct()
    .subscribe();

distinct() also allows you to pass in a function that returns the key to select by. You can use this, for example, to filter out duplicate JsonDocument objects.
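The effect of a key function can be modeled in plain Java: the first item seen for each key passes, and later items with the same key are dropped. This is a sketch of the semantics only, not RxJava code; DistinctByKey, Doc and distinctBy are hypothetical names, and Doc is a stand-in for a document with an ID.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical illustration class; models "distinct by key" on a plain list.
final class DistinctByKey {

    // Hypothetical stand-in for a document with an id and a body.
    static final class Doc {
        final String id;
        final String body;

        Doc(String id, String body) {
            this.id = id;
            this.body = body;
        }

        @Override
        public String toString() {
            return id + "=" + body;
        }
    }

    // Keeps the first item for every key and drops later items with a known key.
    static <T, K> List<T> distinctBy(List<T> items, Function<? super T, ? extends K> keySelector) {
        Set<K> seenKeys = new HashSet<>();
        List<T> result = new ArrayList<>();
        for (T item : items) {
            if (seenKeys.add(keySelector.apply(item))) { // add() is false for known keys
                result.add(item);
            }
        }
        return result;
    }

    static List<String> demo() {
        List<Doc> docs = Arrays.asList(
            new Doc("a", "v1"), new Doc("b", "v1"), new Doc("a", "v2"));
        List<String> out = new ArrayList<>();
        for (Doc doc : distinctBy(docs, d -> d.id)) {
            out.add(doc.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a=v1, b=v1]
    }
}
```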

Combining Observables

Multiple Observables can also be merged to form a combined one. Depending on how you want those to be merged, there are different operators available. Two of the most used ones are merge() and zip(), which are covered here.

merge() simply passes on all values emitted by the source Observables in the order they arrive:

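Observable<Integer> evens = Observable.just(2, 4, 6, 8, 10);
Observable<Integer> odds = Observable.just(1, 3, 5, 7, 9);

Observable
    .merge(evens, odds)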
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            System.out.println(integer);
        }
    });

The example prints something similar to the following:

2
4
6
8
10
1
3
5
7
9

With the zip operator, you can combine two streams pairwise, strictly in order, using a function that defines how each pair is combined:

Observable<Integer> evens = Observable.just(2, 4, 6, 8, 10);
Observable<Integer> odds = Observable.just(1, 3, 5, 7, 9);

Observable
    .zip(evens, odds, (v1, v2) -> v1 + " + " + v2 + " is: " + (v1 + v2))
    .subscribe(System.out::println);

This zips the pairs together in order and prints:

2 + 1 is: 3
4 + 3 is: 7
6 + 5 is: 11
8 + 7 is: 15
10 + 9 is: 19

Error Handling

Error handling is a vital component of every real world application and needs to be considered from the start. RxJava provides sophisticated mechanisms to deal with errors that happen inevitably in your Observable flows.

In general, you want to react in the following ways:

  • Return a default value instead.

  • Flip over to a backup Observable.

  • Retry the Observable (immediately or with backoff).

Returning a default value is a good idea if you cannot afford to retry or you just don’t care (maybe because the flow is not crucial to your data flow). The function you pass in also receives the exception, so you can return different values for different exception types or use it for logging purposes. The following code throws an exception at the first emitted item, but falls back to a default value:

// Prints (the first line on stderr, the second on stdout):
// Oops: I don't like: Apples
// Default
Observable
    .just("Apples", "Bananas")
    .doOnNext(s -> {
        throw new RuntimeException("I don't like: " + s);
    })
    .onErrorReturn(throwable -> {
        System.err.println("Oops: " + throwable.getMessage());
        return "Default";
    }).subscribe(System.out::println);

You can also flip to a backup Observable that will be called if the first one fails. The Java SDK has a getFromReplica() command that allows you to read potentially stale data from replicas, trading consistency for availability on reads. You can use this approach to fall back:

bucket
    .get("id")
    .onErrorResumeNext(bucket.getFromReplica("id", ReplicaMode.ALL))
    .subscribe();

Normally you want to have more control over which Observable should be run next, depending on the type of error. The following example only goes to the replica if a TimeoutException happened (otherwise, the error is passed down):

bucket
    .get("id")
    .timeout(500, TimeUnit.MILLISECONDS)
    .onErrorResumeNext(new Func1<Throwable, Observable<? extends JsonDocument>>() {
        @Override
        public Observable<? extends JsonDocument> call(Throwable throwable) {
            if (throwable instanceof TimeoutException) {
                return bucket.getFromReplica("id", ReplicaMode.ALL);
            }
            return Observable.error(throwable);
        }
    });

Finally, it is possible to retry the Observable by resubscribing. This can be done as quickly as possible, or with a backoff interval, which is preferred when external resources are involved.

The following program desperately tries to read the numbers from 1 to 10, but a (not so hidden) flaw makes it randomly throw an exception. If that happens, the code retries. Since a retry resubscribes and emits values that might have been emitted already, we can use distinct() to filter those out.

Observable
    .just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    .doOnNext(integer -> {
        if (new Random().nextInt(10) + 1 == 5) {
            throw new RuntimeException("Boo!");
        }
    })
    .retry()
    .distinct()
    .subscribe(System.out::println);

If you only want to retry a maximum number of times, replace the retry() with a retry(count) call.

If you want to retry with backoff, you can use a technique like this:

Observable
    .range(1, 10)
    .doOnNext(integer -> {
        if (new Random().nextInt(10) + 1 == 5) {
            throw new RuntimeException("Boo!");
        }
    })
    .retryWhen(attempts ->
        attempts.zipWith(Observable.range(1, 3), (n, i) -> i)
        .flatMap(i -> {
            System.out.println("delay retry by " + i + " second(s)");
            return Observable.timer(i, TimeUnit.SECONDS);
        }))
    .distinct()
    .subscribe(System.out::println);

The attempts get passed into the retryWhen() method and zipped with the number of seconds to wait. The timer() method emits once the delay has elapsed, which triggers the next retry. If you run this code a few times to generate an exception (or more), you will see something similar to this:

1
2
3
4
delay retry by 1 second(s)
delay retry by 2 second(s)
5
6
7
8
9
10

With the Java SDK, the advanced retryWhen is easier to write using the RetryBuilder helper class. It can be used to specify which kinds of Exception to retry, how many attempts to make and which Delay to apply between them.

Observable
    .range(1, 10)
    .doOnNext(integer -> {
         if (new Random().nextInt(10) + 1 == 5) {
             throw new RuntimeException("Boo!");
         }
    })
    .retryWhen(
         RetryBuilder
             .anyOf(RuntimeException.class)
             .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
             .max(5)
             .build())
    .distinct()
    .subscribe(System.out::println);
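For code paths that are blocking anyway, the same idea of a bounded retry with a growing delay can be sketched in plain Java. This is not how RetryBuilder is implemented; BackoffRetry and retryWithBackoff are hypothetical names, and doubling the delay after each failure is an assumption chosen for the illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; a blocking retry loop with doubling delays.
final class BackoffRetry {

    // Runs the task until it succeeds or maxAttempts is reached.
    // The delay between attempts starts at initialDelayMillis and doubles each time.
    static <T> T retryWithBackoff(Callable<T> task, int maxAttempts, long initialDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return task.call();
            } catch (Exception ex) {
                if (attempt >= maxAttempts) {
                    throw new IllegalStateException("Giving up after " + attempt + " attempts", ex);
                }
                sleep(delay);
                delay *= 2; // back off: wait twice as long before the next attempt
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted during backoff", ex);
        }
    }

    // Fails on the first two calls and succeeds on the third.
    static String demo() {
        AtomicInteger calls = new AtomicInteger();
        return retryWithBackoff(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("Boo!");
            }
            return "succeeded after " + calls.get() + " calls";
        }, 5, 10);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // succeeded after 3 calls
    }
}
```

In an asynchronous flow you would not block a thread like this; the sketch is only meant to show the attempt counting and delay growth.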

Schedulers and Threads

Schedulers in RxJava are used to manage and control concurrency. Some operators implicitly use one or allow you to pass in a custom one.

RxJava ships with a bunch of pre-configured Schedulers by default, which are all accessible through the Schedulers class:

  • Schedulers.computation(): Event-loop style scheduler for purely computational work.

  • Schedulers.immediate(): Executes the work immediately on the current thread.

  • Schedulers.io(): Executes work on an Executor-backed pool that grows as needed.

  • Schedulers.newThread(): Creates a new thread for each unit of work.

  • Schedulers.trampoline(): Queues work on the current thread to be executed after the current work completes.

  • Schedulers.test(): Test scheduler used for testing and debugging, which allows manual advancing of the clock.

As a rule of thumb, the computation scheduler should always be used for in-memory processing, while the I/O scheduler should only be used for blocking-style I/O operations (so do not use it together with the Java SDK because it is asynchronous anyway).

You can instruct an observable to be executed on such a scheduler in the following ways:

  • Implicitly by using an operator that makes use of one

  • Explicitly by passing the Scheduler to such an operator

  • By using subscribeOn(Scheduler)

  • By using observeOn(Scheduler)

Operators like buffer, replay, skip, delay, parallel and so on use a scheduler by default if not instructed otherwise. A list of default schedulers can be found in the RxJava documentation about schedulers.

As a rule of thumb, all of those operators allow you to pass in a custom scheduler if needed, but most of the time sticking with the defaults is a good idea.

The Java SDK uses an internal scheduler similar to the computation scheduler to properly isolate the inner mechanisms from user-land. It is possible to change that scheduler through the environment, but it is not recommended.

If you want the whole subscribe chain to be executed on a specific scheduler, you use the subscribeOn() operator. Without a scheduler set, the following code executes on the main thread:

Observable
    .range(1, 5)
    .map(integer -> {
        System.out.println("Map: (" + Thread.currentThread().getName() + ")");
        return integer + 2;
    })
    .subscribe(integer ->
        System.out.println("Got: " + integer + " (" + Thread.currentThread().getName() + ")")
    );

The example prints:

Map: (main)
Got: 3 (main)
Map: (main)
Got: 4 (main)
Map: (main)
Got: 5 (main)
Map: (main)
Got: 6 (main)
Map: (main)
Got: 7 (main)

This example shows the subscribeOn() method added to the flow (it doesn’t matter where you add it):

Observable
    .range(1, 5)
    .map(integer -> {
        System.out.println("Map: (" + Thread.currentThread().getName() + ")");
        return integer + 2;
    })
    .subscribeOn(Schedulers.computation())
    .subscribe(integer ->
            System.out.println("Got: " + integer + " (" + Thread.currentThread().getName() + ")")
    );

In the output of the example that uses subscribeOn(), you can see that everything still runs on a single thread, but that thread now belongs to the computation thread pool:

Map: (RxComputationThreadPool-6)
Got: 3 (RxComputationThreadPool-6)
Map: (RxComputationThreadPool-6)
Got: 4 (RxComputationThreadPool-6)
Map: (RxComputationThreadPool-6)
Got: 5 (RxComputationThreadPool-6)
Map: (RxComputationThreadPool-6)
Got: 6 (RxComputationThreadPool-6)
Map: (RxComputationThreadPool-6)
Got: 7 (RxComputationThreadPool-6)

If you need tighter control regarding which parts are executed on what pool, use observeOn(). Here, the order matters:

Observable
    .range(1, 5)
    .map(integer -> {
        System.out.println("Map: (" + Thread.currentThread().getName() + ")");
        return integer + 2;
    })
    .observeOn(Schedulers.computation())
    .subscribe(integer ->
            System.out.println("Got: " + integer + " (" + Thread.currentThread().getName() + ")")
    );

Everything before the observeOn() call is executed on the main thread, everything after it on the scheduler:

Map: (main)
Map: (main)
Map: (main)
Got: 3 (RxComputationThreadPool-6)
Got: 4 (RxComputationThreadPool-6)
Got: 5 (RxComputationThreadPool-6)
Map: (main)
Map: (main)
Got: 6 (RxComputationThreadPool-6)
Got: 7 (RxComputationThreadPool-6)

There is also a way to use schedulers directly to schedule operations. For more information about schedulers, refer to the RxJava documentation about schedulers.

Writing Resilient Reactive Applications

Many concepts can be applied for both synchronous and asynchronous access. When necessary, both patterns are discussed separately. The focus is on database interaction, but if you are using RxJava as part of your stack, you can and should apply most of the principles there as well.

Error Recovery

This section discusses different strategies to mitigate errors that might come up during operations. Some of them are shown to make a point, but the techniques apply to all different types of errors and can be applied as you see fit.

Defaults

Another situation that requires intervention on the application side is an Observable that does not emit a single value. This can happen because operators filtered the Observable so that nothing is left, or because it did not produce any values in the first place. One common case in the Java SDK is get(). If the Document is not found, the Observable will complete without emitting anything.

RxJava provides helper operators that all end with *OrDefault() and allow you to return default values if no item is emitted when the Observable completes.

In most cases, you want to use singleOrDefault() and return a default value when no item is emitted by the source Observable:

Observable
    .<String>empty()
    .singleOrDefault("Default")
    .subscribe();

If you are dealing with potentially more than one item emitted in your Observable and you only want to emit either the first or the last value, there are also operators that allow you to emit a default if it’s unexpectedly empty. See firstOrDefault() as well as lastOrDefault() for more details.

Lastly, RxJava offers the method defaultIfEmpty(), which allows you to return a value if the source doesn’t emit anything. In recent versions, the method switchIfEmpty() allows you to switch to a different Observable in that same case.

Error Handling in Bulk Scenarios

Bulk operations are used to handle more data in one batch and, therefore, benefit from better resource utilization. However, error handling becomes more complicated. There are three high-level cases to consider:

  • Best Effort: Keep going and just use the results that succeeded when errors happen.

  • Full Retry: Retry the complete Observable when errors happen.

  • Incremental Retry: Only retry specific events of the Observable.

Before we "dig" into the specific approaches, let’s revisit the contract of Observables:

onNext* (onError | onComplete)

Zero or more events are followed by either an error event or a complete event. This provides an important clue right away: once an Observable fails, no more events can be passed through. Therefore, you need to make sure that errors are handled at the smallest scope possible, only letting them propagate if you cannot handle them right away.

Best Effort Bulk Handling

Sometimes it is more important to get data within a given timeframe, or maybe with a short timeout, than to get all of the data. In general, you want to keep a timeout that lets you fetch all the data, but depending on the use case you may be fine with only a subset of the data being returned.

To ignore errors and turn them into "noops", you can utilize onErrorResumeNext():

Observable
    .create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("A");
            subscriber.onNext("B");
            subscriber.onError(new IllegalStateException("Woops"));
            subscriber.onNext("C");
        }
    })
    .onErrorResumeNext(Observable.<String>empty())
    .toBlocking()
    .forEach(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println("Got: " + s);
        }
    });

This will not raise any exception to your calling thread, but it will never process "C" because, based on the Observable contract, once onError is called no more events are allowed to be generated.

To keep going when a single event fails, you need to turn each event into its own Observable and then merge the results back together. You can either defer the error handling until the end (through Observable#mergeDelayError) or use flatMap and make sure to contain the errors before flattening.

This code provides some fake data with which you can work. It will emit four Observables where one of them will fail:

Observable<Observable<String>> dataObservables = Observable
    .just("a", "b", "c", "d")
    .map(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            if (s.equals("b")) {
                return Observable.error(new IllegalStateException("I dont like b"));
            }
            return Observable.just(s);
        }
    });

You can then use mergeDelayError to defer error handling until the end. If you un-comment the onErrorResumeNext line, errors are also silently discarded, leaving you with an Observable that provides best-effort processing of the source Observables.

Observable.mergeDelayError(dataObservables)
    //.onErrorResumeNext(Observable.<String>empty())
    .toBlocking()
    .forEach(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println(s);
        }
    });

Alternatively, you can use flatMap and make sure the errors are contained for each emitted Observable:

Observable
    .just("a", "b", "c", "d")
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            // Simulate some observable that sometimes fails
            Observable<String> obs;
            if (s.equals("b")) {
                obs = Observable.error(new IllegalStateException("I dont like b"));
            } else {
                obs = Observable.just(s);
            }

            // Here comes the actual handling part before we flatten it back
            return obs.onErrorResumeNext(Observable.<String>empty());
        }
    })
    .toBlocking()
    .forEach(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println(s);
        }
    });

As a more practical example, here is a best-effort bulk loading of documents from Couchbase Server that just discards operations that fail:

// bucket is declared final so that the anonymous class can capture it on Java 6 and 7
private static Observable<JsonDocument> loadDocsBestEffort(final Bucket bucket, List<String> ids) {
    return Observable
        .from(ids)
        .flatMap(new Func1<String, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(String id) {
                return bucket
                    .async()
                    .get(id)
                    .onErrorResumeNext(Observable.<JsonDocument>empty());
            }
        });
}

Of course, you can add more logic onto each emitted Observable and also proper logging so that you at least know which errors occurred.

Full Retry Bulk Handling

Full retry bulk handling can be done by just chaining one of the retry operators right at the end of the Observable chain.

This example uses Java 8 lambda syntax for conciseness:

private static Observable<JsonDocument> loadDocsFullRetry(Bucket bucket, List<String> ids) {
    return Observable
        .from(ids)
        .flatMap(id -> bucket.async().get(id))
        .retry(4);
}

This is, however, a bit crude because the whole list of IDs will be retried if there is an error. This is the case even if the error happens late in the stream and some items have already been emitted. Imagine that you retrieve items A, B, C, D and that an error occurs just the first time the SDK retrieves C. What you will see is A, B, A, B, C, D.

This can be mitigated by using the distinct() operator, but keep in mind the memory tradeoff (distinct will maintain a set of values it saw in the background, which can be problematic for infinite or very large streams).

Incremental Retry Bulk Handling

Incremental retry improves on full retry by only actually retrying elements that couldn’t be loaded. It is easily done by chaining the retry operators inside the flatMap instead of outside of it.

private static Observable<JsonDocument> loadDocsIncrementalRetry(Bucket bucket, List<String> ids) {
    return Observable
        .from(ids)
        .flatMap(id -> bucket.async().get(id)
            .retry(4)
        );
}

One thing to keep in mind with flatMap is that the order of emissions isn’t guaranteed to be the same as the source emissions. This can be especially visible in this case since individual streams are retried on their own: retried keys could come back with a value much later than other keys, more often changing the order in which they appear.
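The difference between the two retry scopes can also be illustrated without RxJava. The following is a minimal synchronous sketch that uses only the JDK. RetryScopes and FlakyLoader are hypothetical names introduced for illustration, and the loader simply fails the first time it is asked for a given ID. It models the A, B, C, D scenario described above and is not how the SDK implements retries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical synchronous model; none of these names come from the SDK or RxJava.
class RetryScopes {

    /** Fake loader: every id passed to the constructor fails the first time it is requested. */
    static final class FlakyLoader {
        private final Set<String> pendingFailures;

        FlakyLoader(String... failOnce) {
            this.pendingFailures = new HashSet<>(Arrays.asList(failOnce));
        }

        String load(String id) {
            if (pendingFailures.remove(id)) {
                throw new IllegalStateException("Transient failure for " + id);
            }
            return id;
        }
    }

    /** Full retry: any failure restarts the whole batch, so earlier items are emitted again. */
    static List<String> fullRetry(List<String> ids, FlakyLoader loader, int maxRetries) {
        List<String> emitted = new ArrayList<>();
        for (int attempt = 0; ; attempt++) {
            try {
                for (String id : ids) {
                    emitted.add(loader.load(id));
                }
                return emitted;
            } catch (IllegalStateException e) {
                if (attempt == maxRetries) {
                    throw e; // retries exhausted
                }
            }
        }
    }

    /** Incremental retry: only the failing id is retried. */
    static List<String> incrementalRetry(List<String> ids, FlakyLoader loader, int maxRetries) {
        List<String> emitted = new ArrayList<>();
        for (String id : ids) {
            for (int attempt = 0; ; attempt++) {
                try {
                    emitted.add(loader.load(id));
                    break;
                } catch (IllegalStateException e) {
                    if (attempt == maxRetries) {
                        throw e; // retries exhausted
                    }
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("A", "B", "C", "D");
        System.out.println(fullRetry(ids, new FlakyLoader("C"), 4));        // [A, B, A, B, C, D]
        System.out.println(incrementalRetry(ids, new FlakyLoader("C"), 4)); // [A, B, C, D]
    }
}
```

Because this model is sequential, it preserves the order of the IDs. With flatMap the order can differ, as noted above.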

Bulk Pattern, BackpressureException and Reactive Pull Backpressure

The bulk pattern is powerful, but if you try it with a very large collection, it is especially susceptible to a BackpressureException. This exception is a way for the SDK to tell you that you’re asking for too much in a short amount of time, and requests have piled up too much while waiting for a server response to free up a spot.

By default, the internal structure that holds pending requests is sized to a little over 16,000 slots. Because from() iterates over an in-memory collection, which is very fast, it can easily flood the SDK if the size of the collection is close to or over 16K.

You could try to react to that by implementing a retry strategy, but it is hard to get it right. Fortunately, RxJava comes with an even better solution, a form of flow-control called reactive pull backpressure that you can leverage to avoid getting a BackpressureException.

This is done through the Subscriber, its onStart() method and the request(n) method.

You have to pass a Subscriber to subscribe() instead of using the convenience overloads that take individual Action callbacks for the next, error and completed handlers.

Observable<JsonDocument> largeBulk = Observable
    .from(oneMillionIds)
    .flatMap(id -> bucket.async().get(id));

//the subscription is where reactive pull backpressure happens
largeBulk.subscribe(new Subscriber<JsonDocument>() {

    //onStart initiates the backpressure
    @Override
    public void onStart() {
        request(100);
    }

    @Override
    public void onNext(JsonDocument jsonDocument) {
        System.out.println(jsonDocument.content());
        request(100); //request more items
    }

    //onCompleted and onError are less relevant.
    //Note that if you want to block you'd need a CountDownLatch in this case

    @Override public void onCompleted() { System.out.println("Done"); }

    @Override public void onError(Throwable e) { e.printStackTrace(); }

});

The request(n) method lets the consumer notify the upstream source that it is ready for more and is prepared to consume n items. For example, this avoids instant iteration of the collection in the case of from(). As long as responses come in from Couchbase Server, we continue asking for more.

This is a form of error mitigation, and we’ll see a few others in the next section.

Error Mitigation

The idea behind error mitigation is to limit the impact a failing piece has on the rest of the system. When an error occurs, you don’t want it to cause failures downstream. You also want to prevent as many error conditions as possible from happening.

The BackpressureException and RxJava’s reactive pull backpressure are forms of error mitigation we already saw since they both allow a form of flow control that avoids overloading the system.

Slowing Down

Rate limiting of the requests can be a way to keep the system within parameters that are known to be stable, thus mitigating overloading errors. This can happen in the client-to-server direction, which avoids making more requests than can be processed later. It can also happen in the server-to-client direction, when the client can’t keep up with the bandwidth at which data is sent from the server because, for instance, it has to process it through long-running tasks. Rather than choking on that data when backpressure mechanisms are not available, the client can decide to discard some of the data. This is referred to as Load Shedding.

For example, consider a system where multiple sensors write data into Couchbase, and the client application reads that data and displays it in a UI. The graphs in the UI can be built on a sample of the data, and that’s the key point. The processing rate of the data is lower than its producing rate, but at the same time the resolution is also lower. That means we can simply ignore some of the data and only consume what’s needed, such as taking one data point every second to trigger a refresh of the graph in the UI.

RxJava allows you to do that through several operators:

  • sample (alternatively throttleLast) periodically looks at an Observable and emits the item last seen during this period.

  • throttleFirst does the same except it emits the first item seen during the period.

  • debounce only emits an item if a given timespan has passed since the last emission, eliminating items that are rapidly following another item.

The operator sample is useful for sources that are "too chatty" even at a regular rate. The operator debounce, on the other hand, really shines with sources that have a variable rate of emission. For example, it can be used to trigger a web search for auto-complete suggestions from an input form only once the user has stopped typing for 500 ms.
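To make the difference between the three operators more concrete, here is a simplified, timestamp-based model written with the JDK only. LoadSheddingModel and Event are hypothetical names. The model assumes events are sorted by time, uses fixed windows for sample, and ignores scheduler and completion timing, so it approximates the semantics described above and is not RxJava’s implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of sample, throttleFirst and debounce. Hypothetical names; not RxJava code.
class LoadSheddingModel {

    static final class Event {
        final long timeMs;
        final String value;

        Event(long timeMs, String value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    /** Keeps the last event of every fixed window [k * periodMs, (k + 1) * periodMs). */
    static List<String> sample(List<Event> events, long periodMs) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            long window = events.get(i).timeMs / periodMs;
            boolean lastInWindow = i == events.size() - 1
                    || events.get(i + 1).timeMs / periodMs != window;
            if (lastInWindow) {
                out.add(events.get(i).value);
            }
        }
        return out;
    }

    /** Emits an event, then drops everything arriving within windowMs of that emission. */
    static List<String> throttleFirst(List<Event> events, long windowMs) {
        List<String> out = new ArrayList<>();
        boolean emittedBefore = false;
        long lastEmittedAt = 0;
        for (Event e : events) {
            if (!emittedBefore || e.timeMs - lastEmittedAt >= windowMs) {
                out.add(e.value);
                emittedBefore = true;
                lastEmittedAt = e.timeMs;
            }
        }
        return out;
    }

    /** Emits an event only if no other event follows it within timeoutMs. */
    static List<String> debounce(List<Event> events, long timeoutMs) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            boolean last = i == events.size() - 1;
            if (last || events.get(i + 1).timeMs - events.get(i).timeMs >= timeoutMs) {
                out.add(events.get(i).value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // A user typing "couch" with a pause after the third keystroke
        List<Event> keystrokes = Arrays.asList(
                new Event(0, "c"), new Event(100, "co"), new Event(200, "cou"),
                new Event(900, "couc"), new Event(1000, "couch"));
        System.out.println(sample(keystrokes, 500));        // [cou, couc, couch]
        System.out.println(throttleFirst(keystrokes, 500)); // [c, couc]
        System.out.println(debounce(keystrokes, 500));      // [cou, couch]
    }
}
```

In the auto-complete scenario, only the debounce result corresponds to moments where the user actually paused, which is why it suits sources with a variable rate of emission.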

Semaphore

To limit the strain put on dependencies of your application, one possibility is to put hard limits on how many concurrent threads can access a resource. This is done with the classic concurrency programming pattern of the Semaphore, which has been implemented in java.util.concurrent since Java 5.

Note that Semaphores are a bit different from locks since they don’t have a notion of ownership. For example, two different threads can respectively acquire() and release() a Semaphore permit, which can avoid things like deadlocks.

The same concept could also be implemented using a fixed-size thread pool. Such a thread pool adds overhead but allows upstream code to just "walk away" from the call if it takes too long for a slot to become available, providing even better isolation.
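As a rough sketch of the Semaphore approach, the helper below caps the number of concurrent calls to a resource and gives up if no permit becomes available within a timeout. BoundedResource and its methods are hypothetical names chosen for this example. Only Semaphore, Callable, TimeUnit and TimeoutException come from the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; the class and method names are illustrative only.
class BoundedResource {
    private final Semaphore permits;

    BoundedResource(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    /** Runs the task if a permit becomes available within the timeout, otherwise fails fast. */
    <T> T call(Callable<T> task, long timeout, TimeUnit unit) throws Exception {
        if (!permits.tryAcquire(timeout, unit)) {
            throw new TimeoutException("No permit available within " + timeout + " " + unit);
        }
        try {
            return task.call();
        } finally {
            permits.release(); // always hand the permit back, even if the task fails
        }
    }

    /** Number of permits currently free; mostly useful for monitoring. */
    int availablePermits() {
        return permits.availablePermits();
    }
}
```

A caller could wrap a blocking lookup in call(...) with, for example, a limit of 10 permits and a 50 ms wait. Both values are assumptions that would need tuning for a real workload.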

Collapsing Requests

Another way of mitigating errors is to batch similar requests together and "collapse" them into one request to the subsystem. If you have multiple threads that potentially ask for the same data, the benefit can be immediate by just putting a facade on your calls that will de-duplicate parallel requests.

You can go further and trade a little bit of latency for more potential de-duplication: wait a little longer to see if any duplicate request comes in before firing the actual de-duplicated request.
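The de-duplicating facade can be sketched with a map of in-flight requests. The example below is a minimal illustration using only the JDK (Java 8 CompletableFuture). CollapsingLoader and its backend function are hypothetical, and the sketch only collapses requests that overlap in time. It does not implement the delayed batching variant.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical facade; names are illustrative and not part of the SDK or Hystrix.
class CollapsingLoader<K, V> {
    private final ConcurrentMap<K, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();
    private final Function<K, CompletableFuture<V>> backend;

    CollapsingLoader(Function<K, CompletableFuture<V>> backend) {
        this.backend = backend;
    }

    /** Returns the pending future for the key if one exists, otherwise starts a real request. */
    CompletableFuture<V> load(K key) {
        CompletableFuture<V> shared = new CompletableFuture<>();
        CompletableFuture<V> existing = inFlight.putIfAbsent(key, shared);
        if (existing != null) {
            return existing; // join the request that is already in flight
        }
        CompletableFuture<V> real;
        try {
            real = backend.apply(key); // first caller for this key fires the actual request
        } catch (RuntimeException e) {
            inFlight.remove(key, shared);
            shared.completeExceptionally(e);
            return shared;
        }
        real.whenComplete((value, error) -> {
            // Remove first so that later callers trigger a fresh request
            inFlight.remove(key, shared);
            if (error != null) {
                shared.completeExceptionally(error);
            } else {
                shared.complete(value);
            }
        });
        return shared;
    }
}
```

Two threads asking for the same key while the first request is pending receive the same future, so the backend sees a single call.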

Implementation

All the mentioned techniques can be easily implemented in an application in an RxJava-compatible way by using another library from Netflix, Hystrix. It is a very comprehensive library for building resilient applications, including techniques for error mitigation but also for failing fast. For example, its circuit-breaker pattern short-circuits calls to a service that is known to be down until it is detected to be back up. Hystrix also stops cascading failures, provides fallbacks and gathers metrics.
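To give an idea of what a circuit breaker does, here is a deliberately small sketch written with the JDK only. It is not Hystrix’s API. SimpleCircuitBreaker, its constructor parameters and the injected clock are assumptions made for illustration, and the synchronized method serializes calls, which a production implementation would avoid.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Minimal sketch of the circuit-breaker idea. Hypothetical class; NOT the Hystrix API.
class SimpleCircuitBreaker {
    private final int failureThreshold; // consecutive failures before the circuit opens
    private final long openMillis;      // how long calls are short-circuited
    private final LongSupplier clock;   // injected so that time can be controlled in tests
    private int consecutiveFailures;
    private long openedAt;
    private boolean open;

    SimpleCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    /** Runs the action unless the circuit is open; failures and open circuits use the fallback. */
    synchronized <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (open && clock.getAsLong() - openedAt < openMillis) {
            return fallback.get(); // fail fast without touching the failing service
        }
        try {
            T result = action.get(); // circuit closed, or trial call after the cool-down
            consecutiveFailures = 0;
            open = false;
            return result;
        } catch (RuntimeException e) {
            consecutiveFailures++;
            if (open || consecutiveFailures >= failureThreshold) {
                open = true; // (re)open and restart the cool-down period
                openedAt = clock.getAsLong();
            }
            return fallback.get();
        }
    }

    synchronized boolean isOpen() {
        return open;
    }
}
```

A caller would typically create one breaker per dependency, for example with System::currentTimeMillis as the clock, and pass the remote call and a cached or default value as the two suppliers.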