Writing resilient reactive applications

Read this section to learn how to write resilient code for production with the Couchbase Java SDK.
Many concepts apply to both synchronous and asynchronous access; where necessary, the two patterns are discussed separately. The focus is on database interaction, but if you are using RxJava as part of your stack, you can and should apply most of the principles there as well.

RxJava 101 Recap: Cold and Hot Observables

When working with Observables, it is important to understand the difference between cold and hot.

  • Cold Observables will start to emit events once an Observer subscribes, and will do it "fresh" for each Observer.

  • Hot Observables instead start to emit data as soon as it becomes available, and will return the same (or parts of the same) data to each Observer. There is also a special category of Hot Observables in RxJava called Subjects (these are Observables that are also Observers, so you can feed data manually by calling onNext).

Observables are discussed in more detail in Mastering Observables.

Up to version 2.2.0, the Couchbase Java SDK used Hot Observables for each database operation, sending the request to the server as soon as you obtained the Observable. This initial architectural choice avoided firing two network operations for a single logical call (for example, when doing a get and subscribing twice), but it made failure behavior harder to grasp.

One of the most important things to remember is the following: when you resubscribe to a Hot Observable, it won't perform the operation against Couchbase Server again. In some cases, your Subscriber will only get notified about new emissions; in other cases, such as the cached/replaying Observables used by the SDK, the source will emit the same data again on each new subscription, even if it was an error. Emitting the same data is fine in many cases, but not if you want to retry an operation because it failed. That said, RxJava provides ways to turn Hot Observables into cold ones and vice versa.

The following example is wrong for the Java SDK versions before 2.2.0 because it will give you the same result over and over again:

    // Will just retry on the same result from the first get request
    bucket.async().get("id").retry(5).subscribe();

The next example is correct, because it will produce a brand new Observable every time you retry (resubscribe).

    // Will correctly do a new get operation against the server
    Observable
        .defer(new Func0<Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call() {
                return bucket.async().get("id");
            }
        })
        .retry(5)
        .subscribe();

Use Observable#defer() if you want a new Observable for every subscriber. You can also use this technique if you want to defer the execution of an observable until someone subscribes.

As of Java SDK 2.2.0, all requests sent by the SDK use Observable.defer internally and are therefore cold.

If you want to turn a cold into a hot Observable, take a look at the Observable#cache() or Observable#replay() operators.
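
For example, a minimal sketch of sharing a single get result between two subscribers through cache() (on SDK 2.2.0+, where the get Observable is cold):

    // One network operation; the result (or error) is replayed to
    // every subscriber of the cached Observable.
    Observable<JsonDocument> cached = bucket.async().get("id").cache();

    cached.subscribe(); // triggers the request
    cached.subscribe(); // gets the replayed result, no second request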

Finally, an Observable created inside a flatMap function does not suffer from the hot Observable problem: every resubscription re-invokes the function, producing a brand new inner Observable. Therefore this code works perfectly fine:

    Observable
        .just("id")
        .flatMap(new Func1<String, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(String id) {
                return bucket.async().get(id);
            }
        })
        .retry(5)
        .subscribe();

Error recovery

This section discusses different strategies to mitigate errors that might come up during operations (covered in Error causes). Some of them are shown to make a point, but the techniques apply to all different types of errors and can be applied as you see fit.

Logging

It is always important to log errors, but even more so in the case of reactive applications. Because of their event-driven nature, stack traces get harder to read, and caller context is sometimes lost.

RxJava provides operators for "side effects" (additional behavior that doesn’t change the items flowing through the Observable stream), which should be used to log errors. Of course, you can also put logging into the error handlers, but readability is increased if the logging is put explicitly as a side effect.

    Observable
        .error(new Exception("I'm failing"))
        .doOnError(new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                // I'm an explicit side effect;
                // use a proper logger of your choice here
                LOGGER.warn("Error while doing XYZ", throwable);
            }
        })
        .subscribe();

It is also recommended to configure your logger to include absolute timestamps. While this is always a good idea, if combined with good logging throughout the application it makes it easier to debug error cases and see later what was going on inside your reactive application.

You can also utilize the various other side-effect operators for general logging (doOnNext, doOnCompleted). If you don’t want to have different side effects for the same logging operation, you can use doOnEach. It will be called for both errors and next events with a Notification object that denotes what kind of event is being processed.
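
For example, a single doOnEach side effect can cover both cases (a minimal sketch, reusing the LOGGER from above):

    Observable
        .<String>error(new Exception("I'm failing"))
        .doOnEach(new Action1<Notification<? super String>>() {
            @Override
            public void call(Notification<? super String> notification) {
                if (notification.isOnError()) {
                    LOGGER.warn("Error while doing XYZ", notification.getThrowable());
                } else if (notification.isOnNext()) {
                    LOGGER.info("Got value: " + notification.getValue());
                }
            }
        })
        .subscribe();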

Failing

Failing is the easiest way to handle errors - because you don’t. While most of the time you want more sophisticated error handling strategies (as discussed later), sometimes you just need to fail. It makes no sense for some errors to be retried, either because they are not transient or because you already tried everything to make it work and it still keeps failing.

In error-resilient architectures, you want to do everything to keep the error contained. However, if the containment is not able to handle the error it needs to propagate it to a parent component that (possibly) can.

In the async case, errors are events like every other for your subscribers. Once an error happens, your Subscriber is notified in the method onError(Throwable), and you can handle it the way you want to. Note that by Observable contract, after the onError event, no more onNext events will happen.

    Observable
        .error(new Exception("I'm failing"))
        .subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Got Error: " + e);
            }

            @Override
            public void onNext(Object o) {
            }
        });

It is always a good idea to implement error handling.

In the synchronous case, every error is converted into an Exception and thrown so that you can use regular try/catch semantics.

    try {
        Object data = Observable
            .error(new Exception("I'm failing"))
            .toBlocking()
            .single();
    } catch(Exception ex) {
        System.err.println("Got Exception: " + ex);
    }

If you do not catch the Exception, it will bubble up:

 Exception in thread "main" java.lang.RuntimeException: java.lang.Exception: I'm failing
      at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:482)
      at rx.observables.BlockingObservable.single(BlockingObservable.java:349)

Retry

Retrying operations is a common technique to ride over transient errors. It should not be used for non-transient errors because it will only put a load onto the system without the chance to resolve the error.

In practice, the following retry strategies can be applied when a transient error is discovered:

  • Retry immediately.

  • Retry with a fixed delay.

  • Retry with a linearly increasing delay.

  • Retry with an exponentially increasing delay.

  • Retry with a random delay.

Unless you have a very good reason not to, always apply a maximum number of attempts and then escalate the error. Systems stuck in infinite retry loops can cause issues that are very hard to debug. It’s better to fail and propagate at some point.

Also, we recommend that you use asynchronous retry even if you are blocking at the very end. Retrying in the asynchronous Observables is way more resource efficient and also the only sane way to handle multiple operation steps (and bulk operations) under a single timeout (read more in Timeouts).

Starting with 2.1.0, the Java SDK comes with a RetryBuilder, a utility class to describe retries with a fluent API. In earlier versions, use the code provided in this documentation.

Retry without delay

Let’s get one thing straight right away: immediately retrying is almost never a good idea. Instead of resolving the error more quickly, it will put more pressure onto the retried system, and there is a good chance it will make resolving errors harder.

One good reason to do so is if you have a specific operation with a very short timeout that you want to keep retrying for a small, fixed amount of times and if it still does not work, fail fast.

If you have the feeling you need to retry very quickly, you can also apply a slightly increasing delay to at least release some pressure from the target system.

RxJava provides the retry operator to resubscribe to the source Observable immediately once it fails (an error event happens). Three flavors are available:

  • retry(): Instantly retry as long as the source Observable emits an error. It is strongly recommended not to use this operator.

  • retry(long count): Instantly retry as long as the source Observable emits an error or the max count is reached. If the count is reached, the Observable will not be resubscribed, but the error is propagated down the stream. This operator is recommended for use.

  • retry(Func2<Integer, Throwable, Boolean> predicate): Instantly retry as long as the predicate returns true. Arguments to the predicate are the number of tries, as well as the exception type.

Since the predicate method provides the most flexibility, it is recommended to use it. If you only want to handle a specific exception and retry a maximum of MAX_TRIES times, you can do it like this:

    Observable
        .error(new CASMismatchException())
        .retry(new Func2<Integer, Throwable, Boolean>() {
            @Override
            public Boolean call(Integer tries, Throwable throwable) {
                return (throwable instanceof CASMismatchException)
                    && tries < MAX_TRIES;
            }
        })
        .subscribe();

Try replacing CASMismatchException with something else and you will see that it does not retry, but rather propagates the error downstream. You can use this technique to handle specific errors differently by adding more retry operators in the pipeline.

Using retry with a predicate also allows you to log the number of retries for a specific error; if you only use doOnError for logging, the number of retries is harder to log.

The synchronous equivalent to the latest code looks like this:

    int tries = 0;
    while(true) {
        tries++;
        try {
            pretendWorkThatMaybeThrows(); // does some work and maybe throws
            break;
        } catch(Throwable throwable) {
            if (!(throwable instanceof CASMismatchException) || tries >= MAX_TRIES) {
                throw throwable; // rethrow exceptions
            }
        }
    }

Retry with delay

When applying a Retry with delay, the main question you need to ask yourself is: how often and how long is it feasible to retry before giving up (and escalate the error). Using this retry option depends on the type of operation, use case, and SLA that the application requires, but the techniques are the same.

RxJava provides the retryWhen operator, which allows you more flexibility with the actions performed as well as when the resubscription is happening. This section covers the different delay approaches based on this operator.

Here is the contract for retryWhen that you should always keep in mind:

  • It is called when an error on the source Observable happens.

  • The function provided will be called with an Observable containing this error.

  • If you make this Observable error, it is propagated downstream (without retrying).

  • If you make this Observable complete, it is propagated downstream (without retrying).

  • If you make this Observable call onNext, a retry will happen.

Since version 2.1.0, the Java SDK comes with the RetryBuilder, a helper to describe when and how to retry: only on certain classes of exceptions, up to a maximum number of attempts, with an exponential delay between attempts, and so on. The result of this builder (calling build()) can be used directly with RxJava's retryWhen operator:

    Observable.error(new IllegalStateException())
        .retryWhen(RetryBuilder
            .anyOf(IllegalStateException.class)
            .max(6)
            .delay(Delay.linear(TimeUnit.SECONDS))
            .build());

This code will ultimately fail after 6 additional attempts. It fails fast if the source errors with something other than an IllegalStateException during the retries. Each attempt is made with an increasing delay that grows linearly (1 second, then 2, 3, 4...). If an exception occurs that is not managed by the handler, it is propagated as is, allowing you to chain such handlers.

If the maximum number of attempts is reached, the last exception that occurred is propagated, wrapped in a CannotRetryException. This helper allows you to write retry semantics more easily, but the rest of this section explains how to write them from scratch.

The easiest approach is the fixed delay: the source Observable will be resubscribed after a specified amount of time, up to a fixed maximum number of times.

Because the nested logic is a bit hard to grasp at first, let's walk through it step by step and then put it together.

Our retryWhen function is called every time an error happens on the source Observable. If we wanted to retry forever, every second, it could look like this:

    .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
        @Override
        public Observable<?> call(Observable<? extends Throwable> errorNotification) {
            return errorNotification.flatMap(new Func1<Throwable, Observable<?>>() {
                @Override
                public Observable<?> call(Throwable throwable) {
                    return Observable.timer(1, TimeUnit.SECONDS);
                }
            });
        }
    })

We flatMap our notification Observable and utilize Observable#timer to defer emitting a new event for a second. Since we need to stop after a given number of tries, we can utilize the Observable#zipWith operator to zip our error stream together with a range that specifies how many tries we want to allow. Zipping has the nice side effect that once one of the Observables completes, the resulting Observable also completes, which triggers rule 4 of the retryWhen contract above: completion is propagated downstream without retrying.

The modified version looks like this:

    .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
        @Override
        public Observable<?> call(Observable<? extends Throwable> errorNotification) {
            return errorNotification
                .zipWith(Observable.range(1, 4), new Func2<Throwable, Integer, Integer>() {
                    @Override
                    public Integer call(Throwable throwable, Integer attempts) {
                        return attempts;
                    }
                })
                .flatMap(new Func1<Integer, Observable<?>>() {
                    @Override
                    public Observable<?> call(Integer attempts) {
                        return Observable.timer(1, TimeUnit.SECONDS);
                    }
                });
        }
    })

Technically, we don't need the zip function here because we ignore its result later on, but it is required for the zipWith operator to work. We use the Observable#range operator to create an Observable that emits four events and then completes, so we will never end up with more retries than that.

There is one more enhancement needed: the code as it stands will swallow the originating exception when moving on, which is not good because the exception should be propagated if it can't be handled in this code block.

The following code is modified so that the zipWith function returns not only the attempt count but also the throwable, so that we have access to it in the flatMap method. For this, the Java client provides a generic Tuple you can utilize. In the flatMap, we check the number of attempts, and if we are over the threshold, we rethrow the exception. Keep in mind that you need to change the Observable#range call to MAX_ATTEMPTS+1, to give your code the chance to be called one final time.

    .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
        @Override
        public Observable<?> call(Observable<? extends Throwable> errorNotification) {
            return errorNotification
                .zipWith(Observable.range(1, 5), new Func2<Throwable, Integer, Tuple2<Throwable, Integer>>() {
                    @Override
                    public Tuple2<Throwable, Integer> call(Throwable throwable, Integer attempts) {
                        return Tuple.create(throwable, attempts);
                    }
                })
                .flatMap(new Func1<Tuple2<Throwable, Integer>, Observable<?>>() {
                    @Override
                    public Observable<?> call(Tuple2<Throwable, Integer> attempt) {
                        if (attempt.value2() == 3) {
                            return Observable.error(attempt.value1());
                        }
                        return Observable.timer(1, TimeUnit.SECONDS);
                    }
                });
        }
    })

If you want to enhance it even further, you can add one more if() clause in the flatMap to check that the throwable passed down is actually the one we want to retry.

Functionality like this is a great candidate to be made generic and encapsulated, which is what we did with RetryBuilder. If you are already using Java 8, the code becomes more condensed as well:

    .retryWhen(notification -> notification
        .zipWith(Observable.range(1, 5), Tuple::create)
        .flatMap(att -> att.value2() == 3
            ? Observable.error(att.value1())
            : Observable.timer(1, TimeUnit.SECONDS)
        )
    )

Here are the variations for linear, exponential, and random delays.

Linear:

    // Utilizes the number of attempts for the number of seconds to wait
    .retryWhen(notification -> notification
        .zipWith(Observable.range(1, 5), Tuple::create)
        .flatMap(att -> att.value2() == 3
            ? Observable.error(att.value1())
            : Observable.timer(att.value2(), TimeUnit.SECONDS)
        )
    )

Exponential:

    // Uses the timer with 2^attempts to generate exponential delays
    .retryWhen(notification -> notification
        .zipWith(Observable.range(1, 5), Tuple::create)
        .flatMap(att -> att.value2() == 3
            ? Observable.error(att.value1())
            : Observable.timer(1 << att.value2(), TimeUnit.SECONDS)
        )
    )

Random:

    // Random delay between 0 and 4 seconds per attempt
    .retryWhen(notification -> notification
        .zipWith(Observable.range(1, 5), Tuple::create)
        .flatMap(att -> att.value2() == 3
            ? Observable.error(att.value1())
            : Observable.timer(new Random().nextInt(5), TimeUnit.SECONDS)
        )
    )

With synchronous code, there are not many options other than using Thread.sleep() to keep the current thread waiting until the loop is allowed to proceed:

    // Linear Backoff
    int tries = 0;
    while(true) {
        tries++;
        try {
            pretendWorkThatMaybeThrows(); // does some work and maybe throws
            break;
        } catch(Throwable throwable) {
            if (!(throwable instanceof CASMismatchException) || tries >= MAX_TRIES) {
                throw throwable; // rethrow exceptions
            }
        }

        Thread.sleep(TimeUnit.SECONDS.toMillis(tries));
    }

You can then use the same approaches as with the asynchronous ones on the Thread.sleep() time to accommodate a static, linear, exponential, or random delay.

Fallback

Instead of (or in addition to) retrying, another valid option is falling back to either a different Observable or a default value.

RxJava provides you with different operators, prefixed with onError*():

  • onErrorReturn(Func1<Throwable, T>): It is called when the source Observable errors and allows you to return custom data instead.

  • onErrorResumeNext(Observable<?>): It is called when the source Observable errors and allows you to resume transparently with a different Observable.

  • onErrorResumeNext(Func1<Throwable, Observable<?>>): It is called when the source Observable errors and allows you to transparently resume with an Observable (based on the specific Throwable).

You should use onErrorReturn if you want to quickly fall back to static data. For example:

    Observable
        .<String>error(new Exception("I failed"))
        .onErrorReturn(new Func1<Throwable, String>() {
            @Override
            public String call(Throwable throwable) {
                // You could return data based on the throwable as well
                return "Default";
            }
        })
        .subscribe();

If you only want to return default values based on a specific exception or even call another Observable as fallback, onErrorResumeNext is what you’re looking for.

    Observable
        .<String>error(new TimeoutException("I failed"))
        .onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {
            @Override
            public Observable<? extends String> call(Throwable throwable) {
                if (throwable instanceof TimeoutException) {
                    return Observable.just("Default");
                }
                // Forward anything other than the TimeoutException
                return Observable.error(throwable);
            }
        })
        .subscribe();

If you just want to fallback onto another Observable that you have in scope without caring about the Exception, you can use the other onErrorResumeNext() overload. For example, this loads data from all replicas if the get() call did not succeed with the Java SDK:

    bucket
        .async()
        .get("id")
        .onErrorResumeNext(bucket.async().getFromReplica("id", ReplicaMode.ALL))
        .subscribe();

Synchronous fallbacks can be implemented by conditionally setting the default in the catch clause:

   String value;
      try {
      value = pretendWorkThatMaybeThrows();
      } catch(Exception ex) {
      value = "Default";
      }

Here is the gotcha: this synchronous approach only works well if the fallback is static. If you need to fall back into another database call, for example, you quickly get into nested error handling, and timeouts become a pain to handle (since they accumulate for every synchronous call). It is recommended to use asynchronous fallbacks and then block at the very end through toBlocking().single() or equivalents.
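
A minimal sketch of that recommendation (Java 8 lambda syntax; loadFromPrimary() and loadFromFallback() are hypothetical methods returning Observable<String>):

    String value = loadFromPrimary()
        .onErrorResumeNext(throwable -> loadFromFallback())
        .timeout(1, TimeUnit.SECONDS) // one timeout covering the whole chain
        .toBlocking()
        .singleOrDefault("Default");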

Defaults

Another possibility that requires intervention on the application side is Observables that do not emit a single value. This can happen because operators filtered the Observable so that nothing is left, or because it did not produce any values in the first place. One common case in the Java SDK is get(): if the Document is not found, the Observable will complete without emitting anything.

RxJava provides helper operators that all end with *OrDefault() and allow you to return default values if no item is emitted when the Observable completes.

In most cases, you want to use singleOrDefault() and return a default value when not a single item is emitted by the source Observable:

    Observable
        .<String>empty()
        .singleOrDefault("Default")
        .subscribe();

If you are dealing with potentially more than one item emitted in your Observable and you only want to emit either the first or the last value, there are also operators that allow you to emit a default if it’s unexpectedly empty. See firstOrDefault() as well as lastOrDefault() for more details.

Lastly, RxJava offers the method defaultIfEmpty, which allows you to return a value if the source doesn't emit anything. In recent versions, the method switchIfEmpty allows you to switch to a different Observable in that same case.
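
A minimal sketch of both operators:

    // defaultIfEmpty: emit a static fallback value if the source is empty
    Observable
        .<String>empty()
        .defaultIfEmpty("Default")
        .subscribe();

    // switchIfEmpty (recent RxJava versions): switch to a fallback Observable
    Observable
        .<String>empty()
        .switchIfEmpty(Observable.just("From the fallback"))
        .subscribe();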

Error handling in bulk scenarios

Bulk operations are used to handle more data in one batch and, therefore, benefit from better resource utilization. However, error handling becomes more complicated. There are three high-level cases to consider:

  • Best Effort: Keep going and just use the results that succeeded when errors happen.

  • Full Retry: Retry the complete Observable when errors happen.

  • Incremental Retry: Only retry specific events of the Observable.

Before we dig into the specific approaches, let's revisit the contract of Observables:

onNext* (onError | onComplete)

Zero or more events are followed by either an error event or a complete event. This provides an important clue right away: once an Observable fails, no more events can be passed through. Therefore, you need to make sure that errors are handled at the smallest scope possible, only letting them propagate if you cannot handle them right away.

Best effort bulk handling

Sometimes it is more important to get data within a timeframe, or maybe with a short timeout, than to get all the data. In general, you want to keep a timeout that lets you fetch all the data, but depending on the use case you may be fine with only getting a subset of the data returned.

To ignore errors and turn them into "noops", you can utilize onErrorResumeNext():

    Observable
        .create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("A");
                subscriber.onNext("B");
                subscriber.onError(new IllegalStateException("Woops"));
                subscriber.onNext("C");
            }
        })
        .onErrorResumeNext(Observable.<String>empty())
        .toBlocking()
        .forEach(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("Got: " + s);
            }
        });

This will not raise any exception to your calling thread, but it will never process "C" because, based on the Observable contract, once onError is called no more events are allowed to be generated.

To keep going in case an event fails, you need to turn each event into a single Observable and then merge it back together to either defer the error handling (through Observable#mergeDelayError) or use flatMap and make sure to contain the errors before flattening.

This code provides some fake data with which you can work. It will emit four Observables where one of them will fail:

    Observable<Observable<String>> dataObservables = Observable
        .just("a", "b", "c", "d")
        .map(new Func1<String, Observable<String>>() {
            @Override
            public Observable<String> call(String s) {
                if (s.equals("b")) {
                    return Observable.error(new IllegalStateException("I dont like b"));
                }
                return Observable.just(s);
            }
        });

You can then use mergeDelayError to defer error handling until the end. If you uncomment the onErrorResumeNext, it will silently discard any errors as well leaving you with an Observable that provides best-effort processing of the source Observables.

    Observable.mergeDelayError(dataObservables)
        //.onErrorResumeNext(Observable.<String>empty())
        .toBlocking()
        .forEach(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });

Alternatively, you can use flatMap and make sure the errors are contained for each emitted Observable:

    Observable
        .just("a", "b", "c", "d")
        .flatMap(new Func1<String, Observable<String>>() {
            @Override
            public Observable<String> call(String s) {
                // Simulate some observable that sometimes fails
                Observable<String> obs;
                if (s.equals("b")) {
                    obs = Observable.error(new IllegalStateException("I dont like b"));
                } else {
                    obs = Observable.just(s);
                }

                // Here comes the actual handling part before we flatten it back
                return obs.onErrorResumeNext(Observable.<String>empty());
            }
        })
        .toBlocking()
        .forEach(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });

As a more practical example here is a best effort bulk loading of documents from Couchbase Server that just discards operations that fail:

    private static Observable<JsonDocument> loadDocsBestEffort(Bucket bucket, List<String> ids) {
        return Observable
            .from(ids)
            .flatMap(new Func1<String, Observable<JsonDocument>>() {
                @Override
                public Observable<JsonDocument> call(String id) {
                    return bucket
                        .async()
                        .get(id)
                        .onErrorResumeNext(Observable.<JsonDocument>empty());
                }
            });
    }

Of course, you can add more logic onto each emitted Observable and also proper logging so that you at least know which errors occurred.

Full retry bulk handling

Full retry bulk handling can be done by just chaining one of the retry operators right at the end of the Observable chain.

This example uses Java 8 Lambda syntax for conciseness.

    private static Observable<JsonDocument> loadDocsFullRetry(Bucket bucket, List<String> ids) {
        return Observable
            .from(ids)
            .flatMap(id -> bucket.async().get(id))
            .retry(4);
    }

This is, however, a bit crude because the whole list of IDs will be retried if there is an error. This is the case even if the error happens late in the stream and some items have already been emitted. Imagine that you retrieve items A, B, C, D and that an error occurs just the first time the SDK retrieves C. What you will see is A, B, A, B, C, D.

This can be mitigated by using the distinct() operator, but keep in mind the memory tradeoff (distinct will maintain a set of values it saw in the background, which can be problematic for infinite or very large streams).
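
A sketch of that mitigation, keying distinct() on the document id (Java 8 syntax):

    private static Observable<JsonDocument> loadDocsFullRetryDistinct(Bucket bucket, List<String> ids) {
        return Observable
            .from(ids)
            .flatMap(id -> bucket.async().get(id))
            .retry(4)
            .distinct(doc -> doc.id()); // remembers every id emitted so far
    }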

Incremental retry bulk handling

Incremental retry improves on full retry by only actually retrying elements that couldn’t be loaded. It is easily done by chaining the retry operators inside the flatMap instead of outside of it.

    private static Observable<JsonDocument> loadDocsIncrementalRetry(Bucket bucket, List<String> ids) {
        return Observable
            .from(ids)
            .flatMap(id -> bucket.async().get(id)
                .retry(4)
            );
    }

One thing to keep in mind with flatMap is that the order of emissions isn’t guaranteed to be the same as the source emissions. This can be especially visible in this case since individual streams are retried on their own: retried keys could come back with a value much later than other keys, more often changing the order in which they appear.

Bulk pattern, BackpressureException and reactive pull backpressure

The bulk pattern is powerful, but if you try it with a very large collection, it is especially susceptible to a BackpressureException. This exception is a way for the SDK to tell you that you’re asking for too much in a short amount of time, and requests have piled up too much while waiting for a server response to free up a spot.

By default, the internal structure that holds pending requests is sized to a little over 16,000 slots. Since from iterates over a collection in memory, which is very fast, it can easily flood the SDK if the size of the collection is over or close to 16K.

You could try to react to that by implementing a retry strategy, but it is hard to get it right. Fortunately, RxJava comes with an even better solution, a form of flow-control called reactive pull backpressure that you can leverage to avoid getting a BackpressureException.

This is done through the Subscriber, its onStart() method and the request(n) method.

You have to use a Subscriber to subscribe() instead of the convenience methods that just take an Action each for the next, error, and completed handlers.

    Observable<JsonDocument> largeBulk = Observable
        .from(oneMillionIds)
        .flatMap(id -> bucket.async().get(id));

    // The subscription is where reactive pull backpressure happens
    largeBulk.subscribe(new Subscriber<JsonDocument>() {

        // onStart initiates the backpressure
        @Override
        public void onStart() {
            request(100);
        }

        @Override
        public void onNext(JsonDocument jsonDocument) {
            System.out.println(jsonDocument.content());
            request(100); // request more items
        }

        // onCompleted and onError are less relevant.
        // Note that if you want to block you'd need a CountDownLatch in this case

        @Override
        public void onCompleted() {
            System.out.println("Done");
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
    });

The request(n) method notifies the upstream source that the consumer is ready for n more items and is prepared to consume them. In the case of from, this avoids instantly iterating over the whole collection, and as long as responses come in from Couchbase Server, we continue asking for more.

This is a form of error mitigation, and we’ll see a few others in the next section.

Error mitigation

The idea behind error mitigation is to limit the impact a failing piece has on the rest of the system. When an error occurs, you don't want it to cause failures downstream, and you want to prevent as many of the error conditions as possible from happening in the first place.

The BackpressureException and RxJava’s reactive pull backpressure are forms of error mitigation we already saw since they both allow a form of flow control that avoids overloading the system.

Slowing down

Rate limiting of the requests can be a way to keep the system within parameters that are known to be stable, thus mitigating overloading errors. This can happen in the client-to-server direction, which avoids making more requests than can be processed later. It can also happen in the server-to-client direction, when the client can’t keep up with the bandwidth at which data is sent from the server because, for instance, it has to process it through long-running tasks. Rather than choking on that data when backpressure mechanisms are not available, the client can decide to discard some of the data. This is referred to as Load Shedding.

For example, consider a system where multiple sensors write data into Couchbase, and the client application reads that data and displays it in a UI. The graphs in the UI can be built on a sample of the data, and that’s the key point. The processing rate of the data is lower than its producing rate, but at the same time the resolution is also lower. That means we can simply ignore some of the data and only consume what’s needed, such as take the data point at every second to trigger a refresh of the graph in the UI.

RxJava allows you to do that through several operators:

  • sample (alternatively throttleLast) periodically looks at an Observable and emits the item last seen during this period.

  • throttleFirst does the same except it emits the first item seen during the period.

  • debounce only emits an item if a given timespan has passed since the last emission, eliminating items that are rapidly following another item.

The operator sample is useful for sources that are "too chatty" even at a regular rate. The operator debounce, on the other hand, really shines with sources that have a variable rate of emission: for example, an input form that only triggers a web search for autocomplete suggestions once the user has stopped typing for 500ms.
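
For illustration, a sketch of both operators (sensorReadings, textChanges, refreshGraph, and triggerAutocomplete are hypothetical):

    // Emit at most one sensor reading per second, dropping the rest
    sensorReadings
        .sample(1, TimeUnit.SECONDS)
        .subscribe(reading -> refreshGraph(reading));

    // Only fire the search once the user stopped typing for 500ms
    textChanges
        .debounce(500, TimeUnit.MILLISECONDS)
        .subscribe(query -> triggerAutocomplete(query));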

Semaphore

To limit the strain put on dependencies of your application, one possibility is to put a hard limit on how many concurrent threads can access a resource. This is done with the classic concurrency pattern of the Semaphore, which is implemented in java.util.concurrent since Java 5.

Note that Semaphores are a bit different from locks since they don't have a notion of ownership: two different threads can respectively acquire() and release() a Semaphore permit, which can avoid things like deadlocks.

The same concept could also be implemented using a fixed-size thread pool. Such a thread pool adds overhead but allows upstream code to just "walk away" from the call if it takes too long for a slot to become available, providing even better isolation.
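
A minimal sketch of the Semaphore approach, reusing the pretendWorkThatMaybeThrows() placeholder from the earlier examples:

    // Hard limit of 10 concurrent callers to the protected resource
    Semaphore permits = new Semaphore(10);

    // Walk away if no permit frees up quickly enough
    if (!permits.tryAcquire(100, TimeUnit.MILLISECONDS)) {
        throw new IllegalStateException("Resource too busy, walking away");
    }
    try {
        pretendWorkThatMaybeThrows(); // the guarded call
    } finally {
        permits.release(); // always return the permit
    }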

Collapsing requests

Another way of mitigating errors is to batch similar requests together and "collapse" them into one request to the subsystem. If you have multiple threads that potentially ask for the same data, the benefit can be immediate: just put a facade on your calls that deduplicates parallel requests.

You can go beyond that and trade a little bit of latency for more potential deduplication: wait a little longer to see if any duplicate request comes in before firing the actual deduplicated request.
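
A hypothetical sketch of such a deduplicating facade, where parallel callers asking for the same id share a single in-flight get (cache() replays the result to late subscribers):

    private final ConcurrentMap<String, Observable<JsonDocument>> inFlight =
            new ConcurrentHashMap<String, Observable<JsonDocument>>();

    public Observable<JsonDocument> collapsedGet(final String id) {
        Observable<JsonDocument> request = inFlight.get(id);
        if (request == null) {
            Observable<JsonDocument> candidate = bucket.async()
                .get(id)
                .doOnTerminate(new Action0() {
                    @Override
                    public void call() {
                        inFlight.remove(id); // let the next call fire a fresh request
                    }
                })
                .cache();
            request = inFlight.putIfAbsent(id, candidate);
            if (request == null) {
                request = candidate;
            }
        }
        return request;
    }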

Implementation

All the techniques mentioned above can be implemented in an RxJava-compatible way by using another library from Netflix, Hystrix. It is a very comprehensive library for building resilient applications, covering error mitigation but also failing fast (the circuit-breaker pattern, which short-circuits calls to a service that is known to be down until it is detected to be back up, stopping cascading failures), fallbacks, and the gathering of metrics.

Error causes

This section discusses errors that need to be covered by the application developer to make the application resilient.

Timeouts

Timeouts are a very important part of a distributed application that needs to be resilient. When a network is involved, you can be sure that something will fail, be it a network partition, a server that takes a long time to answer, or one that is simply down. Timeouts are both your worst nightmare and a very important safety net. You should never let a call hang forever; on the contrary, it is usually better to fail fast and retry or fall back, as we saw earlier. Sane timeouts (not too long, not too short) give you the guarantee that a call won't block forever. They can keep you awake at night, because you are virtually guaranteed to see some of them in production. However, that's OK once you embrace them, because they do in fact give you control: you decide how much time your application will wait before either a result or an error happens, and there are no more random (and potentially very long) outliers.

You have to decide on relevant durations for your timeouts, put them into place, and finally design and implement the failure-recovery scenarios for when a timeout occurs. This should be guided in part by your SLAs: say your SLAs indicate that the app should respond within 1.5s in the 99th percentile; then you'll probably have to set timeouts below this duration. Think about the compounding effect as well! Note that it is usually a good idea to come up with SLAs internally if your business doesn't explicitly define some.

There are many places where you can set a timeout, especially in fan-out request scenarios, where you could set an individual timeout on each fanned-out request or a global timeout on the whole operation.

Setting timeouts on an async workflow

RxJava offers an easy way to set up a timeout on any Observable: simply chain in a call to timeout(long duration, TimeUnit unit). If the timeout is triggered, the Observable is notified of a TimeoutException in its onError handler.

The underlying timer used for enforcing the timeout considers the delay between each emission in the source Observable, and by default runs on the Schedulers.computation() Scheduler. Note that in the Couchbase blocking API, this TimeoutException is in turn wrapped into a RuntimeException so that only unchecked exceptions are used in the API.

There is a timeout variant that will directly switch to a provided fallback Observable instead of propagating an Exception. Another variant accepts a timeout selector instead of a simple duration, in the form of an Observable for each emission: if said Observable completes before a new emission is seen in the source Observable, this is considered a timeout. This is a much more flexible implementation should your needs go beyond the simple duration-based timeout.
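
For example (the fallback variant reuses the replica read shown earlier):

    // Propagate a TimeoutException if no result arrives within 1 second
    bucket.async()
        .get("id")
        .timeout(1, TimeUnit.SECONDS)
        .subscribe();

    // Switch to a fallback Observable instead of propagating the error
    bucket.async()
        .get("id")
        .timeout(500, TimeUnit.MILLISECONDS,
            bucket.async().getFromReplica("id", ReplicaMode.ALL))
        .subscribe();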

Timeouts in the Couchbase synchronous API

Because the synchronous API is built around calling the async API and blocking on it, you cannot weave in timeouts from RxJava directly. Instead, the API offers overloads for every method that allow you to explicitly set a timeout duration.

If you don't provide one explicitly on each operation, a default timeout is used. Each operation belongs to a broad category for which a default timeout can be tuned in the CouchbaseEnvironment used to configure the SDK (for example, connectionTimeout for bootstrapping, kvTimeout for Key/Value operations, or viewTimeout for view operations), so there will never be a blocking operation without a timeout set.
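
A sketch of both levels of configuration (the timeout values are illustrative):

    // Explicit timeout on a single blocking operation
    JsonDocument doc = bucket.get("id", 1500, TimeUnit.MILLISECONDS);

    // Tuning the default Key/Value timeout (in milliseconds) for the whole SDK
    CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder()
        .kvTimeout(2500)
        .build();
    Cluster cluster = CouchbaseCluster.create(env, "127.0.0.1");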

BackpressureException

Sometimes you’ll see a BackpressureException being propagated in your Observable. This error indicates that the SDK was overloaded with too many operations. By default, the SDK maintains an internal structure called the Ring Buffer, which has a little over 16K slots for queuing up requests. As soon as the server responds, the corresponding request’s slot is freed. BackpressureException happens when this ring buffer becomes full, meaning that 16K requests were still pending, and the server couldn’t keep up with the load.

One very common cause for this is a bulk get/set using Observable.from(collection) where the collection is larger than or close to the size of the ring buffer. Since Observable.from consumes the collection from memory in one sweep, it can do so extremely fast, thus filling the ring buffer almost instantly and causing a BackpressureException (effectively clogging the pipes).

We've seen one solution to this in the section "Bulk pattern, BackpressureException and reactive pull backpressure": use RxJava's reactive pull backpressure to only consume from the collection as slots free up. Another possibility is to implement retry and backoff semantics, especially if the error happens outside of a bulk context, for example when the server is under heavy load and the application does many operations in parallel that are isolated from one another instead of batched.
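
For example, combining the RetryBuilder from earlier with an exponential backoff targeted at BackpressureException only (the delay and attempt values are illustrative):

    bucket.async()
        .get("id")
        .retryWhen(RetryBuilder
            .anyOf(BackpressureException.class)
            .delay(Delay.exponential(TimeUnit.SECONDS, 10))
            .max(10)
            .build())
        .subscribe();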

Correctly Dealing with Binary Data, ByteBuf, and Memory Leak

The Java SDK uses Netty for network IO. That library has an implementation of byte buffers that goes beyond what the JVM currently offers: ByteBuf. Such buffers can wrap JVM buffers, use memory from the heap or off the heap, and be pooled or not. For performance reasons, the SDK mainly uses pooled off-heap ByteBuf instances.

Netty's buffers are reference counted. Once their refCnt() (reference count) reaches 0, the buffer is cleaned up and, if it was pooled, returned to the pool. Most operations will automatically release() the buffers, especially when a buffer is written to the wire. But sometimes you'll need to retain() the buffer if you need to reuse it after such an automatic cleanup; the usual symptom of a missing retain() is an IllegalReferenceCountException mentioning a refCnt: 0.

The opposite symptom is a trace in the logs that says LEAK: ByteBuf.release() was not called before it's garbage-collected. This is symptomatic of a buffer that is not released correctly. Netty will by default sample 1% of the created buffers, which can be tuned up by using the ResourceLeakDetector.setLevel(...) method. The message appears in the logs when one of these sampled buffers is detected to be garbage-collected while still having a refCnt > 0.

One context where both of these are more likely to happen is when you use a BinaryDocument. Since this kind of Document exposes the raw ByteBuf to the user, it is your responsibility to manage the buffer correctly. On the write path, you create the buffer and pass it to the SDK through the BinaryDocument; the SDK takes it from there, and Netty will release the buffer once it is written down the wire during IO. On the read path, however, the SDK creates a buffer and passes it to you, giving you responsibility for the release(). See the Netty documentation for more information about managing buffers.

There are also subtleties in use cases where you reuse a buffer, like in the following code. The buffer CAN be used twice, the second time as a fallback if the document didn't exist:

    byteBuffer.retain(); // prepare for potential multi usage (+1 refCnt, refCnt = 2)
    try {
        bucket.append(document);
        // refCnt = 2 on success
        byteBuffer.release(); // refCnt = 1
    } catch (DocumentDoesNotExistException dneException) {
        // buffer is released on errors, refCnt = 1
        // second usage will also release, but we want to be at refCnt = 1 for the finally block
        byteBuffer.retain(); // refCnt = 2
        bucket.insert(document); // refCnt = 1
    } // other uncaught errors will still cause refCnt to be released down to 1
    finally {
        // we made sure that at this point refCnt = 1 in any case (success, caught exception, uncaught exception)
        byteBuffer.release(); // refCnt = 0, returned to the pool
    }

You have to prepare everything beforehand by calling retain() because once the refCnt reaches 0 you cannot do anything else with the buffer.

Operation Effects

Most operations have a set of error conditions described in their corresponding Javadoc. This section describes a few that are quite common and their usual meaning:

  • DocumentDoesNotExistException

  • DocumentAlreadyExistsException

  • CASMismatchException

  • TemporaryLockFailureException

  • DurabilityException

  • TemporaryFailureException

The first two, DocumentDoesNotExistException and DocumentAlreadyExistsException, happen when using the replace() operation and the insert() operation, respectively. The replace() semantics imply that a document with the provided key is expected to already be stored in the database; if this is not the case, the exception is thrown. At the other end of the spectrum, insert() semantics imply that there should not be any document in the database with the given key, hence the DocumentAlreadyExistsException when one such document is found. Note that if you want a write operation that works in both cases, you can use upsert(), which will by design also ignore the CAS value.

Mutative operations usually take the Document's CAS into account and update it in the returned instance of Document. This is optimistic locking: the database is optimistic about the rarity of clashing writes, so instead of paying the costly overhead of locking a document whenever it is accessed, the engine refuses to execute one of the conflicting operations, detecting concurrent changes through a variation of the CAS metadata. This surfaces in the SDK as a CASMismatchException. Usually, the way to go in this case is to retry: issue a get() to obtain the latest version of the document (the one that took precedence), re-apply the mutation (maybe by reapplying a delta change, maybe by presenting the document to the user for editing), and perform the replace() again.
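
A sketch of that retry cycle (Java 8 syntax; the mutation applied in map() is illustrative):

    Observable
        .defer(() -> bucket.async().get("id")) // always fetch the latest version
        .map(doc -> {
            doc.content().put("field", "newValue"); // re-apply the mutation
            return doc;
        })
        .flatMap(doc -> bucket.async().replace(doc)) // replace honors the fetched CAS
        .retryWhen(RetryBuilder
            .anyOf(CASMismatchException.class)
            .max(MAX_TRIES)
            .delay(Delay.fixed(100, TimeUnit.MILLISECONDS))
            .build())
        .subscribe();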

Couchbase also allows you to perform pessimistic locking, meaning that a particular key cannot be touched by anyone except the performer of a getAndLock() for up to 30 seconds. If some code tries to access or mutate a locked document, a TemporaryLockFailureException is thrown. Once again, the best way to deal with that error is to try again later, but you'll usually have to make sure that the attempted change does not overwrite the one performed under the lock; after all, it was important enough that a pessimistic lock was enforced around it.

Durability constraints (ReplicateTo and PersistTo) allow you to instruct the SDK not to return until the database has acknowledged a certain level of replication/persistence. This is done using the corresponding overloads, on a per-operation basis. The SDK will perform the operation and then poll the cluster until enough replica nodes have acknowledged seeing the mutation in RAM (for ReplicateTo) and/or having persisted it to disk (PersistTo). The original operation and the polling for durability constraint requirements are two separate things: if the latter fails, a DurabilityException is raised, mainly for these reasons:

  • The constraint cannot be met in the first place because the replication factor is set up too low. The cause of the exception would be a ReplicaNotConfiguredException.

  • One of the replicas is down and brings the total number of alive replicas under the number required for the durability constraint to be met. The cause of the exception would then be a ReplicaNotAvailableException.

Finally, Couchbase Server is sometimes in a very busy state, for example during a rebalance or reindexing combined with heavy load. In some cases, it will prefer to drop a request rather than fall into an unstable state. A TemporaryFailureException indicates that the server couldn't answer due to such a transient state; retrying later with that kind of exception is very likely to succeed.