Atomic operations

The CouchbaseBucket class provides atomic operations such as counter(), append(), and prepend().

Counter

The counter() method atomically increments or decrements a document whose content is a number. The method only accepts and returns a JsonLongDocument. The stored value changes by the given delta: a positive delta increments the value, and a negative delta decrements it. You can also pass in an initial value and an expiration time.

// Increase the counter by 5 and set the initial value to 0 if it does not exist
JsonLongDocument doc = bucket.counter("id", 5, 0);

The resulting document contains the new counter value. A very common use case is an AUTO_INCREMENT-like counter, where every new user gets a new ID (shown here with the asynchronous API):

bucket.async()
    .counter("user::id", 1, 1)
    .map(new Func1<JsonLongDocument, String>() {
        @Override
        public String call(JsonLongDocument counter) {
            return "user::" + counter.content();
        }
    })
    .flatMap(new Func1<String, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(String id) {
            // Use the asynchronous API so that insert returns an Observable
            return bucket.async().insert(JsonDocument.create(id, JsonObject.empty()));
        }
    }).subscribe();

This code increases the counter by one, and then maps the returned number onto a custom document ID (here the code prefixes user::). Afterward, the insert method is executed with the generated ID and an empty document content. Because a counter operation is atomic, the code is guaranteed to deliver different user IDs, even when called at the same time from multiple threads.
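The uniqueness guarantee can be tried out locally without a cluster. The following is a minimal sketch, not SDK code: it assumes a java.util.concurrent.atomic.AtomicLong as a stand-in for the server-side counter document, and the class and method names (UniqueIdSketch, nextUserId, generate) are hypothetical. Several threads request IDs at the same time, and the set of results contains no duplicates.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class UniqueIdSketch {
    // Local stand-in (assumption) for the server-side counter document "user::id".
    private static final AtomicLong COUNTER = new AtomicLong(0);

    // Mirrors the mapping step: atomically increment, then prefix the number.
    public static String nextUserId() {
        return "user::" + COUNTER.incrementAndGet();
    }

    // Requests IDs from several threads at once and returns the distinct results.
    public static Set<String> generate(int threads, int idsPerThread) {
        Set<String> ids = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < idsPerThread; i++) {
                    ids.add(nextUserId());
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return ids;
    }

    public static void main(String[] args) {
        Set<String> ids = generate(8, 1000);
        // Every call produced a different ID, so the set holds threads * idsPerThread entries.
        System.out.println(ids.size());
    }
}
```

If the increment were a separate read followed by a write, two threads could observe the same value and produce the same ID; the atomic increment rules that out.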

The counter always needs to be greater than or equal to zero because negative values are not allowed. If you want to decrement a counter, make sure to set it to a value greater than zero initially.

If the initial value is omitted and the counter doesn’t exist, a DocumentDoesNotExistException is raised (since 2.2.0). You can avoid that by providing an explicit initial value, which could be the same as the delta or an arbitrary value (the delta won’t be added to it when the counter is created):

// Increase the counter by 5 or create the counter with a value of 4 if it does not exist
JsonLongDocument doc = bucket.counter("id", 5, 4);
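The rules for the initial value are easy to misread, so here is a small in-memory model of them. It is a sketch of the semantics described above, not the SDK or the server implementation: CounterModel is a hypothetical class, and a NoSuchElementException stands in for DocumentDoesNotExistException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

public class CounterModel {
    private final Map<String, Long> store = new HashMap<>();

    // counter(id, delta): fails when the counter is missing, because no initial value was given.
    public synchronized long counter(String id, long delta) {
        Long current = store.get(id);
        if (current == null) {
            // Stand-in (assumption) for DocumentDoesNotExistException in the SDK.
            throw new NoSuchElementException("counter does not exist: " + id);
        }
        return apply(id, current, delta);
    }

    // counter(id, delta, initial): a missing counter is created with `initial`,
    // and the delta is NOT applied on that first call.
    public synchronized long counter(String id, long delta, long initial) {
        Long current = store.get(id);
        if (current == null) {
            store.put(id, initial);
            return initial;
        }
        return apply(id, current, delta);
    }

    private long apply(String id, long current, long delta) {
        long next = current + delta;
        store.put(id, next);
        return next;
    }

    public static void main(String[] args) {
        CounterModel bucket = new CounterModel();
        System.out.println(bucket.counter("id", 5, 4)); // 4: created with the initial value
        System.out.println(bucket.counter("id", 5, 4)); // 9: the counter exists, so the delta is added
        System.out.println(bucket.counter("id", -2));   // 7: a negative delta decrements
    }
}
```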

If you want to set an expiration time, you need to provide both the initial value and the expiration time. This constraint is imposed by the API because an overload that takes only the expiration time (an int) would be ambiguous with the overload that takes only the initial value (a long).

// Increment by 5, initial 5 and 3 second expiration
JsonLongDocument doc = bucket.counter("id", 5, 5, 3);

Append & Prepend

Appending and prepending values to existing documents is also possible. Both the append and prepend operations are atomic, so they can be used without further synchronization.

Both operations only work on binary documents, ideally strings or byte arrays. They do not work on JSON documents because they do not inspect the content: the new bytes are simply added at the end or the beginning. Applying one of these operations to a JSON document will render it invalid.
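To see why JSON content breaks, it helps to model append as plain byte concatenation. This is a minimal sketch under that assumption and does not call the SDK; AppendBytesSketch and its methods are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

public class AppendBytesSketch {
    // Models append as byte concatenation, with no knowledge of the stored format.
    public static byte[] append(byte[] stored, byte[] fragment) {
        byte[] out = new byte[stored.length + fragment.length];
        System.arraycopy(stored, 0, out, 0, stored.length);
        System.arraycopy(fragment, 0, out, stored.length, fragment.length);
        return out;
    }

    // Convenience wrapper for UTF-8 strings.
    public static String appendUtf8(String stored, String fragment) {
        byte[] joined = append(stored.getBytes(StandardCharsets.UTF_8),
                               fragment.getBytes(StandardCharsets.UTF_8));
        return new String(joined, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Plain strings concatenate into something meaningful.
        System.out.println(appendUtf8("Hello, ", "World!"));      // Hello, World!
        // Two JSON objects back to back are not a single valid JSON value.
        System.out.println(appendUtf8("{\"a\":1}", "{\"b\":2}")); // {"a":1}{"b":2}
    }
}
```

The second result contains two objects in a row, which a JSON parser rejects. To change a JSON document, read it, modify it, and write it back instead.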

A document must exist before values can be appended or prepended to it. Here is an example that creates a document and then appends a string to it:

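// The chain below needs Observables, so it uses the asynchronous API
bucket.async()
    .insert(LegacyDocument.create("doc", "Hello, "))
    .flatMap(doc ->
        bucket.async().append(LegacyDocument.create("doc", "World!"))
    )
    // Load the full document again to see the combined content
    .flatMap(doc -> bucket.async().get(doc))
    .toBlocking()
    .forEach(doc -> System.err.println(doc.content()));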

When executed, this code prints Hello, World!.

Durability Requirements

If no durability requirements are set on the append, prepend or counter methods, the operation will succeed when the server acknowledges the document in its managed cache layer. While this is a performant operation, there might be situations where you want to make sure that your document has been persisted or replicated so that it survives power outages and other node failures.

All atomic operations provide overloads to supply such durability requirements:

D append(D document, PersistTo persistTo);
D append(D document, ReplicateTo replicateTo);
D append(D document, PersistTo persistTo, ReplicateTo replicateTo);

D prepend(D document, PersistTo persistTo);
D prepend(D document, ReplicateTo replicateTo);
D prepend(D document, PersistTo persistTo, ReplicateTo replicateTo);

JsonLongDocument counter(String id, long delta, PersistTo persistTo);
JsonLongDocument counter(String id, long delta, ReplicateTo replicateTo);
JsonLongDocument counter(String id, long delta, PersistTo persistTo, ReplicateTo replicateTo);

JsonLongDocument counter(String id, long delta, long initial, PersistTo persistTo);
JsonLongDocument counter(String id, long delta, long initial, ReplicateTo replicateTo);
JsonLongDocument counter(String id, long delta, long initial, PersistTo persistTo, ReplicateTo replicateTo);

JsonLongDocument counter(String id, long delta, long initial, int expiry, PersistTo persistTo);
JsonLongDocument counter(String id, long delta, long initial, int expiry, ReplicateTo replicateTo);
JsonLongDocument counter(String id, long delta, long initial, int expiry, PersistTo persistTo, ReplicateTo replicateTo);

You can configure one or both of the requirements. From an application point of view, nothing needs to change when working with the response, although there is one thing to keep in mind:

The internal implementation first performs a regular operation and afterward starts polling the affected cluster nodes for the state of the document. If something fails during this polling phase (which fails the observable), the original operation might have succeeded nonetheless.
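This matters most for counters, because they are not idempotent. The two-phase behavior can be modeled as follows. This is a sketch only: DurabilitySketch, ObserveFailedException, and the observeSucceeds flag are hypothetical and stand in for the real mutation, the polling phase, and whatever error the SDK surfaces.

```java
import java.util.HashMap;
import java.util.Map;

public class DurabilitySketch {
    // Hypothetical stand-in for the error raised when the durability check fails.
    public static class ObserveFailedException extends RuntimeException {
        public ObserveFailedException(String message) {
            super(message);
        }
    }

    private final Map<String, Long> cache = new HashMap<>();

    // Phase 1 applies the mutation; phase 2 simulates polling for persistence or replication.
    public long counter(String id, long delta, boolean observeSucceeds) {
        long next = cache.merge(id, delta, Long::sum); // the mutation is already applied here
        if (!observeSucceeds) {
            throw new ObserveFailedException("durability requirement not met for " + id);
        }
        return next;
    }

    // Returns the current value, or 0 if the counter was never written.
    public long read(String id) {
        return cache.getOrDefault(id, 0L);
    }

    public static void main(String[] args) {
        DurabilitySketch bucket = new DurabilitySketch();
        bucket.counter("visits", 1, true);      // the value is now 1
        try {
            bucket.counter("visits", 1, false); // fails, but only in the polling phase
        } catch (ObserveFailedException e) {
            // Re-read before deciding to retry; a blind retry would count twice.
            System.out.println(bucket.read("visits")); // 2
        }
    }
}
```

The failed call still changed the value, so an application that retries after such an error should first check the current state of the document.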