Distributed Transactions from the Java SDK

    A practical guide to using Couchbase’s distributed ACID transactions, via the Java API.

    This document presents a practical HOWTO on using Couchbase transactions, following on from our transactions documentation.

    Couchbase transactions were previously distributed as a separate library, before being integrated into the SDK from version 3.3.0. Users of the transactions library should follow the Distributed Transactions Migration Guide.

    Requirements

    • Couchbase Server 6.6.1 or above.

    • NTP should be configured so that the clocks of the Couchbase cluster nodes are synchronized.

    • The application, if it is using extended attributes (XATTRs), must avoid using the XATTR field txn, which is reserved for Couchbase use.

    If using a single node cluster (for example, during development), note that the default number of replicas for a newly created bucket is 1. If left at this default, all Key-Value writes performed with durability will fail with a DurabilityImpossibleException. In turn this will cause all transactions (which perform all Key-Value writes durably) to fail. This setting can be changed via the UI or command line (or the SDK, as sketched below). If the bucket already existed, the cluster needs to be rebalanced for the setting to take effect.
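
    For example, a minimal sketch using the SDK's bucket management API, assuming a development bucket named "default":

    var buckets = cluster.buckets();
    var settings = buckets.getBucket("default");
    settings.numReplicas(0); // no replicas needed on a single-node dev cluster
    buckets.updateBucket(settings);
    // If the bucket already existed, a rebalance (via the UI or command line)
    // is still required before the change takes effect.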

    Couchbase transactions require no additional components or services to be configured.

    Getting Started

    To make it easy to use any of the following examples, here are all the imports used by them:

    
    import com.couchbase.client.core.error.CouchbaseException;
    import com.couchbase.client.core.error.DocumentNotFoundException;
    import com.couchbase.client.core.msg.kv.DurabilityLevel;
    import com.couchbase.client.core.transactions.events.IllegalDocumentStateEvent;
    import com.couchbase.client.core.transactions.events.TransactionCleanupAttemptEvent;
    import com.couchbase.client.core.transactions.events.TransactionCleanupEndRunEvent;
    import com.couchbase.client.core.transactions.events.TransactionEvent;
    import com.couchbase.client.core.transactions.log.CoreTransactionLogMessage;
    import com.couchbase.client.java.Bucket;
    import com.couchbase.client.java.Cluster;
    import com.couchbase.client.java.ClusterOptions;
    import com.couchbase.client.java.Collection;
    import com.couchbase.client.java.ReactiveScope;
    import com.couchbase.client.java.Scope;
    import com.couchbase.client.java.env.ClusterEnvironment;
    import com.couchbase.client.java.json.JsonArray;
    import com.couchbase.client.java.json.JsonObject;
    import com.couchbase.client.java.query.QueryOptions;
    import com.couchbase.client.java.query.QueryProfile;
    import com.couchbase.client.java.query.QueryScanConsistency;
    import com.couchbase.client.java.transactions.TransactionKeyspace;
    import com.couchbase.client.java.transactions.TransactionQueryOptions;
    import com.couchbase.client.java.transactions.TransactionQueryResult;
    import com.couchbase.client.java.transactions.TransactionResult;
    import com.couchbase.client.java.transactions.config.TransactionsCleanupConfig;
    import com.couchbase.client.java.transactions.config.TransactionsConfig;
    import com.couchbase.client.java.transactions.config.TransactionsQueryConfig;
    import com.couchbase.client.java.transactions.error.TransactionCommitAmbiguousException;
    import com.couchbase.client.java.transactions.error.TransactionFailedException;
    import com.couchbase.client.java.transactions.log.TransactionLogMessage;
    import com.couchbase.client.tracing.opentelemetry.OpenTelemetryRequestSpan;
    import io.opentelemetry.api.trace.Span;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.List;
    import java.util.logging.Logger;
    
    import static com.couchbase.client.java.transactions.config.SingleQueryTransactionOptions.singleQueryTransactionOptions;
    import static com.couchbase.client.java.transactions.config.TransactionOptions.transactionOptions;

    Transactions are accessed via a Cluster. A simple transaction that inserts one doc and updates the content of another looks like this (with error handling removed for brevity):

    cluster.transactions().run((ctx) -> {
        ctx.insert(collection, "doc1", doc1Content);
    
        var doc2 = ctx.get(collection, "doc2");
        ctx.replace(doc2, doc2Content);
    });

    Configuration

    The default configuration should be appropriate for most use-cases. If needed, transactions can be globally configured at the point of creating the Cluster. As usual with Cluster configuration, you can either explicitly create and manage the ClusterEnvironment yourself:

    var env = ClusterEnvironment.builder()
            .transactionsConfig(TransactionsConfig.durabilityLevel(DurabilityLevel.PERSIST_TO_MAJORITY)
                    .cleanupConfig(TransactionsCleanupConfig.cleanupWindow(Duration.ofSeconds(30)))
                    .queryConfig(TransactionsQueryConfig.scanConsistency(QueryScanConsistency.NOT_BOUNDED)))
            .build();
    
    var cluster = Cluster.connect("localhost", ClusterOptions.clusterOptions("username", "password")
            .environment(env));
    
    // Use the cluster
    // ...
    
    // Shutdown
    cluster.disconnect();
    env.shutdown();

    or alternatively use this lambda syntax, which leads to the ClusterEnvironment being owned and managed by the Cluster:

    var cluster = Cluster.connect("localhost", ClusterOptions.clusterOptions("username", "password")
            .environment(env -> env.transactionsConfig(TransactionsConfig
                    .durabilityLevel(DurabilityLevel.PERSIST_TO_MAJORITY)
                    .cleanupConfig(TransactionsCleanupConfig
                            .cleanupWindow(Duration.ofSeconds(30)))
                    .queryConfig(TransactionsQueryConfig
                            .scanConsistency(QueryScanConsistency.NOT_BOUNDED)))));
    // Use the cluster
    // ...
    
    // Shutdown
    cluster.disconnect();

    The default configuration will perform all writes with the durability setting Majority, ensuring that each write is available in-memory on the majority of replicas before the transaction continues. There are two higher durability settings available that will additionally wait for all mutations to be written to physical storage on either the active or the majority of replicas, before continuing. This further increases safety, at a cost of additional latency.

    A durability level of None is available, but its use is discouraged and unsupported. If durability is set to None, then ACID semantics are not guaranteed.
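
    Durability can also be overridden for an individual transaction; a sketch, using the transactionOptions static import shown earlier:

    cluster.transactions().run((ctx) -> {
        // Your transaction logic
    }, transactionOptions().durabilityLevel(DurabilityLevel.PERSIST_TO_MAJORITY));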

    Creating a Transaction

    A core idea of Couchbase transactions is that an application supplies the logic for the transaction inside a lambda, including any conditional logic required, and the transaction is then automatically committed. If a transient error occurs, such as a temporary conflict with another transaction, then the transaction will rollback what has been done so far and run the lambda again. The application does not have to do these retries and error handling itself.

    Each run of the lambda is called an attempt, inside an overall transaction.

    You can create transactions in either synchronous mode:

    try {
        cluster.transactions().run((ctx) -> {
            // 'ctx' is a TransactionAttemptContext, which permits getting, inserting,
            // removing and replacing documents, performing N1QL queries, and committing or
            // rolling back the transaction.
    
            // ... Your transaction logic here ...
    
            // If the lambda succeeds, the transaction is automatically committed.
        });
    } catch (TransactionCommitAmbiguousException e) {
        throw logCommitAmbiguousError(e);
    } catch (TransactionFailedException e) {
        throw logFailure(e);
    }

    or asynchronous mode, using the Project Reactor reactive library:

    Mono<TransactionResult> result = cluster.reactive().transactions().run((ctx) -> {
        // 'ctx' is a TransactionAttemptContextReactive, providing asynchronous versions of the
        // TransactionAttemptContext methods.
    
        // Your transaction logic here: as an example, get and remove a doc
    return ctx.get(collection.reactive(), "document-id")
            .flatMap(doc -> ctx.remove(doc));
    }).doOnError(err -> {
        if (err instanceof TransactionCommitAmbiguousException) {
            throw logCommitAmbiguousError((TransactionCommitAmbiguousException) err);
        } else if (err instanceof TransactionFailedException) {
            throw logFailure((TransactionFailedException) err);
        }
    });
    
    // Normally you will chain this result further and ultimately subscribe. For
    // simplicity, here we just block on the result.
    TransactionResult finalResult = result.block();

    The synchronous mode is the easiest to write and understand. The asynchronous API allows you to build your application in a reactive style, which can help you scale with excellent efficiency. Those new to reactive programming may want to check out the Project Reactor site for more details on this powerful paradigm.

    The lambda gets passed a TransactionAttemptContext object, generally referred to as ctx here.

    Since the lambda may be rerun multiple times, it is important that it does not contain any side effects. In particular, you should never perform regular operations on a Collection, such as collection.insert(), inside the lambda. Such operations may be performed multiple times, and will not be performed transactionally. Instead such operations must be done through the ctx object, e.g. ctx.insert().
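
    To make the distinction concrete, a minimal sketch (the document id and content here are arbitrary):

    cluster.transactions().run((ctx) -> {
        // Wrong: a regular SDK operation inside the lambda. If the lambda is
        // retried, this non-transactional insert could run multiple times.
        // collection.insert("order-123", JsonObject.create());

        // Right: the transactional equivalent, performed through ctx.
        ctx.insert(collection, "order-123", JsonObject.create());
    });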

    The following helper logging methods will be used throughout the examples. Transaction logging is too verbose to be sent to the Java SDK event-bus, so instead it is stored in an in-memory log that can be accessed both from TransactionResult and TransactionFailedException. It is a good practice for the application to log any failed transactions into its own logging system, here represented with a logger object:

    public static RuntimeException logCommitAmbiguousError(TransactionCommitAmbiguousException err) {
        // This example will intentionally not compile: the application needs to use
        // its own logging system to replace `logger`.
        logger.warning("Transaction possibly reached the commit point");
    
        for (TransactionLogMessage msg : err.logs()) {
            logger.warning(msg.toString());
        }
    
        return err;
    }
    
    public static RuntimeException logFailure(TransactionFailedException err) {
        logger.warning("Transaction did not reach commit point");
    
        for (TransactionLogMessage msg : err.logs()) {
            logger.warning(msg.toString());
        }
    
        return err;
    }

    See the Logging documentation for more detail.

    Examples

    A code example is worth a thousand words, so here is a quick summary of the main transaction operations. They are described in more detail below.

    With the synchronous API
    Scope inventory = cluster.bucket("travel-sample").scope("inventory");
    
    try {
        var result = cluster.transactions().run((ctx) -> {
            // Inserting a doc:
            ctx.insert(collection, "doc-a", JsonObject.create());
    
            // Getting documents:
            var docA = ctx.get(collection, "doc-a");
    
            // Replacing a doc:
            var docB = ctx.get(collection, "doc-b");
            var content = docB.contentAs(JsonObject.class);
            content.put("transactions", "are awesome");
            ctx.replace(docB, content);
    
            // Removing a doc:
            var docC = ctx.get(collection, "doc-c");
            ctx.remove(docC);
    
            // Performing a SELECT N1QL query against a scope:
            var qr = ctx.query(inventory, "SELECT * FROM hotel WHERE country = $1",
                    TransactionQueryOptions.queryOptions()
                            .parameters(JsonArray.from("United Kingdom")));
            var rows = qr.rowsAs(JsonObject.class);
    
            // Performing an UPDATE N1QL query on multiple documents, in the `inventory` scope:
            ctx.query(inventory, "UPDATE route SET airlineid = $1 WHERE airline = $2",
                    TransactionQueryOptions.queryOptions()
                            .parameters(JsonArray.from("airline_137", "AF")));
        });
    } catch (TransactionCommitAmbiguousException e) {
        throw logCommitAmbiguousError(e);
    } catch (TransactionFailedException e) {
        throw logFailure(e);
    }
    With the asynchronous API
    ReactiveScope inventory = cluster.bucket("travel-sample").scope("inventory").reactive();
    
    Mono<TransactionResult> result = cluster.reactive().transactions().run((ctx) -> {
        // Inserting a doc:
        return ctx.insert(collection.reactive(), "doc-a", JsonObject.create())
    
                // Getting and replacing a doc:
                .then(ctx.get(collection.reactive(), "doc-b")).flatMap(docB -> {
                    var content = docB.contentAs(JsonObject.class);
                    content.put("transactions", "are awesome");
                    return ctx.replace(docB, content);
                })
    
                // Getting and removing a doc:
                .then(ctx.get(collection.reactive(), "doc-c"))
                    .flatMap(doc -> ctx.remove(doc))
    
                // Performing a SELECT N1QL query, in the `inventory` scope:
                .then(ctx.query(inventory, "SELECT * FROM hotel WHERE country = $1",
                        TransactionQueryOptions.queryOptions()
                                .parameters(JsonArray.from("United Kingdom"))))
    
                .flatMap(queryResult -> {
                    var rows = queryResult.rowsAs(JsonObject.class);
                    // The application would do something with the rows here.
                    return Mono.empty();
                })
    
                // Performing an UPDATE N1QL query on multiple documents, in the `inventory` scope:
                .then(ctx.query(inventory, "UPDATE route SET airlineid = $1 WHERE airline = $2",
                        TransactionQueryOptions.queryOptions()
                                .parameters(JsonArray.from("airline_137", "AF"))));
    }).doOnError(err -> {
        if (err instanceof TransactionCommitAmbiguousException) {
            throw logCommitAmbiguousError((TransactionCommitAmbiguousException) err);
        } else if (err instanceof TransactionFailedException) {
            throw logFailure((TransactionFailedException) err);
        }
    });
    
    // Normally you will chain this result further and ultimately subscribe.
    // For simplicity, here we just block on the result.
    result.block();

    Transaction Mechanics

    While this document is focussed on presenting how transactions are used at the API level, it is useful to have a high-level understanding of the mechanics. Reading this section is completely optional.

    Recall that the application-provided lambda (containing the transaction logic) may be run multiple times by Couchbase transactions. Each such run is called an attempt inside the overall transaction.

    Active Transaction Record Entries

    The first mechanic is that each of these attempts adds an entry to a metadata document in the Couchbase cluster. These metadata documents:

    • Are named Active Transaction Records, or ATRs.

    • Are created and maintained automatically.

    • Have document IDs beginning with "_txn:atr-".

    • Each contain entries for multiple attempts.

    • Are viewable, but should not be modified externally.

    Each such ATR entry stores some metadata and, crucially, whether the attempt has committed or not. In this way, the entry acts as the single point of truth for the transaction, which is essential for providing an 'atomic commit' during reads.

    Staged Mutations

    The second mechanic is that mutating a document inside a transaction does not directly change the body of the document. Instead, the post-transaction version of the document is staged alongside the document (technically, in its extended attributes (XATTRs)). In this way, all changes are invisible to all parts of the Couchbase Data Platform until the commit point is reached.

    These staged document changes effectively act as a lock against other transactions trying to modify the document, preventing write-write conflicts.

    Cleanup

    There are safety mechanisms to ensure that leftover staged changes from a failed transaction cannot block live transactions indefinitely. These include an asynchronous cleanup process that starts with the first transaction, and scans the relevant collections for expired transactions created by any application.

    Note that if an application is not running, then this cleanup is also not running.

    The cleanup process is detailed below in Asynchronous Cleanup.

    Committing

    Only once the lambda has successfully run to conclusion will the attempt be committed. This updates the ATR entry, which is used as a signal by transactional actors to use the post-transaction version of a document from its XATTRs. Hence, updating the ATR entry is an 'atomic commit' switch for the transaction.

    After this commit point is reached, the individual documents will be committed (or "unstaged"). This provides an eventually consistent commit for non-transactional actors.

    Key-Value Mutations

    Make sure to use the transactional Key-Value operations inside the lambda, such as ctx.insert(), rather than collection.insert(). Those would be executed as regular non-transactional Key-Value operations, and since the lambda can be executed multiple times, so could those operations.

    Replacing

    Replacing a document requires a ctx.get() call first. This is necessary so that the SDK can check that the document is not involved in another transaction, and take appropriate action if so.

    With the synchronous API:
    cluster.transactions().run((ctx) -> {
        var doc = ctx.get(collection, "doc-id");
        var content = doc.contentAs(JsonObject.class);
        content.put("transactions", "are awesome");
        ctx.replace(doc, content);
    });
    Asynchronous API:
    cluster.reactive().transactions().run((ctx) -> {
        return ctx.get(collection.reactive(), "doc-id").flatMap(doc -> {
            var content = doc.contentAs(JsonObject.class);
            content.put("transactions", "are awesome");
            return ctx.replace(doc, content);
        });
    });

    Removing

    As with replaces, removing a document requires a ctx.get() call first.

    Synchronous API:
    cluster.transactions().run((ctx) -> {
        var doc = ctx.get(collection, "doc-id");
        ctx.remove(doc);
    });
    With the asynchronous API:
    cluster.reactive().transactions().run((ctx) -> {
        return ctx.get(collection.reactive(), "anotherDoc")
                .flatMap(doc -> ctx.remove(doc));
    });
    For those using the asynchronous API - some ctx methods, notably ctx.remove(), return Mono<Void>. There is a common 'gotcha' with Mono<Void> in that it does not trigger a 'next' reactive event - only a 'completion' event. This means that some reactive operators chained afterwards - including the common flatMap - will not trigger. Generally, you will need to do ctx.remove(…​).then(…​) rather than ctx.remove(…​).flatMap(…​).
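
    For example, a sketch that removes one document and then another (the document ids are arbitrary):

    cluster.reactive().transactions().run((ctx) -> {
        return ctx.get(collection.reactive(), "doc-a")
                .flatMap(doc -> ctx.remove(doc))
                // ctx.remove() returns Mono<Void>, so a flatMap chained here
                // would never fire; then() is needed to continue the chain.
                .then(ctx.get(collection.reactive(), "doc-b"))
                .flatMap(doc -> ctx.remove(doc))
                .then();
    });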

    Inserting

    With the synchronous API:
    cluster.transactions().run((ctx) -> {
        String docId = "docId";
    
        ctx.insert(collection, docId, JsonObject.create());
    });
    With the asynchronous API:
    cluster.reactive().transactions().run((ctx) -> {
        return ctx.insert(collection.reactive(), "docId", JsonObject.create());
    }).block();

    Reads

    cluster.transactions().run((ctx) -> {
        var doc = ctx.get(collection, "a-doc");
    });

    If the application needs to ignore or take action on a document not existing, it can catch the exception:

    cluster.transactions().run((ctx) -> {
        try {
            var doc = ctx.get(collection, "a-doc");
        }
        catch (DocumentNotFoundException err) {
            // The application can continue the transaction here if needed, or take alternative action
        }
    });

    Gets will 'read your own writes'; for example, this will succeed:

    cluster.transactions().run((ctx) -> {
        String docId = "docId";
    
        ctx.insert(collection, docId, JsonObject.create());
    
        var doc = ctx.get(collection, docId);
    });

    Of course, no other transaction will be able to read that inserted document until this transaction reaches the commit point.

    N1QL Queries

    As of Couchbase Server 7.0, N1QL queries may be used inside the transaction lambda, freely mixed with Key-Value operations.

    BEGIN TRANSACTION

    There are two ways to initiate a transaction with Couchbase 7.x: via the SDK, and via the query service directly using BEGIN TRANSACTION. The latter is intended for those using query via the REST API or the query workbench in the UI; application writers are strongly recommended to use the SDK instead, which provides these benefits:

    • It automatically handles errors and retrying.

    • It allows Key-Value operations and N1QL queries to be freely mixed.

    • It takes care of issuing BEGIN TRANSACTION, END TRANSACTION, COMMIT and ROLLBACK automatically. These become an implementation detail and you should not use these statements inside the lambda.

    Supported N1QL

    The majority of N1QL DML statements are permitted within a transaction. Specifically: INSERT, UPSERT, DELETE, UPDATE, MERGE and SELECT are supported.

    DDL statements, such as CREATE INDEX, are not.

    Using N1QL

    If you already use N1QL, then its use in transactions is very similar. A query returns a TransactionQueryResult that is very similar to the QueryResult you are used to, and a query takes a TransactionQueryOptions that provides the majority of the same options as QueryOptions.

    It’s important to use the transactional ctx.query() inside the lambda, rather than cluster.query() or scope.query(). Those would be executed as regular non-transactional queries, and since the lambda can be executed multiple times, so could the statements.

    An example of selecting some rows from the travel-sample bucket:

    cluster.transactions().run((ctx) -> {
        var st = "SELECT * FROM `travel-sample`.inventory.hotel WHERE country = $1";
        var qr = ctx.query(st, TransactionQueryOptions.queryOptions()
                .parameters(JsonArray.from("United Kingdom")));
        var rows = qr.rowsAs(JsonObject.class);
    });

    Rather than specifying the full "`travel-sample`.inventory.hotel" name each time, it is easier to pass a reference to the inventory Scope:

    Bucket travelSample = cluster.bucket("travel-sample");
    Scope inventory = travelSample.scope("inventory");
    
    cluster.transactions().run((ctx) -> {
        var qr = ctx.query(inventory, "SELECT * FROM hotel WHERE country = $1",
                TransactionQueryOptions.queryOptions()
                        .parameters(JsonArray.from("United States")));
        var rows = qr.rowsAs(JsonObject.class);
    });

    An example using a Scope for an UPDATE:

    String hotelChain = "http://marriot%";
    String country = "United States";
    
    cluster.transactions().run((ctx) -> {
        var qr = ctx.query(inventory, "UPDATE hotel SET price = $1 WHERE url LIKE $2 AND country = $3",
                TransactionQueryOptions.queryOptions()
                        .parameters(JsonArray.from(99.99, hotelChain, country)));
        assert(qr.metaData().metrics().get().mutationCount() == 1);
    });

    And an example combining SELECTs and UPDATEs. It’s possible to call regular Java methods from the lambda, as shown here, permitting complex logic to be performed. Just remember that since the lambda may be called multiple times, so may the method.

    cluster.transactions().run((ctx) -> {
        // Find all hotels of the chain
        var qr = ctx.query(inventory, "SELECT reviews FROM hotel WHERE url LIKE $1 AND country = $2",
                TransactionQueryOptions.queryOptions()
                        .parameters(JsonArray.from(hotelChain, country)));
    
        // This function (not provided here) will use a trained machine learning model to provide a
        // suitable price based on recent customer reviews.
        double updatedPrice = priceFromRecentReviews(qr);
    
        // Set the price of all hotels in the chain
        ctx.query(inventory, "UPDATE hotel SET price = $1 WHERE url LIKE $2 AND country = $3",
                TransactionQueryOptions.queryOptions()
                        .parameters(JsonArray.from(updatedPrice, hotelChain, country)));
    });

    Read Your Own Writes

    As with Key-Value operations, N1QL queries support Read Your Own Writes.

    This example shows inserting a document and then selecting it again.

    cluster.transactions().run((ctx) -> {
        ctx.query("INSERT INTO `default` VALUES ('doc', {'hello':'world'})");  (1)
    
        // Performing a 'Read Your Own Write'
        var st = "SELECT `default`.* FROM `default` WHERE META().id = 'doc'"; (2)
        var qr = ctx.query(st);
        assert(qr.metaData().metrics().get().resultCount() == 1);
    });
    1 The inserted document is only staged at this point, as the transaction has not yet committed. Other transactions, and other non-transactional actors, will not be able to see this staged insert yet.
    2 But the SELECT can, as we are reading a mutation staged inside the same transaction.

    Mixing Key-Value and N1QL

    Key-Value operations and queries can be freely intermixed, and will interact with each other as you would expect.

    In this example we insert a document with Key-Value, and read it with a SELECT.

    cluster.transactions().run((ctx) -> {
        ctx.insert(collection, "doc", JsonObject.create().put("hello", "world")); (1)
    
        // Performing a 'Read Your Own Write'
        var st = "SELECT `default`.* FROM `default` WHERE META().id = 'doc'"; (2)
        var qr = ctx.query(st);
        assert(qr.metaData().metrics().get().resultCount() == 1);
    });
    1 As with the 'Read Your Own Writes' example, here the insert is only staged, and so it is not visible to other transactions or non-transactional actors.
    2 But the SELECT can view it, as the insert was in the same transaction.

    Query Options

    Query options can be provided via TransactionQueryOptions, which provides a subset of the options in the Java SDK’s QueryOptions.

    cluster.transactions().run((ctx) -> {
        ctx.query("INSERT INTO `default` VALUES ('doc', {'hello':'world'})",
                TransactionQueryOptions.queryOptions().profile(QueryProfile.TIMINGS));
    });

    The supported options are:

    • parameters

    • scanConsistency

    • flexIndex

    • serializer

    • clientContextId

    • scanWait

    • scanCap

    • pipelineBatch

    • pipelineCap

    • profile

    • readonly

    • adhoc

    • raw

    See the QueryOptions documentation for details on these.
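
    As an illustration, several of these options combined (the values here are arbitrary):

    cluster.transactions().run((ctx) -> {
        ctx.query("SELECT * FROM `travel-sample`.inventory.hotel LIMIT 10",
                TransactionQueryOptions.queryOptions()
                        .clientContextId("my-query") // aids correlating logs
                        .readonly(true)              // statement performs no mutations
                        .adhoc(false));              // use a prepared statement
    });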

    Query Concurrency

    Only one query statement will be performed by the query service at a time. Non-blocking mechanisms can be used to perform multiple concurrent query statements, but this may result internally in some added network traffic due to retries, and is unlikely to provide any increased performance.

    Query Performance Advice

    This section is optional reading, and only for those looking to maximize transactions performance.

    After the first query statement in a transaction, subsequent Key-Value operations in the lambda are converted into N1QL and executed by the query service rather than the Key-Value data service. The operation will behave identically, and this implementation detail can largely be ignored, except for these two caveats:

    • These converted Key-Value operations are likely to be slightly slower, as the query service is optimized for statements involving multiple documents. Those looking for the maximum possible performance are recommended to put Key-Value operations before the first query in the lambda, if possible, as sketched below.

    • Those using non-blocking mechanisms to achieve concurrency should be aware that the converted Key-Value operations are subject to the same parallelism restrictions mentioned above, i.e. they will not be executed in parallel by the query service.
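
    A minimal sketch of this ordering ("doc-id" and the statement are arbitrary):

    cluster.transactions().run((ctx) -> {
        // Key-Value operations first, executed by the Key-Value data service...
        var doc = ctx.get(collection, "doc-id");
        ctx.replace(doc, doc.contentAs(JsonObject.class).put("reviewed", true));

        // ...then queries. Any Key-Value operations placed after this point
        // would be converted to N1QL and executed by the query service.
        ctx.query("UPDATE `travel-sample`.inventory.hotel SET price = 99.99 WHERE country = 'France'");
    });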

    Single Query Transactions

    This section is mainly of use for those wanting to do large, bulk-loading transactions.

    The query service maintains, where required, some in-memory state for each document in a transaction, which is freed on commit or rollback. For most use-cases this presents no issue, but there are some workloads, such as bulk loading many documents, where it could exceed the server resources allocated to the service. Solutions include breaking the workload up into smaller batches, and allocating additional memory to the query service. Alternatively, single query transactions, described here, may be used.

    Single query transactions have these characteristics:

    • They have greatly reduced memory usage inside the query service.

    • As the name suggests, they consist of exactly one query, and no Key-Value operations.

    You will see reference elsewhere in Couchbase documentation to the tximplicit query parameter. Single query transactions set this parameter internally. In addition, they provide automatic error and retry handling.

    Single query transactions may be initiated like so:

    try {
        var result = cluster.query(bulkLoadStatement, QueryOptions.queryOptions().asTransaction());
    } catch (TransactionCommitAmbiguousException e) {
        throw logCommitAmbiguousError(e);
    } catch (TransactionFailedException e) {
        throw logFailure(e);
    } catch (CouchbaseException e) {
        // Any standard query errors can be raised here too, such as ParsingFailureException.  In these cases the
        // transaction definitely did not reach commit point.
        logger.warning("Transaction did not reach commit point");
        throw e;
    }

    You can also run a single query transaction against a particular Scope (these examples will exclude the full error handling for brevity):

    Bucket travelSample = cluster.bucket("travel-sample");
    Scope inventory = travelSample.scope("inventory");
    
    inventory.query(bulkLoadStatement, QueryOptions.queryOptions().asTransaction());

    and configure it:

    cluster.query(bulkLoadStatement, QueryOptions.queryOptions()
            // Single query transactions will often want to increase the default timeout
            .timeout(Duration.ofSeconds(360))
            .asTransaction(singleQueryTransactionOptions()
                    .durabilityLevel(DurabilityLevel.PERSIST_TO_MAJORITY)));

    Query with KV Roles

    To execute a key-value operation within a transaction, users must have the relevant Administrative or Data RBAC roles, and permissions on the relevant buckets, scopes, and collections.

    Similarly, to run a query statement within a transaction, users must have the relevant Administrative or Query & Index RBAC roles, and permissions on the relevant buckets, scopes and collections.

    Refer to Roles for details.

    Query Mode

    When a transaction executes a query statement, the transaction enters query mode, which means that the query is executed with the user’s query permissions. Any Key-Value operations which are executed by the transaction after the query statement are also executed with the user’s query permissions. These may or may not be different to the user’s data permissions; if they are different, you may get unexpected results.

    Committing

    Committing is automatic: at the end of the lambda, if all operations have succeeded and no exception is thrown, committing will commence.

    Committing starts with the atomic step of switching the transaction’s Active Transaction Record entry to Committed. As soon as this is done, all its changes will be atomically visible to reads from other transactions - this is Read Atomicity. The individual documents will then be committed so they are visible to non-transactional actors, in an eventually consistent fashion.

    Commit is final: after the atomic Committed step is reached, the transaction cannot be rolled back, and no further operations are allowed on it.

    An asynchronous cleanup process ensures that once the transaction reaches the commit point, it will be fully committed - even if the application crashes.

    A Full Transaction Example

    Let’s pull together everything so far into a more real-world example of a transaction.

    This example simulates a simple Massively Multiplayer Online game, and includes documents representing:

    • Players, with experience points and levels;

    • Monsters, with hitpoints, and the number of experience points a player earns from their death.

    In this example, the player is dealing damage to the monster. The player’s client has sent this instruction to a central server, where we’re going to record that action. We’re going to do this in a transaction, as we don’t want a situation where the monster is killed, but we fail to update the player’s document with the earned experience.

    (Though this is just a demo - in reality, the game would likely live with the small risk and limited impact of this, rather than pay the performance cost for using a transaction.)

    public void playerHitsMonster(int damage, String playerId, String monsterId) {
        try {
            cluster.transactions().run((ctx) -> {
                var monsterDoc = ctx.get(collection, monsterId);
                var playerDoc = ctx.get(collection, playerId);
    
                int monsterHitpoints = monsterDoc.contentAs(JsonObject.class).getInt("hitpoints");
                int monsterNewHitpoints = monsterHitpoints - damage;
    
                if (monsterNewHitpoints <= 0) {
                    // Monster is killed. The remove is just for demoing, and a more realistic
                    // example would set a "dead" flag or similar.
                    ctx.remove(monsterDoc);
    
                    // The player earns experience for killing the monster
                    int experienceForKillingMonster = monsterDoc.contentAs(JsonObject.class)
                            .getInt("experienceWhenKilled");
                    int playerExperience = playerDoc.contentAs(JsonObject.class).getInt("experience");
                    int playerNewExperience = playerExperience + experienceForKillingMonster;
                    int playerNewLevel = calculateLevelForExperience(playerNewExperience);
    
                    var playerContent = playerDoc.contentAs(JsonObject.class);
    
                    playerContent.put("experience", playerNewExperience);
                    playerContent.put("level", playerNewLevel);
    
                    ctx.replace(playerDoc, playerContent);
                } else {
                    // Monster is damaged but still alive
                    var monsterContent = monsterDoc.contentAs(JsonObject.class);
                    monsterContent.put("hitpoints", monsterNewHitpoints);
    
                    ctx.replace(monsterDoc, monsterContent);
                }
            });
        } catch (TransactionCommitAmbiguousException e) {
            throw logCommitAmbiguousError(e);
        } catch (TransactionFailedException e) {
            throw logFailure(e);
        }
    }

    Concurrency with Non-Transactional Writes

    This release of transactions for Couchbase requires a degree of co-operation from the application. Specifically, the application should ensure that non-transactional writes are never done concurrently with transactional writes, on the same document.

    This requirement exists to ensure that the strong Key-Value performance of Couchbase is not compromised. A key philosophy of our transactions is that you 'pay only for what you use'.

    If two such writes do conflict then the behaviour is undefined: either write may 'win', overwriting the other. This still applies if the non-transactional write is using CAS.

    Note this only applies to writes. Any non-transactional reads concurrent with transactions are fine, and are at a Read Committed level.

    To help detect that this co-operative requirement is fulfilled, the application can subscribe to the client’s event bus and check for any IllegalDocumentStateEvent events, like so:

    cluster.environment().eventBus().subscribe(event -> {
        if (event instanceof IllegalDocumentStateEvent) {
            // log this event for review
            log(((TransactionEvent) event).logs());
        }
    });

    These events will be raised in the event of a non-transactional write being detected and overridden. The event contains the key of the document involved, to aid the application with debugging.

    Rollback

    If an exception is thrown, either by the application from the lambda, or by the transactions logic itself (e.g. on a failed operation), then that attempt is rolled back.

    The application’s lambda may or may not be retried, depending on the error that occurred. The general rule for retrying is whether the transaction is likely to succeed on a retry. For example, if this transaction is trying to write a document that is currently involved in another transaction (a write-write conflict), this will lead to a retry as that is likely a transient state. But if the transaction is trying to get a document that does not exist, it will not retry.

    If the transaction is not retried then it will throw a TransactionFailedException, and its getCause method can be used for more details on the failure.

    The application can use this to signal why it triggered a rollback, like so:

    class BalanceInsufficient extends RuntimeException {
    }
    
    try {
        cluster.transactions().run((ctx) -> {
            var customer = ctx.get(collection, "customer-name");
    
            if (customer.contentAsObject().getInt("balance") < costOfItem) {
                throw new BalanceInsufficient();
            }
            // else continue transaction
        });
    } catch (TransactionCommitAmbiguousException e) {
        // This exception can only be thrown at the commit point, after the
        // BalanceInsufficient logic has been passed, so there is no need to
        // check getCause here.
        throw logCommitAmbiguousError(e);
    } catch (TransactionFailedException e) {
        if (e.getCause() instanceof BalanceInsufficient) {
            // Re-raise the error
            throw (RuntimeException) e.getCause();
        } else {
            throw logFailure(e);
        }
    }

    After a transaction is rolled back, it cannot be committed, no further operations are allowed on it, and the SDK will not try to automatically commit it at the end of the code block.

    Error Handling

    As discussed previously, Couchbase transactions will attempt to resolve many errors for you, through a combination of retrying individual operations and the application’s lambda. This includes some transient server errors, and conflicts with other transactions.

    But there are situations that cannot be resolved, and total failure is indicated to the application via errors. These situations include:

    • Any error thrown by your transaction lambda, either deliberately or through an application logic bug.

    • Attempting to insert a document that already exists.

    • Calling ctx.get() on a document key that does not exist (if the resultant exception is not caught).

    Once one of these errors occurs, the current attempt is irrevocably failed (though the transaction may retry the lambda to make a new attempt). It is not possible for the application to catch the failure and continue; the one exception is the error raised by ctx.get(), which may be caught, as shown earlier. Once a failure has occurred, all other operations tried in this attempt (including commit) will instantly fail.

    Transactions, as they are multi-stage and multi-document, also have a concept of partial success or failure. This is signalled to the application through the TransactionResult.unstagingComplete() method, described later.

    There are three exceptions that Couchbase transactions can raise to the application: TransactionFailedException, TransactionExpiredException and TransactionCommitAmbiguousException. The latter two derive from TransactionFailedException, for backwards-compatibility purposes.

    TransactionFailedException and TransactionExpiredException

    The transaction definitely did not reach the commit point. TransactionFailedException indicates a fast-failure whereas TransactionExpiredException indicates that retries were made until the timeout was reached, but this distinction is not normally important to the application and generally TransactionExpiredException does not need to be handled individually.

    Either way, an attempt will have been made to rollback all changes. This attempt may or may not have been successful, but the results of this will have no impact on the protocol or other actors. No changes from the transaction will be visible, both to transactional and non-transactional actors.

    Handling: Generally, debugging exactly why a given transaction failed requires review of the logs, so it is suggested that the application log these on failure (see Logging). The application may want to try the transaction again later. Alternatively, if transaction completion time is not a priority, then transaction timeouts (which default to 15 seconds) can be extended across the board through TransactionsConfig.

    var cluster = Cluster.connect("localhost", ClusterOptions.clusterOptions("username", "password")
            .environment(env -> env.transactionsConfig(TransactionsConfig.timeout(Duration.ofSeconds(120)))));

    Alternatively it can be configured at the per-transaction level:

    cluster.transactions().run((ctx) -> {
        // Your transaction logic
    }, transactionOptions().timeout(Duration.ofSeconds(60)));

    This will allow the protocol more time to get past any transient failures (for example, those caused by a cluster rebalance). The tradeoff to consider with longer timeouts is that documents staged by a transaction are effectively locked against modification by other transactions until the timeout has been reached.

    Note that the timeout is not guaranteed to be followed precisely. For example, if the application performs a long blocking operation inside the lambda (which should be avoided), then the timeout can only trigger after it finishes. Similarly, if the transaction attempts a Key-Value operation close to the timeout, and that operation itself times out, then the transaction timeout may be exceeded.

    TransactionCommitAmbiguousException

    As discussed previously, each transaction has a 'single point of truth' that is updated atomically to reflect whether it is committed.

    However, it is not always possible for the protocol to become 100% certain that the operation was successful, before the transaction expires. This potential ambiguity is unavoidable in any distributed system; a classic example is a network failure happening just after an operation was sent from a client to a server. The client will not get a response back and cannot know if the server received and executed the operation.

    The ambiguity is particularly important at the point of the atomic commit, as the transaction may or may not have reached the commit point. Couchbase transactions will raise TransactionCommitAmbiguousException to indicate this state. It should be rare to receive this error.

    If the transaction had in fact successfully reached the commit point, then the transaction will be fully completed ("unstaged") by the asynchronous cleanup process at some point in the future. With default settings this will usually be within a minute, but whatever underlying fault has caused the TransactionCommitAmbiguousException may lead to it taking longer.

    If the transaction had not in fact reached the commit point, then the asynchronous cleanup process will instead attempt to roll it back at some point in the future.

    Handling: This error can be challenging for an application to handle. As with TransactionFailedException it is recommended that it at least writes any logs from the transaction, for future debugging. It may wish to retry the transaction at a later point, or extend transactional timeouts (as detailed above) to give the protocol additional time to resolve the ambiguity.

    TransactionResult.unstagingComplete()

    This boolean flag indicates whether all documents were able to be unstaged (committed).

    For most use-cases it is not an issue if it is false. All transactional actors will still read all the changes from this transaction, as though it had committed fully. The cleanup process is asynchronously working to complete the commit, so that it will be fully visible to non-transactional actors.

    The flag is provided for those rare use-cases where the application requires the commit to be fully visible to non-transactional actors, before it may continue. In this situation the application can raise an error here, or poll all documents involved until they reflect the mutations.

    If you regularly see this flag false, consider increasing the transaction timeout to reduce the possibility that the transaction times out during the commit.

    Full Error Handling Example

    Pulling all of the above together, this is the suggested best practice for error handling:

    try {
        var result = cluster.transactions().run((ctx) -> {
            // ... transactional code here ...
        });
    
        // The transaction definitely reached the commit point. Unstaging
        // the individual documents may or may not have completed
    
        if (!result.unstagingComplete()) {
            // In rare cases, the application may require the commit to have
            // completed.  (Recall that the asynchronous cleanup process is
            // still working to complete the commit.)
            // The next step is application-dependent.
        }
    } catch (TransactionCommitAmbiguousException e) {
        throw logCommitAmbiguousError(e);
    } catch (TransactionFailedException e) {
        throw logFailure(e);
    }

    Asynchronous Cleanup

    Transactions will try to clean up after themselves in the event of failures. However, there are situations that inevitably create failed, or 'lost', transactions, such as an application crash.

    This requires an asynchronous cleanup task, described in this section.

    The first transaction triggered by an application will spawn a background cleanup task, whose job it is to periodically scan for expired transactions and clean them up. It does this by scanning a subset of the Active Transaction Record (ATR) transaction metadata documents, for each collection used by any transactions.

    The default settings are tuned to find expired transactions reasonably quickly, while creating negligible impact from the background reads required by the scanning process. To be exact, with default settings it will generally find expired transactions within 60 seconds, and use less than 20 reads per second, per collection of metadata documents being checked. This is unlikely to impact performance on any cluster, but the settings may be tuned as desired.

    All applications connected to the same cluster and running transactions will share in the cleanup, via a low-touch communication protocol on the "_txn:client-record" metadata document, which is created in each collection used for transaction metadata. This document is visible, and should not be modified externally as it is maintained automatically. All ATRs will be distributed between all cleanup clients, so increasing the number of applications will not increase the reads required for scanning.

    An application may cleanup transactions created by another application.

    It is important to understand that if an application is not running, then cleanup is not running. This is particularly relevant to developers running unit tests or similar.

    Configuring Cleanup

    The cleanup settings can be configured like so:

    var cluster = Cluster.connect("localhost", ClusterOptions.clusterOptions("username", "password")
            .environment(env -> env.transactionsConfig(TransactionsConfig.cleanupConfig(TransactionsCleanupConfig
                    .cleanupClientAttempts(true)
                    .cleanupLostAttempts(true)
                    .cleanupWindow(Duration.ofSeconds(120))
                    .addCollections(List.of(keyspace))))));

    The settings supported by TransactionsCleanupConfig are:

    cleanupWindow (default: 60 seconds)

    This determines how long a cleanup 'run' is; that is, how frequently this client will check its subset of ATR documents. It is perfectly valid for the application to change this conservative default. Decreasing it will cause expired transactions to be found more swiftly (generally, within this cleanup window), with the tradeoff of increasing the number of reads per second used for the scanning process.

    cleanupLostAttempts (default: true)

    This controls the thread that takes part in the distributed cleanup process described above, which cleans up expired transactions created by any client. It is strongly recommended that it be left enabled.

    cleanupClientAttempts (default: true)

    This controls the thread that cleans up transactions created just by this client. The client will preferentially aim to send any transactions it creates to this thread, leaving transactions for the distributed cleanup process only when it is forced to (for example, on an application crash). It is strongly recommended that it be left enabled.

    addCollections (default: empty)

    Adds additional collections to the 'cleanup set' - the set of collections being cleaned up.

    Monitoring Cleanup

    If the application wishes to monitor cleanup it may subscribe to these events:

    cluster.environment().eventBus().subscribe(event -> {
        if (event instanceof TransactionCleanupAttemptEvent || event instanceof TransactionCleanupEndRunEvent) {
            log(((TransactionEvent) event).logs());
        }
    });

    TransactionCleanupEndRunEvent is raised whenever a current 'run' is finished, and contains statistics from the run. (A run is typically around every 60 seconds, with default configuration.)

    A TransactionCleanupAttemptEvent event is raised when an expired transaction was found by this process, and a cleanup attempt was made. It contains whether that attempt was successful, along with any logs relevant to the attempt.

    In addition, if cleanup fails to clean up a transaction that is more than two hours past expiry, it will raise the TransactionCleanupAttemptEvent event at WARN level (rather than the default DEBUG). With most default configurations of the event-bus (see Logging below), this will cause that event to be logged somewhere visible to the application. If there is not a good reason for the cleanup to be failing (such as a downed node that has not yet been failed-over), the user is encouraged to report the issue.

    Logging

    To aid troubleshooting, each transaction maintains a list of log entries, which can be logged on failure like this:

    } catch (TransactionCommitAmbiguousException e) {
        throw logCommitAmbiguousError(e);
    } catch (TransactionFailedException e) {
        throw logFailure(e);
    }

    or for the asynchronous API:

    }).doOnError(err -> {
        if (err instanceof TransactionCommitAmbiguousException) {
            throw logCommitAmbiguousError((TransactionCommitAmbiguousException) err);
        } else if (err instanceof TransactionFailedException) {
            throw logFailure((TransactionFailedException) err);
        }
    });

    Those convenience methods are:

    public static RuntimeException logCommitAmbiguousError(TransactionCommitAmbiguousException err) {
        // This example will intentionally not compile: the application needs to use
        // its own logging system to replace `logger`.
        logger.warning("Transaction possibly reached the commit point");
    
        for (TransactionLogMessage msg : err.logs()) {
            logger.warning(msg.toString());
        }
    
        return err;
    }
    
    public static RuntimeException logFailure(TransactionFailedException err) {
        logger.warning("Transaction did not reach commit point");
    
        for (TransactionLogMessage msg : err.logs()) {
            logger.warning(msg.toString());
        }
    
        return err;
    }

    Of course, the application should replace logger.warning with their own logging system.

    A failed transaction can involve dozens, even hundreds, of lines of logging, so the application may prefer to write failed transactions into a separate file.
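
    For example, using java.util.logging (as the helper methods above do), failed-transaction logs can be routed to a dedicated file; a sketch, with an arbitrary file name:

    Logger txnFailures = Logger.getLogger("transactions.failures");
    try {
        // Append failed-transaction logs to their own file.
        var handler = new java.util.logging.FileHandler("failed-transactions.log", true);
        handler.setFormatter(new java.util.logging.SimpleFormatter());
        txnFailures.addHandler(handler);
    } catch (java.io.IOException e) {
        throw new RuntimeException(e);
    }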

    In addition, a successful transaction can be logged:

    var result = cluster.transactions().run((ctx) -> {
        // Your transaction logic
    });
    
    result.logs().forEach(message -> logger.info(message.toString()));

    Tracing

    If configured, detailed telemetry on each transaction can be output in a form compatible with various external systems, including OpenTelemetry and its predecessor OpenTracing. This telemetry is particularly useful for monitoring performance.

    See the SDK Request Tracing documentation for how to configure this.

    The tracing should currently be regarded as 'developer preview' functionality, as the spans and attributes output may change over time.

    Parent Spans

    The application may wish to indicate that the transaction is part of a larger span, for instance a user request. It can do this by passing that span as the parent.

    This can be done using the SDK’s RequestTracer abstraction, like so:

    var span = cluster.environment().requestTracer().requestSpan("your-span-name", null);
    
    cluster.transactions().run((ctx) -> {
        // your transaction
    }, transactionOptions().parentSpan(span));

    Or if you have an existing OpenTelemetry span you can easily convert it to a Couchbase RequestSpan and pass it to the SDK:

    var span = Span.current(); // this is a span created by your code earlier
    var wrapped = OpenTelemetryRequestSpan.wrap(span);
    
    cluster.transactions().run((ctx) -> {
        // your transaction
    }, transactionOptions().parentSpan(wrapped));

    Concurrent Operations

    The reactive API allows operations to be performed concurrently inside a transaction, which can assist performance.

    An example of performing parallel operations using the reactive API:

    List<String> docIds = Arrays.asList("doc1", "doc2", "doc3", "doc4", "doc5");
    int concurrency = 100; // This many operations will be in-flight at once
    
    var result = cluster.reactive().transactions().run((ctx) -> {
        return Flux.fromIterable(docIds)
                .parallel(concurrency)
                .runOn(Schedulers.boundedElastic())
                .concatMap(docId -> ctx.get(collection.reactive(), docId)
                        .flatMap(doc -> {
                            var content = doc.contentAsObject();
                            content.put("value", "updated");
                            return ctx.replace(doc, content);
                        }))
                .sequential()
                .then();
    }).block();

    Custom Metadata Collections

    As described earlier, transactions automatically create and use metadata documents. By default, these are created in the default collection of the bucket of the first mutated document in the transaction. Optionally, you can instead specify a collection in which to store the metadata documents. Most users will not need this functionality and can continue to use the default behaviour. Custom metadata collections are provided for these use-cases:

    • The metadata documents contain, for documents involved in each transaction, the document’s key and the name of the bucket, scope and collection it exists on. In some deployments this may be sensitive data.

    • You wish to remove the default collections. Before doing this, you should ensure that all existing transactions using metadata documents in the default collections have finished.

    Usage

    Custom metadata collections are enabled with:

    var keyspace = TransactionKeyspace.create("bucketName", "scopeName", "collectionName");
    
    var cluster = Cluster.connect("localhost", ClusterOptions.clusterOptions("username", "password")
            .environment(env -> env.transactionsConfig(TransactionsConfig.metadataCollection(keyspace))));

    or at an individual transaction level with:

    cluster.transactions().run((ctx) -> {
        // Your transaction logic
    }, transactionOptions().metadataCollection(collection));

    You need to ensure that this application has RBAC data read and write privileges on any custom metadata collections, and should not subsequently delete them, as that can interfere with existing transactions. You can use existing collections or create new ones.

    Further Reading