Distributed Transactions from the Java SDK

    A practical guide to using Couchbase’s distributed ACID transactions, via the Java API.

    Distributed Transactions for Couchbase provide these semantics and features:

    • Insertion, mutation, and removal of multiple documents can be staged inside a transaction.

    • Until the transaction is committed, these changes will not be visible to other transactions, or any other part of the Couchbase Data Platform.

    • An isolation level of Read Committed, to support high performance and scalability.

    • A high-level and easy-to-use API that allows the developer to express what they want the transaction to do as a block of logic, while the library takes care of the details of error handling, including conflicts with other transactions.

    Please see our introduction to ACID Transactions for a guide to the benefits and trade-offs of multi-document transactions.

    Below we show you how to create Transactions, step-by-step. You may also want to start with our transactions examples repository, which features useful downloadable examples of using Distributed Transactions.

    Javadocs are available online.

    Requirements

    • Couchbase Server 6.5 or above.

    • Couchbase Java client 3.0.0 or above. It is recommended to depend on the transactions library from Maven and let it pull in the Java client as a transitive dependency.

    • NTP should be configured so nodes of the Couchbase cluster are in sync with time.

    • The application, if it is using extended attributes (XATTRs), must avoid using the XATTR field txn, which is reserved for Couchbase use.

    If using a single-node cluster (for example, during development), note that the default number of replicas for a newly created bucket is 1. If left at this default, all Key-Value writes performed with durability will fail with a DurabilityImpossibleException. In turn, this will cause all transactions (which perform all Key-Value writes durably) to fail. This setting can be changed via the GUI or command line.

    Getting Started

    Couchbase transactions require no additional components or services to be configured. Simply add the transactions library to your project. With Gradle this can be accomplished by modifying the dependencies section of your build.gradle file like so:

    dependencies {
        compile group: 'com.couchbase.client', name: 'couchbase-transactions', version: '1.0.1'
    }

    Use the same group, artifact, and version in your Maven-compatible tool of choice.
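    For example, the equivalent Maven dependency declaration, using the same coordinates as the Gradle snippet above, would be (check Maven Central for the latest version):

```xml
<dependency>
    <groupId>com.couchbase.client</groupId>
    <artifactId>couchbase-transactions</artifactId>
    <version>1.0.1</version>
</dependency>
```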

    A complete, simple Gradle project is available in our transactions examples repository.

    Initializing Transactions

    Here are all imports used by the following examples:

    
    import com.couchbase.client.java.Bucket;
    import com.couchbase.client.java.Cluster;
    import com.couchbase.client.java.Collection;
    import com.couchbase.client.java.ReactiveCollection;
    import com.couchbase.client.java.json.JsonObject;
    import com.couchbase.client.java.kv.GetResult;
    import com.couchbase.transactions.TransactionDurabilityLevel;
    import com.couchbase.transactions.TransactionGetResult;
    import com.couchbase.transactions.TransactionResult;
    import com.couchbase.transactions.Transactions;
    import com.couchbase.transactions.config.TransactionConfigBuilder;
    import com.couchbase.transactions.deferred.TransactionSerializedContext;
    import com.couchbase.transactions.error.TransactionCommitAmbiguous;
    import com.couchbase.transactions.error.TransactionFailed;
    import com.couchbase.transactions.log.IllegalDocumentState;
    import com.couchbase.transactions.log.LogDefer;
    import com.couchbase.transactions.log.TransactionCleanupAttempt;
    import com.couchbase.transactions.log.TransactionCleanupEndRunEvent;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.logging.Logger;
    
    import static com.couchbase.client.java.query.QueryOptions.queryOptions;

    The starting point is the Transactions object. It is very important that the application ensures that only one of these is created, as it performs automated background processes that should not be duplicated.

    // Initialize the Couchbase cluster
    Cluster cluster = Cluster.connect("localhost", "transactor", "mypass");
    Bucket bucket = cluster.bucket("transact");
    Collection collection = bucket.defaultCollection();
    
    // Create the single Transactions object
    Transactions transactions = Transactions.create(cluster, TransactionConfigBuilder.create()
            // The configuration can be altered here, but in most cases
            // the defaults are fine.
            .build());

    Configuration

    Transactions can optionally be configured at the point of creating the Transactions object:

    Transactions transactions = Transactions.create(cluster, TransactionConfigBuilder.create()
            .durabilityLevel(TransactionDurabilityLevel.PERSIST_TO_MAJORITY)
            .build());

    The default configuration will perform all writes with the durability setting Majority, ensuring that each write is available in-memory on the majority of replicas before the transaction continues. There are two higher durability settings available that will additionally wait for all mutations to be written to physical storage on either the active or the majority of replicas, before continuing. This further increases safety, at a cost of additional latency.

    A level of None is present but its use is discouraged and unsupported. There are no atomicity guarantees if None is used.

    Creating a Transaction

    A core idea of the library is that the application supplies the logic for the transaction inside a Java lambda, including any conditional logic required, while the transactions library takes care of getting the transaction committed.

    It is important to understand that the lambda may be run multiple times in order to handle some types of transient error, such as a temporary conflict with another transaction.

    Each run of the lambda is called an attempt, inside an overall transaction.

    As with the Couchbase Java Client, you can use the library in either synchronous mode (the exceptions will be explained later in Error Handling):

    try {
        transactions.run((ctx) -> {
            // 'ctx' is an AttemptContext, which permits getting, inserting,
            // removing and replacing documents, along with committing and
            // rolling back the transaction.
    
            // ... Your transaction logic here ...
    
            // This call is optional - if you leave it off, the transaction
            // will be committed anyway.
            ctx.commit();
        });
    } catch (TransactionCommitAmbiguous e) {
        // The application will of course want to use its own logging rather
        // than System.err
        System.err.println("Transaction possibly committed");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    } catch (TransactionFailed e) {
        System.err.println("Transaction did not reach commit point");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    }

    or in asynchronous mode, using the Project Reactor reactive library:

    Mono<TransactionResult> result = transactions.reactive().run((ctx) -> {
        // 'ctx' is an AttemptContextReactive, providing asynchronous versions of the AttemptContext methods
    
        return
    
                // Your transaction logic here: as an example, get and remove a doc
                ctx.get(collection.reactive(), "document-id")
                        .flatMap(doc -> ctx.remove(doc))
    
                        // The commit call is optional - if you leave it off,
                        // the transaction will be committed anyway.
                        .then(ctx.commit());
    
    }).doOnError(err -> {
        if (err instanceof TransactionCommitAmbiguous) {
            System.err.println("Transaction possibly committed: ");
        }
        else {
            System.err.println("Transaction failed: ");
        }
    
    for (LogDefer e : ((TransactionFailed) err).result().log().logs()) {
        // System.err is used here for simplicity; log failures to your own logging system
        System.err.println(e.toString());
    }
    });
    
    // Normally you will chain this result further and ultimately subscribe.  For simplicity, here we just block
    // on the result.
    TransactionResult finalResult = result.block();

    The synchronous mode is the easiest to write and understand. The asynchronous API allows you to build your application in a reactive style, without large thread pools, which can help you scale with excellent efficiency. Those new to reactive programming may want to check out the Project Reactor site for more details on this powerful paradigm.

    Some AttemptContextReactive methods, notably remove, return Mono<Void>. Be careful to use then rather than flatMap or similar on these, as Mono<Void> will only trigger a completion event, and not the next event, so many methods including flatMap will not work as expected.
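    This behavior can be illustrated with plain reactor-core, outside of any transaction (a minimal sketch; the class and method names here are illustrative, not part of the transactions library):

```java
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Mono;

public class MonoVoidPitfall {
    // Demonstrates why flatMap never fires on a Mono<Void>, while then() does.
    public static boolean[] demo() {
        AtomicBoolean flatMapRan = new AtomicBoolean(false);
        AtomicBoolean thenRan = new AtomicBoolean(false);

        // A Mono<Void> (such as the result of ctx.remove) signals completion
        // without ever emitting a value; Mono.empty() behaves the same way.
        Mono<Void> removal = Mono.empty();

        // flatMap's mapper only runs when a value is emitted - so never here:
        removal.flatMap(v -> {
            flatMapRan.set(true);
            return Mono.just(v);
        }).block();

        // then() reacts to the completion signal, so this does run:
        removal.then(Mono.fromRunnable(() -> thenRan.set(true))).block();

        return new boolean[] { flatMapRan.get(), thenRan.get() };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("flatMap ran: " + r[0]); // flatMap ran: false
        System.out.println("then ran: " + r[1]);    // then ran: true
    }
}
```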

    Examples

    A code example is worth a thousand words, so here is a quick summary of the main transaction operations. They are described in more detail below.

    With the synchronous API
    try {
        TransactionResult result = transactions.run((ctx) -> {
            // Inserting a doc:
            ctx.insert(collection, "doc-a", JsonObject.create());
    
            // Getting documents:
            // Use ctx.getOptional if the document may or may not exist
            Optional<TransactionGetResult> docOpt =
                    ctx.getOptional(collection, "doc-a");
    
            // Use ctx.get if the document should exist, and the transaction
            // will fail if it does not
            TransactionGetResult docA = ctx.get(collection, "doc-a");
    
            // Replacing a doc:
            TransactionGetResult docB = ctx.get(collection, "doc-b");
            // TransactionGetResult is immutable, so get its content as a
            // mutable JsonObject
            JsonObject content = docB.contentAs(JsonObject.class);
            content.put("transactions", "are awesome");
            ctx.replace(docB, content);
    
            // Removing a doc:
            TransactionGetResult docC = ctx.get(collection, "doc-c");
            ctx.remove(docC);
    
            ctx.commit();
        });
    } catch (TransactionCommitAmbiguous e) {
        System.err.println("Transaction possibly committed");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    } catch (TransactionFailed e) {
        System.err.println("Transaction did not reach commit point");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    }
    With the asynchronous API
    Mono<TransactionResult> result = transactions.reactive().run((ctx) -> {
        return
                // Inserting a doc:
                ctx.insert(collection.reactive(), "doc-a", JsonObject.create())
    
                        // Getting and replacing a doc:
                        .then(ctx.get(collection.reactive(), "doc-b"))
                        .flatMap(docB -> {
                            JsonObject content = docB.contentAs(JsonObject.class);
                            content.put("transactions", "are awesome");
                            return ctx.replace(docB, content);
                        })
    
                        // Getting and removing a doc:
                        .then(ctx.get(collection.reactive(), "doc-c"))
                        .flatMap(doc -> ctx.remove(doc))
    
                        // Committing:
                        .then(ctx.commit());
    
    }).doOnError(err -> {
        if (err instanceof TransactionCommitAmbiguous) {
            System.err.println("Transaction possibly committed: ");
        }
        else {
            System.err.println("Transaction failed: ");
        }
    
    for (LogDefer e : ((TransactionFailed) err).result().log().logs()) {
        // System.err is used here for simplicity; log failures to your own logging system
        System.err.println(e.toString());
    }
    });
    
    // Normally you will chain this result further and ultimately subscribe.
    // For simplicity, here we just block on the result.
    result.block();

    Transaction Mechanics at a High-Level

    While this document is focussed on presenting how transactions are used at the API level, it is useful to have a high-level understanding of the mechanics. Reading this section is completely optional.

    There are two main mechanics to understand.

    Recall that the application-provided lambda containing the transaction logic may be run multiple times by the transactions library. Each such run is called an 'attempt' inside the overall transaction.

    The first mechanic is that each of these attempts adds an entry to a metadata document in the Couchbase cluster. These metadata documents' names begin with "_txn:atr-", and each document can contain entries for multiple attempts. These metadata documents are viewable, and they should not be modified externally.

    Each attempt entry stores some metadata and, crucially, whether the attempt has committed or not. Attempt entries are more often called "ATR entries" after the ATRs (Active Transaction Records) they are stored in.

    The second mechanic is that mutating a document inside a transaction does not directly change the body of the document. Instead, the post-transaction version of the document is staged alongside the document (technically, in its extended attributes (XATTRs)). In this way, all changes are invisible to all parts of the Couchbase Data Platform until the commit point is reached.

    These staged document changes effectively act as a lock against other transactions trying to modify the document, preventing write-write conflicts. There are safety mechanisms to ensure that leftover staged changes from a failed transaction cannot block live transactions indefinitely.

    Only once the lambda has successfully run to conclusion, will the attempt be committed. This updates the ATR entry, which can be used as a signal by transactional actors as to whether to use the post-transaction version of a document from its XATTRs. Hence updating the ATR entry is effectively an atomic commit switch for the attempt (and by extension the transaction).

    Only after this atomic commit point is reached will the individual documents be committed (or "unstaged"). This is only required for non-transactional actors. Transactional actors will begin reading the post-transactional version of documents as soon as the ATR entry is changed to committed.

    Mutating Documents

    Replacing

    Replacing a document requires a ctx.get() call first. This is necessary to ensure that the document is not involved in another transaction. (If it is, then the transaction will handle this, generally by rolling back what has been done so far, and retrying the lambda.)

    With the synchronous API:
    transactions.run((ctx) -> {
        TransactionGetResult anotherDoc = ctx.get(collection, "anotherDoc");
        JsonObject content = anotherDoc.contentAs(JsonObject.class);
        content.put("transactions", "are awesome");
        ctx.replace(anotherDoc, content);
    });
    With the asynchronous API:
    transactions.reactive().run((ctx) -> {
        return ctx.get(collection.reactive(), "anotherDoc")
                .flatMap(doc -> {
                    JsonObject content = doc.contentAs(JsonObject.class);
                    content.put("transactions", "are awesome");
                    return ctx.replace(doc, content);
                })
                .then(ctx.commit());
    });

    Removing

    As with replaces, removing a document requires a ctx.get() call first.

    With the asynchronous API:
    transactions.reactive().run((ctx) -> {
        return ctx.get(collection.reactive(), "anotherDoc")
                .flatMap(doc -> ctx.remove(doc));
    });
    With the synchronous API:
    transactions.run((ctx) -> {
        TransactionGetResult anotherDoc = ctx.get(collection, "anotherDoc");
        ctx.remove(anotherDoc);
    });

    Inserting

    As above, mutations currently rely on an existing document to stage their changes alongside. For inserts there is no existing document, so the protocol currently creates an empty, but visible, marker document. Transactional actors will ignore this marker document until the commit point is reached, but non-transactional actors will be able to see it (though its body is empty).

    The forthcoming Couchbase Server 6.6 release will resolve this by instead creating an invisible "shadow document" for staging.

    With the asynchronous API:
    transactions.reactive().run((ctx) -> {
        return ctx.insert(collection.reactive(), "docId", JsonObject.create()).then();
    }).block();
    With the synchronous API:
    transactions.run((ctx) -> {
        String docId = "docId";
    
        ctx.insert(collection, docId, JsonObject.create());
    
    });

    Getting Documents

    There are two ways to get a document, get and getOptional:

    transactions.run((ctx) -> {
        String docId = "a-doc";
    
        Optional<TransactionGetResult> docOpt = ctx.getOptional(collection, docId);
        TransactionGetResult doc = ctx.get(collection, docId);
    });

    get will cause the transaction to fail with TransactionFailed if the document does not exist (after rolling back any changes, of course). It is provided as a convenience method so the developer does not have to check the Optional when the document must exist for the transaction to succeed.

    Gets will 'read your own writes', e.g. this will succeed:

    transactions.run((ctx) -> {
        String docId = "docId";
    
        ctx.insert(collection, docId, JsonObject.create());
    
        Optional<TransactionGetResult> doc = ctx.getOptional(collection, docId);
    
        assert (doc.isPresent());
    });

    Committing

    Committing is automatic: if there is no explicit call to ctx.commit() at the end of the transaction logic callback, and no exception is thrown, it will be committed.

    With the asynchronous API, if you leave off the explicit call to commit then you may need to call .then() on the result of the chain to convert it to the required Mono<Void> return type:

    Mono<TransactionResult> result = transactions.reactive().run((ctx) -> {
        return ctx.get(collection.reactive(), "anotherDoc")
                .flatMap(doc -> {
                    JsonObject content = doc.contentAs(JsonObject.class);
                    content.put("transactions", "are awesome");
                    return ctx.replace(doc, content);
                })
                .then();
    });

    As described above, as soon as the transaction is committed, all its changes will be atomically visible to transactional actors. The changes will also be committed (or "unstaged") so they are visible to non-transactional actors, in an eventually consistent fashion.

    Commit is final: after the transaction is committed, it cannot be rolled back, and no further operations are allowed on it.

    A Full Transaction Example

    Let’s pull together everything so far into a more real-world example of a transaction.

    This example is based on the Game Simulation sample bucket, which can be installed in the Couchbase Admin UI through Settings → Sample Buckets. (The bucket does not have to be installed to run the sample though, the sample will create any required documents).

    The sample data simulates that of a simple Massively Multiplayer Online game, and includes documents representing:

    • Players, with experience points and levels;

    • Monsters, with hitpoints, and the number of experience points a player earns from their death.

    In this example, the player is dealing damage to the monster. The player’s client has sent this instruction to a central server, where we’re going to record that action. We’re going to do this in a transaction, as we don’t want a situation where the monster is killed, but we fail to update the player’s document with the earned experience. (Though this is just a demo - in reality, the game would likely live with the small risk and limited impact of this, rather than pay the performance cost for using a transaction.)

    A complete version of this example is available on our GitHub transactions examples page.

    public void playerHitsMonster(int damage, String playerId, String monsterId) {
        Transactions transactions = getTransactions();
    
        try {
            transactions.run((ctx) -> {
                TransactionGetResult monsterDoc = ctx.get(collection, monsterId);
                TransactionGetResult playerDoc = ctx.get(collection, playerId);
    
                int monsterHitpoints = monsterDoc.contentAs(JsonObject.class).getInt("hitpoints");
                int monsterNewHitpoints = monsterHitpoints - damage;
    
                if (monsterNewHitpoints <= 0) {
                    // Monster is killed.  The remove is just for demoing, and a more realistic example would set a
                    // "dead" flag or similar.
                    ctx.remove(monsterDoc);
    
                    // The player earns experience for killing the monster
                    int experienceForKillingMonster = monsterDoc.contentAs(JsonObject.class).getInt(
                            "experienceWhenKilled");
                    int playerExperience = playerDoc.contentAs(JsonObject.class).getInt("experience");
                    int playerNewExperience = playerExperience + experienceForKillingMonster;
                    int playerNewLevel = calculateLevelForExperience(playerNewExperience);
    
                    JsonObject playerContent = playerDoc.contentAs(JsonObject.class);
    
                    playerContent.put("experience", playerNewExperience);
                    playerContent.put("level", playerNewLevel);
    
                    ctx.replace(playerDoc, playerContent);
                } else {
                    // Monster is damaged but still alive
                    JsonObject monsterContent = monsterDoc.contentAs(JsonObject.class);
                    monsterContent.put("hitpoints", monsterNewHitpoints);
    
                    ctx.replace(monsterDoc, monsterContent);
                }
            });
        } catch (TransactionFailed e) {
            // The operation failed.   Both the monster and the player will be untouched.
    
            // Situations that can cause this would include either the monster
            // or player not existing (as get is used), or a persistent
            // failure to be able to commit the transaction, for example on
            // prolonged node failure.
        }
    }
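    The example above calls a calculateLevelForExperience helper, which is defined in the full GitHub sample. A minimal, self-contained sketch of such a helper might look like the following (the level curve here is purely illustrative, and not necessarily the formula used by the actual sample):

```java
public class LevelCalculator {
    // Hypothetical level curve: each level costs 100 more experience points
    // than the previous one (level 1 at 100 XP, level 2 at 300 XP, etc.).
    public static int calculateLevelForExperience(int experience) {
        int level = 0;
        int costOfNextLevel = 100; // assumed cost of reaching level 1
        int remaining = experience;
        while (remaining >= costOfNextLevel) {
            remaining -= costOfNextLevel;
            level++;
            costOfNextLevel += 100;
        }
        return level;
    }

    public static void main(String[] args) {
        System.out.println(calculateLevelForExperience(0));   // 0
        System.out.println(calculateLevelForExperience(150)); // 1
        System.out.println(calculateLevelForExperience(300)); // 2
    }
}
```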

    Concurrency with Non-Transactional Writes

    This release of transactions for Couchbase requires a degree of co-operation from the application. Specifically, the application should ensure that non-transactional writes (such as Key-Value writes or N1QL UPDATEs) are never performed concurrently with transactional writes on the same document.

    If two such writes do conflict then the transactional write will 'win', overwriting the non-transactional write.

    Note this only applies to writes. Any non-transactional reads concurrent with transactions are fine, and are at a Read Committed level.

    To help detect that this co-operative requirement is fulfilled, the application can subscribe to the client’s event logger and check for any IllegalDocumentState events, like so:

    cluster.environment().eventBus().subscribe(event -> {
        if (event instanceof IllegalDocumentState) {
            // log this event for review
        }
    });

    These events will be raised in the event of a non-transactional write being detected and overridden. The event contains the key of the document involved, to aid the application with debugging.

    Rollback

    If an exception is thrown, either by the application from the lambda, or by the transactions library, then that attempt is rolled back. The transaction logic may or may not be retried, depending on the exception.

    If the transaction is not retried then it will throw a TransactionFailed exception, and its getCause method can be used for more details on the failure.

    The application can use this to signal why it triggered a rollback, like so:

    class BalanceInsufficient extends RuntimeException {}
    
    try {
        transactions.run((ctx) -> {
            TransactionGetResult customer = ctx.get(collection, "customer-name");
    
            if (customer.contentAsObject().getInt("balance") < costOfItem) {
                throw new BalanceInsufficient();
            }
            // else continue transaction
        });
    } catch (TransactionCommitAmbiguous e) {
        // This exception can only be thrown at the commit point, after the
        // BalanceInsufficient logic has been passed, so there is no need to
        // check getCause here.
        System.err.println("Transaction possibly committed");
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    } catch (TransactionFailed e) {
        if (e.getCause() instanceof BalanceInsufficient) {
            // Re-raise the error
            throw (RuntimeException) e.getCause();
        }
        else {
            System.err.println("Transaction did not reach commit point");
    
            for (LogDefer err : e.result().log().logs()) {
                System.err.println(err.toString());
            }
        }
    }

    The transaction can also be explicitly rolled back:

    transactions.run((ctx) -> {
        TransactionGetResult customer = ctx.get(collection, "customer-name");
    
        if (customer.contentAsObject().getInt("balance") < costOfItem) {
            ctx.rollback();
        }
        // else continue transaction
    });

    In this case, if ctx.rollback() is reached, then the transaction will be regarded as successfully rolled back and no TransactionFailed will be thrown.

    After a transaction is rolled back, it cannot be committed, no further operations are allowed on it, and the library will not try to automatically commit it at the end of the code block.

    Error Handling

    As discussed previously, the transactions library will attempt to resolve many errors itself, through a combination of retrying individual operations and the application’s lambda. This includes some transient server errors, and conflicts with other transactions.

    But there are situations that cannot be resolved, and total failure is indicated to the application via exceptions. These errors include:

    • Any exception thrown by your transaction lambda, either deliberately or through an application logic bug.

    • Attempting to insert a document that already exists.

    • Attempting to remove or replace a document that does not exist.

    • Calling ctx.get on a document key that does not exist.

    Exceptions are crucial to the library's error handling, and it is preferable that you do not catch exceptions inside the lambda.

    Transactions, as they are multi-stage and multi-document, also have a concept of partial success/failure. This is signalled to the application through the TransactionResult.unstagingComplete() method, described later.

    There are three exceptions that the transactions library can raise to the application: TransactionFailed, TransactionExpired and TransactionCommitAmbiguous. All exceptions derive from TransactionFailed for backwards-compatibility purposes.

    TransactionFailed and TransactionExpired

    The transaction definitely did not reach the commit point. TransactionFailed indicates a fast failure, whereas TransactionExpired indicates that retries were made until the expiration point was reached. This distinction is not normally important to the application, and generally TransactionExpired does not need to be handled individually.

    Either way, an attempt will have been made to rollback all changes. This attempt may or may not have been successful, but the results of this will have no impact on the protocol or other actors. No changes from the transaction will be visible (presently with the potential and temporary exception of staged inserts being visible to non-transactional actors, as discussed under Inserting).

    Handling: Generally, debugging exactly why a given transaction failed requires review of the logs, so it is suggested that the application log these on failure (see Logging). The application may want to try the transaction again later. Alternatively, if transaction completion time is not a priority, then transaction expiration times (which default to 15 seconds) can be extended across the board through TransactionConfigBuilder:

    Transactions transactions = Transactions.create(cluster, TransactionConfigBuilder.create()
            .expirationTime(Duration.ofSeconds(120))
            .build());

    This will allow the protocol more time to get past any transient failures (for example, those caused by a cluster rebalance). The tradeoff to consider with longer expiration times is that documents staged by a transaction are effectively locked from modification by other transactions until the expiration time has been exceeded.

    Note that expiration is not guaranteed to be followed precisely. For example, if the application were to do a long blocking operation inside the lambda (which should be avoided), then expiration can only trigger after this finishes. Similarly, if the transaction attempts a key-value operation close to the expiration time, and that key-value operation times out, then the expiration time may be exceeded.

    TransactionCommitAmbiguous

    As discussed previously, each transaction has a 'single point of truth' that is updated atomically to reflect whether it is committed.

    However, it is not always possible for the protocol to become 100% certain that the operation was successful before the transaction expires. That is, the operation may have successfully completed on the cluster, or may succeed soon, but the protocol is unable to determine this (whether due to transient network failure or some other reason). This is important, as the transaction may or may not have reached the commit point, i.e. it may have either succeeded or failed.

    The library raises TransactionCommitAmbiguous to indicate this state. It should be rare to receive this exception.

    If the transaction had in fact successfully reached the commit point, then the transaction will be fully completed ("unstaged") by the asynchronous cleanup process at some point in the future. With default settings this will usually be within a minute, but whatever underlying fault has caused the TransactionCommitAmbiguous may lead to it taking longer.

    If the transaction had not in fact reached the commit point, then the asynchronous cleanup process will instead attempt to roll it back at some point in the future. If unable to, any staged metadata from the transaction will not be visible, and will not cause problems (e.g. there are safety mechanisms to ensure it will not block writes to these documents for long).

    Handling: This error can be challenging for an application to handle. As with TransactionFailed it is recommended that it at least writes any logs from the transaction, for future debugging. It may wish to retry the transaction at a later point, or globally extend transactional expiration times to give the protocol additional time to resolve the ambiguity.

    TransactionResult.unstagingComplete()

    As above, there is a 'single point of truth' for a transaction. After this atomic commit point is reached, the documents themselves still need to be individually committed (we also call this "unstaging"). However, transactionally-aware actors will now be returning the post-transaction versions of the documents, and the transaction is effectively fully committed to those actors.

    So if the application is solely working with transaction-aware actors, then the unstaging process is optional. And failures during the unstaging process are not particularly important, in this case. (Note the asynchronous cleanup process will still complete the unstaging process at a later point.)

    Hence, many errors during unstaging will cause the transaction to immediately return success. That is, successful return simply means that the commit point was reached.

    A method TransactionResult.unstagingComplete() indicates whether the unstaging process completed successfully or not. This should be used any time that the application needs all results of the transaction to be immediately available to non-transactional actors (which currently includes N1QL and non-transactional Key-Value reads).

    Error handling differs depending on whether a transaction is before or after the point of commit (or rollback).

    Full Error Handling Example

    Pulling all of the above together, this is the suggested best practice for error handling:

    try {
        TransactionResult result = transactions.run((ctx) -> {
            // ... transactional code here ...
        });
    
        // The transaction definitely reached the commit point. Unstaging
        // the individual documents may or may not have completed
    
        if (result.unstagingComplete()) {
            // Operations with non-transactional actors will want
            // unstagingComplete() to be true.
            cluster.query(" ... N1QL ... ",
                    queryOptions()
                            .consistentWith(result.mutationState()));
    
            String documentKey = "a document key involved in the transaction";
            GetResult getResult = collection.get(documentKey);
        }
        else {
            // This step is completely application-dependent.  It may
            // need to throw its own exception, if it is crucial that
            // result.unstagingComplete() is true at this point.
            // (Recall that the asynchronous cleanup process will
            // complete the unstaging later on).
        }
    }
    catch (TransactionCommitAmbiguous err) {
        // The transaction may or may not have reached commit point
        System.err.println("Transaction returned TransactionCommitAmbiguous and" +
                " may have succeeded, logs:");
    
        // Of course, the application will want to use its own logging rather
        // than System.err
        err.result().log().logs().forEach(log -> System.err.println(log.toString()));
    }
    catch (TransactionFailed err) {
        // The transaction definitely did not reach commit point
        System.err.println("Transaction failed with TransactionFailed, logs:");
        err.result().log().logs().forEach(log -> System.err.println(log.toString()));
    }

    Asynchronous Cleanup

    Transactions will try to clean up after themselves in the event of failure. However, some situations inevitably create failed, or 'lost', transactions, such as an application crash.

    This requires an asynchronous cleanup task, described in this section.

    Creating the Transactions object spawns a background cleanup task, whose job it is to periodically scan for expired transactions and clean them up. It does this by scanning a subset of the Active Transaction Record (ATR) transaction metadata documents on a bucket. As you’ll recall from earlier, an entry for each transaction attempt exists in one of these documents. (They are removed during cleanup or at some time after successful completion.)

    The default settings are tuned to find expired transactions reasonably quickly, while creating negligible impact from the background reads required by the scanning process. To be precise, with default settings it will generally find expired transactions within 60 seconds, and use fewer than 20 reads per second. This is unlikely to impact performance on any cluster, but the settings may be tuned as desired.

    Cleanup is done on each bucket in the cluster.

    All applications connected to the same cluster and running Transactions will share in the cleanup, via a low-touch communication protocol on the "_txn:client-record" metadata document that will be created in each bucket in the cluster. This document is visible and should not be modified externally. It is maintained automatically by the transactions library. All ATRs on a bucket will be distributed between all cleanup clients, so increasing the number of applications will not increase the reads required for scanning.

    An application may clean up transactions created by another application.

    It is important to understand that if an application is not running, then cleanup is not running. (This is particularly relevant to developers running unit tests or similar.)

    If this is an issue, then the deployment may want to consider running a simple application at all times that just opens a Transactions object, to guarantee that cleanup is running.
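    As a sketch, such a standalone keep-alive application might look like the following. The connection string and credentials are placeholders; the essential point is simply that a Transactions object is created and the process stays alive, so the background cleanup task keeps running:

    import com.couchbase.client.java.Cluster;
    import com.couchbase.transactions.Transactions;
    import com.couchbase.transactions.config.TransactionConfigBuilder;

    public class TransactionsCleanupKeepAlive {
        public static void main(String[] args) throws InterruptedException {
            // Placeholder connection details; substitute your own
            Cluster cluster = Cluster.connect("couchbase://localhost",
                    "Administrator", "password");

            // Creating the Transactions object spawns the background cleanup task
            Transactions transactions = Transactions.create(cluster,
                    TransactionConfigBuilder.create().build());

            // Block forever, keeping the process (and hence cleanup) running
            Thread.currentThread().join();
        }
    }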

    Configuring Cleanup

    The cleanup settings can be configured as follows:

    cleanupWindow (default: 60 seconds)

    This determines how long a cleanup 'run' is; that is, how frequently this client will check its subset of ATR documents. It is perfectly valid for the application to change this setting, which is at a conservative default. Decreasing this will cause expired transactions to be found more swiftly (generally, within this cleanup window), with the tradeoff of increasing the number of reads per second used for the scanning process.

    cleanupLostAttempts (default: true)

    This enables the thread that takes part in the distributed cleanup process described above, which cleans up expired transactions created by any client. It is strongly recommended that it is left enabled.

    cleanupClientAttempts (default: true)

    This enables the thread that cleans up transactions created just by this client. The client will preferentially aim to send any transactions it creates to this thread, leaving transactions for the distributed cleanup process only when it is forced to (for example, on an application crash). It is strongly recommended that it is left enabled.
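    As a sketch, these settings can be applied through TransactionConfigBuilder when creating the Transactions object. The values shown here are illustrative, not recommendations (the defaults suit most deployments):

    import java.time.Duration;
    import com.couchbase.transactions.Transactions;
    import com.couchbase.transactions.config.TransactionConfigBuilder;

    // Illustrative values only: a wider cleanup window, with both cleanup
    // threads left at their (recommended) enabled defaults
    Transactions transactions = Transactions.create(cluster,
            TransactionConfigBuilder.create()
                    .cleanupWindow(Duration.ofSeconds(120))
                    .cleanupLostAttempts(true)
                    .cleanupClientAttempts(true)
                    .build());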

    Monitoring Cleanup

    If the application wishes to monitor cleanup it may subscribe to these events:

    cluster.environment().eventBus().subscribe(event -> {
        if (event instanceof TransactionCleanupAttempt
                || event instanceof TransactionCleanupEndRunEvent) {
            // log this event
        }
    });

    TransactionCleanupEndRunEvent is raised whenever a current 'run' is finished, and contains statistics from the run. (With the default configuration, a run happens approximately every 60 seconds.)

    A TransactionCleanupAttempt event is raised when an expired transaction was found by this process, and a cleanup attempt was made. It contains whether that attempt was successful, along with any logs relevant to the attempt.

    Logging

    To aid troubleshooting, each transaction maintains a list of log entries, which can be logged on failure like this:

    } catch (TransactionFailed e) {
        // 'yourLogger' is the logging approach your application uses
        yourLogger.warning("Transaction {} failed:", e.result().transactionId());
        for (LogDefer err : e.result().log().logs()) {
            yourLogger.info(err.toString());
        }
    }

    or for the asynchronous API:

    .doOnError(error -> {
        if (error instanceof TransactionFailed) {
            TransactionFailed err = (TransactionFailed) error;
            yourLogger.warning("Transaction {} failed:", err.result().transactionId());
            for (LogDefer e : err.result().log().logs()) {
                yourLogger.info(e.toString());
            }
        }
    });

    A failed transaction can involve dozens, even hundreds, of lines of logging, so the application may prefer to write failed transactions into a separate file.

    For convenience there is also a config option that, if a transaction fails, will automatically write this programmatic log to the standard Couchbase Java logging configuration inherited from the SDK. This will log all lines of any failed transactions at WARN level:

    TransactionConfigBuilder.create().logOnFailure(true, Event.Severity.WARN).build();

    By default the Couchbase Java logging event-bus is set up to look for and use SLF4J/Logback, Log4j 1, and Log4j 2 on the classpath, and to fall back to java.util.logging.

    Please see the Java SDK logging documentation for details.

    Most applications will have their own preferred Java logging solution in place already. For those starting from scratch, here is a complete example using the basic java.util.logging:

    final Logger LOGGER = Logger.getLogger("transactions");
    
    try {
        TransactionResult result = transactions.run((ctx) -> {
            // ... transactional code here ...
        });
    }
    catch (TransactionCommitAmbiguous err) {
        // The transaction may or may not have reached commit point
        LOGGER.info("Transaction returned TransactionCommitAmbiguous and" +
                " may have succeeded, logs:");
    
        err.result().log().logs().forEach(log -> LOGGER.info(log.toString()));
    }
    catch (TransactionFailed err) {
        // The transaction definitely did not reach commit point
        LOGGER.info("Transaction failed with TransactionFailed, logs:");
        err.result().log().logs().forEach(log -> LOGGER.info(log.toString()));
    }

    Concurrent Operations with the Async API

    The async API allows operations to be performed concurrently inside a transaction, which can assist performance. There are two rules the application needs to follow:

    • The first mutation must be performed alone, in serial. This is because the first mutation also triggers the creation of metadata for the transaction.

    • All concurrent operations must be allowed to complete fully, so the transaction library can track which operations need to be rolled back in the event of failure. This means the application must 'swallow' the error, but record that an error occurred, and then at the end of the concurrent operations, if an error occurred, throw an error to cause the transaction to retry.

    These rules are demonstrated here:

    List<String> docIds = Arrays.asList("doc1", "doc2", "doc3", "doc4", "doc5");
    
    ReactiveCollection coll = collection.reactive();
    
    TransactionResult result = transactions.reactive((ctx) -> {
    
        // Tracks whether all operations were successful
        AtomicBoolean allOpsSucceeded = new AtomicBoolean(true);
    
        // The first mutation must be done in serial, as it also creates a metadata entry
        return ctx.get(coll, docIds.get(0))
                .flatMap(doc -> {
                    JsonObject content = doc.contentAsObject();
                    content.put("value", "updated");
                    return ctx.replace(doc, content);
                })
    
                // Do all other docs in parallel
                .thenMany(Flux.fromIterable(docIds.subList(1, docIds.size()))
                        .flatMap(docId -> ctx.get(coll, docId)
                                        .flatMap(doc -> {
                                            JsonObject content = doc.contentAsObject();
                                            content.put("value", "updated");
                                            return ctx.replace(doc, content);
                                        })
                                        .onErrorResume(err -> {
                                            allOpsSucceeded.set(false);
                                            // App should replace this with logging
                                            err.printStackTrace();
    
                                            // Allow other ops to finish
                                            return Mono.empty();
                                        }),
    
                                // Run these in parallel
                                docIds.size())
    
                // The commit or rollback must also be done in serial
                ).then(Mono.defer(() -> {
                    // Commit iff all ops succeeded
                    if (allOpsSucceeded.get()) {
                        return ctx.commit();
                    } else {
                        throw new RuntimeException("Retry the transaction");
                    }
                }));
    }).block();

    Deferred Commits

    The deferred commit feature is currently in alpha, and the API may change.

    Deferred commits allow a transaction to be paused just before the commit point. Optionally, everything required to finish the transaction can then be bundled up into a context that may be serialized into a String or byte array, and deserialized elsewhere (for example, in another process). The transaction can then be committed, or rolled back.

    The intention behind this feature is to allow multiple transactions, potentially spanning multiple databases, to be brought to just before the commit point, and then all committed together.

    Here’s an example of deferring the initial commit and serializing the transaction:

    try {
        TransactionResult result = transactions.run((ctx) -> {
            JsonObject initial = JsonObject.create().put("val", 1);
            ctx.insert(collection, "a-doc-id", initial);
    
            // Defer means don't do a commit right now.  `serialized` in the result will be present.
            ctx.defer();
        });
    
        // Available because ctx.defer() was called
        assert (result.serialized().isPresent());
    
        TransactionSerializedContext serialized = result.serialized().get();
    
        // This is going to store a serialized form of the transaction to pass around
        byte[] encoded = serialized.encodeAsBytes();
    
    } catch (TransactionFailed e) {
        // System.err is used for example, log failures to your own logging system
        System.err.println("Transaction did not reach commit point");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    }

    And then committing the transaction later:

    TransactionSerializedContext serialized = TransactionSerializedContext.createFrom(encoded);
    
    try {
        TransactionResult result = transactions.commit(serialized);
    
    } catch (TransactionFailed e) {
        // System.err is used for example, log failures to your own logging system
        System.err.println("Transaction did not reach commit point");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    }

    Alternatively the transaction can be rolled back:

    TransactionSerializedContext serialized = TransactionSerializedContext.createFrom(encoded);
    
    try {
        TransactionResult result = transactions.rollback(serialized);
    
    } catch (TransactionFailed e) {
        // System.err is used for example, log failures to your own logging system
        System.err.println("Transaction did not reach commit point");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    }

    As usual, once the commit has begun, the transactions library will make every effort to complete it. Even if the transaction expires (which throws TransactionExpired as usual), a background cleanup process will continue trying to complete the commit. The same applies to rollback.

    The transaction expiry timer (which is configurable) will begin ticking once the transaction starts, and is not paused while the transaction is in a deferred state.
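    Since deferred transactions may spend significant time paused, it can be useful to raise the expiration time from its default. As a sketch (the 120-second value is illustrative):

    import java.time.Duration;
    import com.couchbase.transactions.config.TransactionConfigBuilder;

    // Extend the expiry timer to leave room for the deferred period
    TransactionConfigBuilder config = TransactionConfigBuilder.create()
            .expirationTime(Duration.ofSeconds(120));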

    Further Reading