You are viewing the documentation for a prerelease version.

Distributed Transactions from the Java SDK

A practical guide to using Couchbase’s distributed ACID transactions, via the Java API.

This is the beta release of Distributed Transactions for Couchbase, providing these semantics and features:

  • Insertion, mutation, and removal of multiple documents can be staged inside a transaction.

  • Until the transaction is committed, these changes will not be visible to other transactions, or any other part of the Couchbase Data Platform.

  • An isolation level of Read Committed, to support high performance and scalability.

  • A high-level and easy-to-use API that allows the developer to express what they want the transaction to do as a block of logic, while the library takes care of the nuts and bolts of handling any temporary conflicts with other transactions.

Please see our introduction to ACID Transactions for a guide to the benefits and trade-offs of multi-document transactions.

Requirements

  • Couchbase Server 6.5 or above.

  • Couchbase Java client 3.x. It is recommended to pick up the client as a transitive dependency of the transactions library from Maven.

  • NTP should be configured so that the clocks of the Couchbase cluster's nodes stay in sync. Clocks being out of sync will not cause incorrect behavior, but can impact metadata cleanup.

  • The application, if it is using extended attributes (XATTRs), must avoid using the XATTR field txn, which is now reserved for Couchbase use.

Getting started

Couchbase transactions require no additional components or services to be configured. Simply add the transactions library to your project. With Gradle, this can be accomplished by adding the dependency to your build.gradle file, along with the separate repository where the beta is published:

dependencies {
    compile group: 'com.couchbase.client', name: 'couchbase-transactions', version: '1.0.0-beta.1'
}

repositories {
    maven { url 'http://files.couchbase.com/maven2/' }
}

Use of this Maven repository from other tools is similar. Use the same group, artifact, and version in your Maven-compatible tool of choice.
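
For Maven itself, the equivalent might look like the following pom.xml fragment. This is a sketch that reuses the group, artifact, version, and repository URL from the Gradle example above; the repository id is an arbitrary label assumed here for illustration.

```xml
<dependencies>
    <dependency>
        <groupId>com.couchbase.client</groupId>
        <artifactId>couchbase-transactions</artifactId>
        <version>1.0.0-beta.1</version>
    </dependency>
</dependencies>

<repositories>
    <repository>
        <!-- The id is an arbitrary label chosen for this sketch -->
        <id>couchbase</id>
        <url>http://files.couchbase.com/maven2/</url>
    </repository>
</repositories>
```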

Initializing Transactions

The starting point is the Transactions object. It is very important that the application ensures that only one of these is created, as it performs automated background processes that should not be duplicated.

// Initialize the Couchbase cluster
Cluster cluster = Cluster.connect("localhost", "transactor", "mypass");
bucket = cluster.bucket("transact");
collection = bucket.defaultCollection();

// Create the single Transactions object
Transactions transactions = Transactions.create(cluster, TransactionConfigBuilder.create()
        // The configuration can be altered here, but in most cases the defaults are fine.
        .build());

The default configuration will perform all writes with the durability setting Majority, ensuring that each write is available in-memory on the majority of replicas before the transaction continues. The application is free to adjust this to the level of None for performance, at the cost of some data safety and possible loss of atomicity in the event of node failures, or to a higher durability level for even greater safety (at some loss in performance). Majority has been selected as the best default durability for most use-cases.
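
Returning to the single-instance requirement above: one possible way to guarantee that only one Transactions object is ever created is to hide its construction behind a lazy, thread-safe holder. The sketch below is plain Java and does not use the transactions library. SingleInstance and SingleInstanceDemo are hypothetical names, and a plain Object stands in for the Transactions object that a real application would create inside the factory.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class SingleInstanceDemo {
    // Counts how many times the factory actually ran.
    static final AtomicInteger CREATED = new AtomicInteger();

    // Stand-in factory: a real application would build its Transactions object here.
    static final SingleInstance<Object> HOLDER = new SingleInstance<>(() -> {
        CREATED.incrementAndGet();
        return new Object();
    });

    public static void main(String[] args) {
        Object first = HOLDER.get();
        Object second = HOLDER.get();
        System.out.println("same instance: " + (first == second));
        System.out.println("factory calls: " + CREATED.get());
    }
}

/** Hypothetical helper: runs the factory at most once and shares the result with every caller. */
final class SingleInstance<T> implements Supplier<T> {
    private final Supplier<T> factory;
    private volatile T instance;

    SingleInstance(Supplier<T> factory) {
        this.factory = Objects.requireNonNull(factory);
    }

    @Override
    public T get() {
        T local = instance;
        if (local == null) {
            synchronized (this) {
                local = instance;
                if (local == null) {
                    // First caller creates the instance; later callers reuse it.
                    local = Objects.requireNonNull(factory.get());
                    instance = local;
                }
            }
        }
        return local;
    }
}
```

Applications that already use a dependency-injection container can usually get the same effect by registering the object with singleton scope.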

Creating a Transaction

A core idea of the library is that you provide your transactional logic inside a callback, and the transactions library takes care of getting the transaction committed, retrying the logic as often as required (e.g. in the event of a temporary conflict with another transaction).

Each run of the callback is called an attempt, inside an overall transaction.

As with the Couchbase Java Client, you can use the library in either synchronous mode:

Transactions transactions = Transactions.create(cluster);

try {
    transactions.run((ctx) -> {
        // 'ctx' is an AttemptContext, which permits getting, inserting,
        // removing and replacing documents, along with committing and
        // rolling back the transaction.

        // ... Your transaction logic here ...

        // This call is optional - if you leave it off, the transaction
        // will be committed anyway.
        ctx.commit();
    });
} catch (TransactionFailed e) {
    for (LogDefer err : e.result().log().logs()) {
        // Optionally, log failures to your own logger
    }
}

or asynchronous mode, using the Project Reactor reactive library:

Transactions transactions = Transactions.create(cluster);

Mono<TransactionResult> result = transactions.reactive().run((ctx) -> {
    // 'ctx' is an AttemptContextReactive, providing asynchronous versions of the AttemptContext methods

    return

            // Your transaction logic here: as an example, get and remove a doc
            ctx.getOrError(collection.reactive(), "document-id")
                    .flatMap(doc -> ctx.remove(doc))

                    // The commit call is optional - if you leave it off,
                    // the transaction will be committed anyway.
                    .then(ctx.commit());

}).doOnError(err -> {
    if (err instanceof TransactionFailed) {
        for (LogDefer e : ((TransactionFailed) err).result().log().logs()) {
            // Optionally, log failures to your own logger
        }
    }
});

// Normally you will chain this result further and ultimately subscribe.  For simplicity, here we just block on the result.
TransactionResult finalResult = result.block();

The blocking mode is the easiest to write and understand and expects that you are running within a framework, such as a servlet container, that manages concurrency for you. The asynchronous API allows you to build your application in a reactive style, without large thread pools, which can help you scale with excellent efficiency. Those new to reactive programming may want to check out the Project Reactor site for more details on this powerful paradigm.

Some AttemptContextReactive methods, notably remove, return Mono<Void>. Be careful to use then rather than flatMap or similar on these, as Mono<Void> will only trigger a completion event, and not the next event, so many methods including flatMap will not work as expected.

Examples

This is a quick summary of the main transaction operations. They are described in more detail below.

With the synchronous API
Transactions transactions = Transactions.create(cluster);

try {
    transactions.run((ctx) -> {
        // Inserting a doc:
        String docId = "aDocument";
        ctx.insert(collection, docId, JsonObject.create());

        // Getting documents:
        // Use ctx.get if the document may or may not exist
        Optional<TransactionJsonDocument> docOpt = ctx.get(collection, docId);

        // Use ctx.getOrError if the document should exist, and the transaction will fail if not
        TransactionJsonDocument doc = ctx.getOrError(collection, docId);

        // Replacing a doc:
        TransactionJsonDocument anotherDoc = ctx.getOrError(collection, "anotherDoc");
        // TransactionJsonDocument is immutable, so get its content as a mutable JsonObject
        JsonObject content = anotherDoc.contentAs(JsonObject.class);
        content.put("transactions", "are awesome");
        ctx.replace(anotherDoc, content);

        // Removing a doc:
        TransactionJsonDocument yetAnotherDoc = ctx.getOrError(collection, "yetAnotherDoc");
        ctx.remove(yetAnotherDoc);

        ctx.commit();
    });
} catch (TransactionFailed e) {
    for (LogDefer err : e.result().log().logs()) {
        // Optionally, log failures to your own logger
    }
}
With the asynchronous API
Transactions transactions = Transactions.create(cluster);

Mono<TransactionResult> result = transactions.reactive().run((ctx) -> {
    return
            // Inserting a doc:
            ctx.insert(collection.reactive(), "aDoc", JsonObject.create())

                    // Getting and replacing a doc:
                    .then(ctx.getOrError(collection.reactive(), "anotherDoc"))
                    .flatMap(doc -> {
                        JsonObject content = doc.contentAs(JsonObject.class);
                        content.put("transactions", "are awesome");
                        return ctx.replace(doc, content);
                    })

                    // Getting and removing a doc:
                    .then(ctx.getOrError(collection.reactive(), "yetAnotherDoc"))
                    .flatMap(doc -> ctx.remove(doc))

                    // Committing:
                    .then(ctx.commit());

}).doOnError(err -> {
    if (err instanceof TransactionFailed) {
        for (LogDefer e : ((TransactionFailed) err).result().log().logs()) {
            // Optionally, log failures to your own logger
        }
    }
});

// Normally you will chain this result further and ultimately subscribe.
// For simplicity, here we just block on the result.
result.block();

Inserting documents

With the asynchronous API:
Transactions transactions = Transactions.create(cluster);

transactions.reactive().run((ctx) -> {
    return ctx.insert(collection.reactive(), "docId", JsonObject.create()).then();
}).block();

And the synchronous API:
Transactions transactions = Transactions.create(cluster);

transactions.run((ctx) -> {
    String docId = "docId";

    ctx.insert(collection, docId, JsonObject.create());

});

Getting Documents

There are two ways to get a document, get and getOrError:

Optional<TransactionJsonDocument> docOpt = ctx.get(collection, docId);
TransactionJsonDocument doc = ctx.getOrError(collection, docId);

getOrError will throw KeyNotFoundException if the document doesn’t exist, fast failing the transaction, and is provided as a convenience method so the developer does not have to check the Optional if the document must exist for the transaction to succeed.

Gets will 'read your own writes'. For example, this will succeed:

transactions.run((ctx) -> {
    String docId = "docId";

    ctx.insert(collection, docId, JsonObject.create());

    Optional<TransactionJsonDocument> doc = ctx.get(collection, docId);

    assert(doc.isPresent());
});

Other transactions will not see the inserted document until this transaction commits. Non-transactional reads will see an empty document, where they would have otherwise seen a document not existing.

Replacing Documents

With the asynchronous API:
transactions.reactive().run((ctx) -> {
    return ctx.getOrError(collection.reactive(), "anotherDoc")
            .flatMap(doc -> {
                JsonObject content = doc.contentAs(JsonObject.class);
                content.put("transactions", "are awesome");
                return ctx.replace(doc, content);
            })
            .then(ctx.commit());
});
Synchronous API:
transactions.run((ctx) -> {
    TransactionJsonDocument anotherDoc = ctx.getOrError(collection, "anotherDoc");
    JsonObject content = anotherDoc.contentAs(JsonObject.class);
    content.put("transactions", "are awesome");
    ctx.replace(anotherDoc, content);
});

These changes are staged until commit: other transactions, and regular gets, will see the original unmodified doc.

Removing Documents

With the asynchronous API:
transactions.reactive().run((ctx) -> {
    return ctx.getOrError(collection.reactive(), "anotherDoc")
            .flatMap(doc -> ctx.remove(doc));
});
Synchronous API:
transactions.run((ctx) -> {
    TransactionJsonDocument anotherDoc = ctx.getOrError(collection, "anotherDoc");
    ctx.remove(anotherDoc);
});

As with inserted and replaced docs, these changes are staged until commit: both other transactions, and regular gets, will see the unremoved doc.

Committing

Committing is automatic: if there is no explicit call to ctx.commit() at the end of the transaction logic callback, and no exception is thrown, it will be committed.

With the asynchronous API, if you leave off the explicit call to commit then you may need to call .then() on the result of the chain to convert it to the required Mono<Void> return type:

Mono<TransactionResult> result = transactions.reactive().run((ctx) -> {
    return ctx.getOrError(collection.reactive(), "anotherDoc")
                    .flatMap(doc -> {
                        JsonObject content = doc.contentAs(JsonObject.class);
                        content.put("transactions", "are awesome");
                        return ctx.replace(doc, content);
                    })
                    .then();
});

As soon as the transaction is committed, the changes will be atomically visible to other transactions. This is achieved by having each transaction have a single point of truth regarding its commit status, which is checked whenever a transaction discovers a document that’s mid-transaction.

Commit is final: after the transaction is committed, it cannot be rolled back.
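
To make the "single point of truth" idea more concrete, here is a deliberately simplified in-memory model. It is not the library's actual protocol or metadata layout. All names (CommitVisibilitySketch, Doc, Status) are hypothetical and exist only for this illustration. Each document keeps its last committed body plus an optional staged body, and a reader consults one status entry per transaction to decide which body to return.

```java
import java.util.HashMap;
import java.util.Map;

class CommitVisibilitySketch {
    enum Status { PENDING, COMMITTED }

    /** Toy document: last committed body plus an optional body staged by one transaction. */
    static final class Doc {
        String committedBody;
        String stagedBody;   // null when nothing is staged
        String stagedByTxn;  // id of the transaction that staged the body
        Doc(String committedBody) { this.committedBody = committedBody; }
    }

    private final Map<String, Doc> docs = new HashMap<>();
    // One status entry per transaction: the single point of truth in this toy model.
    private final Map<String, Status> txnStatus = new HashMap<>();

    void put(String docId, String body) { docs.put(docId, new Doc(body)); }

    void stage(String txnId, String docId, String newBody) {
        txnStatus.putIfAbsent(txnId, Status.PENDING);
        Doc doc = docs.get(docId);
        doc.stagedBody = newBody;
        doc.stagedByTxn = txnId;
    }

    // Flipping one entry makes every staged change of the transaction visible at once.
    void commit(String txnId) { txnStatus.put(txnId, Status.COMMITTED); }

    String read(String docId) {
        Doc doc = docs.get(docId);
        boolean stagedAndCommitted =
                doc.stagedBody != null && txnStatus.get(doc.stagedByTxn) == Status.COMMITTED;
        return stagedAndCommitted ? doc.stagedBody : doc.committedBody;
    }

    public static void main(String[] args) {
        CommitVisibilitySketch store = new CommitVisibilitySketch();
        store.put("a", "v1");
        store.put("b", "v1");
        store.stage("txn-1", "a", "v2");
        store.stage("txn-1", "b", "v2");
        System.out.println("before commit: " + store.read("a") + " " + store.read("b"));
        store.commit("txn-1");
        System.out.println("after commit: " + store.read("a") + " " + store.read("b"));
    }
}
```

In this model a reader never sees one document updated and the other not, because both reads resolve against the same status entry.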

A Full Transaction Example

Let’s pull together everything so far into a more real-world example of a transaction.

This example is based on the Game Simulation sample bucket, which can be installed in the Couchbase Admin UI through Settings → Sample Buckets.

The sample data simulates that of a simple Massively Multiplayer Online game, and includes documents representing:

  • Players, with experience points and levels;

  • Monsters, with hitpoints, and the number of experience points a player earns from their death.

In this example, the player is dealing damage to the monster. The player’s client has sent this instruction to a central server, where we’re going to record that action. We’re going to do this in a transaction, as we don’t want a situation where the monster is killed, but we fail to update the player’s document with the earned experience. (Though this is just a demo - in reality, the game would likely live with the small risk and limited impact of this, rather than pay the cost for using a transaction.)

public void playerHitsMonster(int damage, String playerId, String monsterId) {
    Transactions transactions = getTransactions();

    try {
        transactions.run((ctx) -> {
            TransactionJsonDocument monsterDoc = ctx.getOrError(collection, monsterId);
            TransactionJsonDocument playerDoc = ctx.getOrError(collection, playerId);

            int monsterHitpoints = monsterDoc.contentAs(JsonObject.class).getInt("hitpoints");
            int monsterNewHitpoints = monsterHitpoints - damage;

            if (monsterNewHitpoints <= 0) {
                // Monster is killed.  The remove is just for demoing, and a more realistic example would set a
                // "dead" flag or similar.
                ctx.remove(monsterDoc);

                // The player earns experience for killing the monster
                int experienceForKillingMonster = monsterDoc.contentAs(JsonObject.class).getInt("experienceWhenKilled");
                int playerExperience = playerDoc.contentAs(JsonObject.class).getInt("experience");
                int playerNewExperience = playerExperience + experienceForKillingMonster;
                int playerNewLevel = calculateLevelForExperience(playerNewExperience);

                JsonObject playerContent = playerDoc.contentAs(JsonObject.class);

                playerContent.put("experience", playerNewExperience);
                playerContent.put("level", playerNewLevel);

                ctx.replace(playerDoc, playerContent);
            }
            else {
                // Monster is damaged but still alive
                JsonObject monsterContent = monsterDoc.contentAs(JsonObject.class);
                monsterContent.put("hitpoints", monsterNewHitpoints);

                ctx.replace(monsterDoc, monsterContent);
            }
        });
    } catch (TransactionFailed e) {
        // The operation failed.   Both the monster and the player will be untouched.

        // Situations that can cause this would include either the monster
        // or player not existing (as getOrError is used), or a persistent
        // failure to be able to commit the transaction, for example on
        // prolonged node failure.
    }
}

Concurrency with Non-Transactional Writes

This release of transactions for Couchbase requires a degree of co-operation from the application. Specifically, the application should ensure that non-transactional writes (such as writes through the SDK API using the data service, or N1QL UPDATE statements) are never performed concurrently on documents involved in a transaction. If your app uses transactions, it is recommended that all mutations of transacted-upon documents go through the transactions API. In many applications this means a mix: the traditional API for documents not involved in transactions, and the transactions API for documents that are, even where some of that logic does not require multi-document atomicity. Queries and read access from other APIs are perfectly fine, and are at a Read Committed isolation level. We may change this in a future release.

To help detect that this requirement is fulfilled, the application can subscribe to the client’s event logger and check for any TransactionIllegalDocumentStateEvent events, like so:

cluster.environment().eventBus().subscribe(event -> {
    if (event instanceof TransactionIllegalDocumentStateEvent) {
        // log this event for review
    }
});

Rollback

Rollback is handled automatically: if an exception is thrown, either by the transactional logic, or within the transactions library, then that attempt is rolled back. The transaction logic may or may not be retried, depending on the exception. If the transaction is not retried then it will throw a TransactionFailed exception, and its getCause method can be used for more details on the failure.

The transaction can also be explicitly rolled back:

transactions.run((ctx) -> {
    ctx.insert(collection, "docId", JsonObject.create());

    Optional<TransactionJsonDocument> docOpt = ctx.get(collection, "requiredDoc");
    if (docOpt.isPresent()) {
        ctx.remove(docOpt.get());
        ctx.commit();
    }
    else {
        ctx.rollback();
    }
});

In this case, the transaction will be regarded as successful, i.e. no TransactionFailed is thrown.

After a transaction is rolled back, it cannot be committed, and the library will not try to automatically commit it at the end of the code block.

Error Handling

Error handling differs depending on whether a transaction is pre or post the point of commit (or rollback).

Before the Commit (or Rollback) Point:

In the event of transient errors, your transaction logic will be retried, multiple times if necessary, until the transaction expires, commits, or rolls back. Each retry is called an attempt, and each failed attempt will be automatically rolled back before a new attempt is started. This is expected behavior when multiple concurrent actors are trying to perform a transaction on the same document. In general, it should just cause higher latency.

After the Commit (or Rollback) Point:

This is the 'point of no return'. The transaction must complete commit (or rollback) at this stage. The library will keep trying to commit (or rollback), until the transaction expires.

If the transaction expires, then a TransactionFailed exception will be thrown. There is a background cleanup process run by each client whose responsibility is to find any 'lost' half-completed transactions, and continue trying to push forwards the commit or rollback stage until they succeed. If the background cleanup job fails in process, another cleanup job will identify this and complete the commit or rollback.

The transaction will expire at the end of the configured expiration period. The default is 15 seconds, and it can be set with:

TransactionConfigBuilder.create().expirationTime(Duration.of(15, ChronoUnit.SECONDS)).build();

See the documentation for the expirationTime method for a discussion of the trade-offs of changing this.

Only transient errors will cause your transaction logic to be retried. The most common causes will be:

  • A write-write conflict with another transaction, e.g. two transactions trying to write the same document. One transaction will abort and retry.

  • A temporary failure error from the server, e.g. if it is temporarily overloaded.

All other errors will cause the transaction to fail immediately.

These errors include:

  • Any exception thrown by your transaction logic, either deliberately or through an application logic bug.

  • Attempting to insert a document that already exists.

  • Attempting to remove or replace a document that does not exist.

  • Calling getOrError on a document id that does not exist.
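
The retry rules above can be sketched as a small loop in plain Java. This is an illustration of the described behavior, not the library's implementation: RetrySketch, TransientConflict, and GaveUp are hypothetical names, and a real implementation would also roll back each failed attempt and back off between attempts.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

class RetrySketch {
    /** Hypothetical marker for errors worth retrying, such as a write-write conflict. */
    static class TransientConflict extends RuntimeException {
        TransientConflict(String message) { super(message); }
    }

    /** Hypothetical stand-in for the failure reported once no further attempt will be made. */
    static class GaveUp extends RuntimeException {
        final int attempts;
        GaveUp(String message, int attempts, Throwable cause) {
            super(message, cause);
            this.attempts = attempts;
        }
    }

    static <T> T runWithRetries(Callable<T> logic, Duration expiry) {
        long deadline = System.nanoTime() + expiry.toNanos();
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                return logic.call(); // one attempt
            } catch (TransientConflict e) {
                // Transient: retry unless the overall expiry has passed.
                if (System.nanoTime() - deadline >= 0) {
                    throw new GaveUp("expired", attempts, e);
                }
            } catch (Exception e) {
                // Anything else fails the whole run immediately.
                throw new GaveUp("failed fast", attempts, e);
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String outcome = runWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new TransientConflict("another writer got there first");
            }
            return "committed";
        }, Duration.ofSeconds(5));
        System.out.println(outcome + " after " + calls[0] + " attempts");
    }
}
```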

If the transaction does fail, it will throw TransactionFailed, or an exception derived from it. How to handle it is app-dependent. In the majority of cases, logging it for human review may be the best solution. See Logging for how to also log useful information related to the transaction.

Logging

To aid troubleshooting, each transaction maintains a list of log entries, which can be logged on failure like this:

} catch (TransactionFailed e) {
    yourLogger.warning("Transaction " + e.result().transactionId() + " failed:");
    for (LogDefer err : e.result().log().logs()) {
        yourLogger.info(err.toString());
    }
}

or for the asynchronous API:

.doOnError(error -> {
    if (error instanceof TransactionFailed) {
        TransactionFailed err = (TransactionFailed) error;
        yourLogger.warning("Transaction " + err.result().transactionId() + " failed:");
        for (LogDefer e : err.result().log().logs()) {
            yourLogger.info(e.toString());
        }
    }
});

For convenience there is also a config option that will automatically write this programmatic log to the standard Couchbase Java logging configuration inherited from the SDK if a transaction fails. This will log all lines of any failed transactions, to WARN level:

TransactionConfigBuilder.create().logOnFailure(true, Event.Severity.WARN).build();

By default the Couchbase Java logging event-bus is set up to look for and use SLF4J/logback, log4j1, and log4j2 on the classpath, and to fall back to java.util.logging.

Please see the Java SDK logging documentation for details.

Further Reading