Distributed Transactions from the C++ SDK

    A practical guide to using Couchbase’s distributed ACID transactions, via the C++ API.

    This document presents a practical HOWTO on using the transactions library, following on from our transactions documentation.

    The C++ Transactions API is built upon the Couchbase C SDK, libcouchbase (LCB), which is automatically installed by the transactions library. Applications built using the C SDK and C++ Transactions can run in parallel without interfering with each other.

    Below we show you how to create Transactions, step-by-step. You may also want to start with our transactions examples repository, which features useful downloadable examples of using Distributed Transactions.

    API docs are available online.

    Requirements

    • Couchbase Server 6.6.1 or above.

    • NTP should be configured so that the clocks of the Couchbase cluster nodes stay in sync.

    • The application, if it is using extended attributes (XATTRs), must avoid using the XATTR field txn, which is reserved for Couchbase use.

    If using a single-node cluster (for example, during development), note that the default number of replicas for a newly created bucket is 1. If left at this default, all Key-Value writes performed with durability will fail with a DurabilityImpossibleException. In turn, this will cause all transactions (which perform all Key-Value writes durably) to fail. This setting can be changed via the GUI or command line. If the bucket already exists, the server needs to be rebalanced for the setting to take effect.

    Getting Started

    Couchbase transactions require no additional components or services to be configured. Simply add the transactions library into your project. The latest version, as of 19 April 2021, is 1.0.0.

    Installing on Linux

    We currently distribute our Linux libraries as tar files, available from packages.couchbase.com.

    The following steps show how to install transactions on RHEL/CentOS 8. Other Linux platforms will be similar:

    $ sudo yum groupinstall "Development Tools"
    $ sudo yum install boost-devel
    $ wget https://packages.couchbase.com/clients/transactions-cxx/couchbase-transactions-1.0.0-1.253.el8.x86_64.tar
    $ tar xf couchbase-transactions-1.0.0-1.253.el8.x86_64.tar
    $ sudo yum install couchbase-transactions*.rpm

    Installing on Mac OS X

    Mac libraries are available through Homebrew. Once you have Homebrew installed, add our tap and install:

    $ brew tap couchbaselabs/homebrew-couchbase-transactions-cxx
    $ brew install couchbase-transactions-cxx

    Build and run example project

    On Linux or Mac OS X, building and running the example project is the same:

    $ git clone https://github.com/couchbaselabs/couchbase-transactions-cxx-examples.git
    $ cd couchbase-transactions-cxx-examples/game/
    $ mkdir build && cd build
    $ cmake ../
    $ make

    Initializing Transactions

    The starting point is the transactions object. It is very important that the application ensures that only one of these is created, as it performs automated background processes that should not be duplicated.

    // Initialize the Couchbase cluster
    couchbase::cluster cluster("couchbase://localhost", "transactor", "mypass");
    auto bucket = cluster.bucket("transact");
    auto collection = bucket->default_collection();
    
    // Create the single Transactions object
    couchbase::transactions::transactions transactions(cluster, {});

    Configuration

    Transactions can optionally be configured at the point of creating the transactions object:

    couchbase::transactions::transaction_config configuration;
    configuration.durability_level(couchbase::transactions::durability_level::PERSIST_TO_MAJORITY);
    couchbase::transactions::transactions transactions(cluster, configuration);

    The default configuration will perform all writes with the durability setting Majority, ensuring that each write is available in-memory on the majority of replicas before the transaction continues. There are two higher durability settings available that will additionally wait for all mutations to be written to physical storage on either the active or the majority of replicas, before continuing. This further increases safety, at a cost of additional latency.

    A level of None is present but its use is discouraged and unsupported. If durability is set to None, then ACID semantics are not guaranteed.

    Creating a Transaction

    A core idea of the library is that the application supplies the logic for the transaction inside a lambda, including any conditional logic required, and the transactions library takes care of getting the transaction committed. If the library encounters a transient error, such as a temporary conflict with another transaction, then it can roll back what has been done so far and run the lambda again. The application does not have to do these retries and error handling itself.

    Each run of the lambda is called an attempt, inside an overall transaction.

    try {
        transactions.run([&](couchbase::transactions::attempt_context& ctx) {
            // 'ctx' permits getting, inserting, removing and replacing documents,
            // along with committing and rolling back the transaction
    
            // ... Your transaction logic here ...
    
            // This call is optional -- if you leave it off,
            // the transaction will be committed anyway.
            ctx.commit();
        });
    } catch (couchbase::transactions::transaction_failed& e) {
        std::cerr << "Transaction did not reach commit point: " << e.what() << "\n";
    }
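
    The relationship between a transaction and its attempts can be pictured with a small standalone model. This is only a sketch using the standard library: transient_error, attempts_exhausted and run_with_retries are hypothetical names, not part of the transactions API, and the real library bounds retries by an expiration time rather than by a fixed attempt count.

```cpp
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a retryable failure, such as a temporary
// conflict with another transaction. Not a library type.
struct transient_error : std::runtime_error {
    explicit transient_error(const std::string& what) : std::runtime_error(what) {}
};

// Hypothetical stand-in for the failure reported once retries run out.
struct attempts_exhausted : std::runtime_error {
    explicit attempts_exhausted(const std::string& what) : std::runtime_error(what) {}
};

// Runs `logic` until it returns normally, retrying on transient_error.
// Returns the number of attempts used. Any other exception propagates
// immediately, without a retry.
inline int run_with_retries(const std::function<void(int)>& logic, int max_attempts)
{
    for (int attempt = 1; attempt <= max_attempts; ++attempt) {
        try {
            logic(attempt); // one run of the lambda is one attempt
            return attempt; // the lambda finished, so this attempt "commits"
        } catch (const transient_error&) {
            // A real implementation would roll back staged changes here
            // before starting the next attempt.
        }
    }
    throw attempts_exhausted("no attempt succeeded");
}
```

    Because the lambda may run more than once, any side effects it has outside the transaction (counters, log lines, external calls) may also happen more than once.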

    Examples

    A code example is worth a thousand words, so here is a quick summary of the main transaction operations.

    try {
        transactions.run([&](couchbase::transactions::attempt_context& ctx) {
            // Inserting a doc:
            ctx.insert(collection, "doc-a", nlohmann::json({}));
    
            // Getting documents:
            // Use ctx.get_optional() if the document may or may not exist
            auto doc_opt = ctx.get_optional(collection, "doc-a");
            if (doc_opt) {
                couchbase::transactions::transaction_get_result& doc = doc_opt.value();
            }
    
            // Use ctx.get if the document should exist, and the transaction
            // will fail if it does not
            couchbase::transactions::transaction_get_result doc_a = ctx.get(collection, "doc-a");
    
            // Replacing a doc:
            couchbase::transactions::transaction_get_result doc_b = ctx.get(collection, "doc-b");
            nlohmann::json content = doc_b.content<nlohmann::json>();
            content["transactions"] = "are awesome";
            ctx.replace(collection, doc_b, content);
    
            // Removing a doc:
            couchbase::transactions::transaction_get_result doc_c = ctx.get(collection, "doc-c");
            ctx.remove(collection, doc_c);
    
            ctx.commit();
        });
    } catch (couchbase::transactions::transaction_failed& e) {
        std::cerr << "Transaction did not reach commit point: " << e.what() << "\n";
    }

    Transaction Mechanics

    While this document is focussed on presenting how transactions are used at the API level, it is useful to have a high-level understanding of the mechanics. Reading this section is completely optional.

    Recall that the application-provided lambda (containing the transaction logic) may be run multiple times by the transactions library. Each such run is called an 'attempt' inside the overall transaction.

    Active Transaction Record Entries

    The first mechanic is that each of these attempts adds an entry to a metadata document in the Couchbase cluster. These metadata documents:

    • Are named Active Transaction Records, or ATRs.

    • Are created and maintained automatically.

    • Begin with "_txn:atr-".

    • Each contain entries for multiple attempts.

    • Are viewable, but should not be modified externally.

    Each such ATR entry stores some metadata and, crucially, whether the attempt has committed or not. In this way, the entry acts as the single point of truth for the transaction, which is essential for providing an 'atomic commit' during reads.

    Staged Mutations

    The second mechanic is that mutating a document inside a transaction does not directly change the body of the document. Instead, the post-transaction version of the document is staged alongside the document (technically in its extended attributes (XATTRs)). In this way, all changes are invisible to all parts of the Couchbase Data Platform until the commit point is reached.

    These staged document changes effectively act as a lock against other transactions trying to modify the document, preventing write-write conflicts.
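
    As a rough illustration of how a staged change can double as a lock, here is a simplified in-memory model. The document struct and try_stage function are hypothetical and exist only for this sketch; the real library stores staged content in XATTRs on the server.

```cpp
#include <string>

// Simplified, hypothetical model of a document: a visible body plus an
// optional staged change recorded alongside it.
struct document {
    std::string body;
    bool has_staged = false;
    std::string staged_body;
    std::string staged_by; // id of the transaction that staged the change
};

// Tries to stage `new_body` on behalf of transaction `txn_id`.
// Returns false when another transaction already has a staged change,
// which is the write-write conflict case. The visible body is never
// touched by staging.
inline bool try_stage(document& doc, const std::string& txn_id, const std::string& new_body)
{
    if (doc.has_staged && doc.staged_by != txn_id) {
        return false; // the caller would roll back and retry its lambda
    }
    doc.has_staged = true;
    doc.staged_body = new_body;
    doc.staged_by = txn_id;
    return true;
}
```

    In this model a second transaction that finds a foreign staged change simply backs off, which corresponds to the retry of the lambda described earlier.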

    Cleanup

    There are safety mechanisms to ensure that leftover staged changes from a failed transaction cannot block live transactions indefinitely. These include an asynchronous cleanup process that is started with the creation of the Transactions object, and scans for expired transactions created by any application, on all buckets.

    Note that if an application is not running, then this cleanup is also not running.

    The cleanup process is detailed below in [Asynchronous Cleanup].

    Committing

    The attempt is committed only once the lambda has successfully run to conclusion. This updates the attempt entry, which can be used as a signal by transactional actors as to whether to use the post-transaction version of a document from its XATTRs. Hence updating the ATR entry is effectively an 'atomic commit' switch for the transaction.

    After this atomic commit point is reached, the individual documents will be committed (or "unstaged"). This provides an eventually consistent commit for non-transactional actors (including standard Key-Value reads and N1QL statements). Transactions will begin reading the post-transactional version of documents as soon as the ATR entry is changed to committed.
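
    The difference between the atomic commit switch and the later per-document unstaging can be sketched with a toy model. All names here (attempt_state, staged_document, transactional_read, plain_read, unstage) are hypothetical, and the model ignores everything except which version each kind of reader sees.

```cpp
#include <string>

// Hypothetical, simplified states of an ATR entry for one attempt.
enum class attempt_state { pending, committed };

// Simplified model of a document with a staged post-transaction version.
struct staged_document {
    std::string body;        // what non-transactional reads see
    std::string staged_body; // post-transaction version
    bool has_staged;
};

// A transactional read consults the ATR entry: once it says "committed",
// the staged version is used even if the document is not yet unstaged.
inline std::string transactional_read(const staged_document& doc, attempt_state atr)
{
    if (doc.has_staged && atr == attempt_state::committed) {
        return doc.staged_body;
    }
    return doc.body;
}

// A non-transactional read only ever sees the document body.
inline std::string plain_read(const staged_document& doc)
{
    return doc.body;
}

// Unstaging copies the staged version into the body, one document at a time.
inline void unstage(staged_document& doc)
{
    if (doc.has_staged) {
        doc.body = doc.staged_body;
        doc.staged_body.clear();
        doc.has_staged = false;
    }
}
```

    Flipping the single attempt_state value changes what transactional readers see for every document at once, while plain readers catch up document by document as unstage is applied.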

    Mutating Documents

    Replacing

    Replacing a document requires a ctx.get() call first. This is necessary to ensure that the document is not involved in another transaction. (If it is, then the transaction will handle this, generally by rolling back what has been done so far, and retrying the lambda.)

    transactions.run([&](couchbase::transactions::attempt_context& ctx) {
        std::string id = "doc-a";
        couchbase::transactions::transaction_get_result doc = ctx.get(collection, id);
        nlohmann::json content = doc.content<nlohmann::json>();
        content["transactions"] = "are awesome";
        ctx.replace(collection, doc, content);
    });

    Removing

    As with replaces, removing a document requires a ctx.get() call first.

    transactions.run([&](couchbase::transactions::attempt_context& ctx) {
        std::string id = "doc-a";
        auto doc_opt = ctx.get_optional(collection, id);
        if (doc_opt) {
            ctx.remove(collection, doc_opt.value());
        }
    });

    Inserting

    transactions.run([&](couchbase::transactions::attempt_context& ctx) {
        std::string id = "doc_id";
        nlohmann::json value{
            { "foo", "bar" },
        };
        ctx.insert(collection, id, value);
    });

    Getting Documents

    There are two ways to get a document, get and get_optional:

    transactions.run([&](couchbase::transactions::attempt_context& ctx) {
        std::string id = "doc-a";
        auto doc_opt = ctx.get_optional(collection, id);
        if (doc_opt) {
            couchbase::transactions::transaction_get_result& doc = doc_opt.value();
        }
    });

    If the document does not exist, get will cause the transaction to fail with transaction_failed (after rolling back any changes, of course). It is provided as a convenience method so the developer does not have to check the optional if the document must exist for the transaction to succeed.

    Gets will 'read your own writes', e.g. this will succeed:

    transactions.run([&](couchbase::transactions::attempt_context& ctx) {
        std::string id = "doc_id";
        nlohmann::json value{
            { "foo", "bar" },
        };
        ctx.insert(collection, id, value);
        // document must be accessible
        couchbase::transactions::transaction_get_result doc = ctx.get(collection, id);
    });

    Committing

    Committing is automatic: if there is no explicit call to ctx.commit() at the end of the transaction logic callback, and no exception is thrown, it will be committed.

    As soon as the transaction is committed, all its changes will be atomically visible to reads from other transactions. The changes will also be committed (or "unstaged") so they are visible to non-transactional actors, in an eventually consistent fashion.

    Commit is final: after the transaction is committed, it cannot be rolled back, and no further operations are allowed on it.

    An asynchronous cleanup process ensures that once the transaction reaches the commit point, it will be fully committed - even if the application crashes.

    Threads

    The cluster, bucket, collection, and transactions objects are all safe to use across multiple threads. When creating the cluster, you can specify the maximum number of libcouchbase instances the cluster and bucket can use. Transactions currently use only KV operations from the bucket, so specifying max_bucket_instances in the cluster_options when creating the cluster is sufficient. This will be the maximum number of concurrent transaction operations which can be made:

    couchbase::cluster c("couchbase://localhost", "transactor", "mypass", cluster_options().max_bucket_instances(10));
    auto coll = c.bucket("transact")->default_collection();

    The example below allows for up to 10 instances to be created, then has 10 threads get and replace the content in a document:

    std::list<std::thread> threads;
    std::atomic<int> counter {0};
    for (int i=0; i<10; i++) {
      threads.emplace_back([&]() {
        transactions.run([&](couchbase::transactions::attempt_context& ctx) {
          std::string id = "doc_a";
          auto doc = ctx.get(coll, id);
          auto doc_content = doc.content<nlohmann::json>();
          doc_content["counter"] = ++counter;
          ctx.replace(coll, doc, doc_content);
        });
      });
    }
    for (auto& t: threads) {
      if (t.joinable()) {
        t.join();
      }
    }

    A Full Transaction Example

    Let’s pull together everything so far into a more real-world example of a transaction.

    This example simulates a simple Massively Multiplayer Online game, and includes documents representing:

    • Players, with experience points and levels;

    • Monsters, with hitpoints, and the number of experience points a player earns from their death.

    In this example, the player is dealing damage to the monster. The player’s client has sent this instruction to a central server, where we’re going to record that action. We’re going to do this in a transaction, as we don’t want a situation where the monster is killed, but we fail to update the player’s document with the earned experience.

    (Though this is just a demo - in reality, the game would likely live with the small risk and limited impact of this, rather than pay the performance cost for using a transaction.)

    A complete version of this example is available on our GitHub transactions examples page.

    try {
        transactions.run([&](couchbase::transactions::attempt_context& ctx) {
            auto monster = ctx.get(collection, monster_id);
            const Monster& monster_body = monster.content<Monster>();
    
            int monster_hitpoints = monster_body.hitpoints;
            int monster_new_hitpoints = monster_hitpoints - damage;
    
            auto player = ctx.get(collection, player_id);
    
            if (monster_new_hitpoints <= 0) {
                // Monster is killed. The remove is just for demoing, and a more realistic example would set a "dead" flag or similar.
                ctx.remove(collection, monster);
    
                const Player& player_body = player.content<Player>();
    
                // the player earns experience for killing the monster
                int experience_for_killing_monster = monster_body.experience_when_killed;
                int player_experience = player_body.experience;
                int player_new_experience = player_experience + experience_for_killing_monster;
                int player_new_level = calculate_level_for_experience(player_new_experience);
    
                Player player_new_body = player_body;
                player_new_body.experience = player_new_experience;
                player_new_body.level = player_new_level;
                ctx.replace(collection, player, player_new_body);
            } else {
                Monster monster_new_body = monster_body;
                monster_new_body.hitpoints = monster_new_hitpoints;
                ctx.replace(collection, monster, monster_new_body);
            }
        });
    } catch (couchbase::transactions::transaction_failed& e) {
        // The operation failed. Both the monster and the player will be untouched
    
        // Situations that can cause this would include either the monster
        // or player not existing (as get is used), or a persistent
        // failure to be able to commit the transaction, for example on
        // prolonged node failure.
    }
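
    The snippet above relies on Monster, Player and calculate_level_for_experience, which are defined in the examples repository rather than on this page. The following is a minimal sketch of what they might look like. The field names match those used above, but the level rule and the reward_player helper are assumptions made for illustration, and the JSON conversion that content<Monster>() would need is omitted.

```cpp
// Hypothetical document types for the game example. Only the field
// names are taken from the transaction code above.
struct Monster {
    int hitpoints;
    int experience_when_killed;
};

struct Player {
    int experience;
    int level;
};

// Assumed rule for illustration: one level per 100 experience points.
inline int calculate_level_for_experience(int experience)
{
    return experience / 100;
}

// Pure version of the player update performed when the monster dies.
// Keeping it free of any SDK calls makes it easy to unit test.
inline Player reward_player(const Player& player, const Monster& monster)
{
    Player updated = player;
    updated.experience = player.experience + monster.experience_when_killed;
    updated.level = calculate_level_for_experience(updated.experience);
    return updated;
}
```

    Keeping the game rules in plain functions like these means the lambda only has to fetch documents, call the rules, and stage the results, which also keeps each retry of the lambda cheap.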

    Concurrency with Non-Transactional Writes

    This release of transactions for Couchbase requires a degree of co-operation from the application. Specifically, the application should ensure that non-transactional writes (such as key-value writes or N1QL UPDATES) are never done concurrently with transactional writes, on the same document.

    This requirement ensures that the strong Key-Value performance of Couchbase is not compromised. A key philosophy of our transactions is that you 'pay only for what you use'.

    If two such writes do conflict then the transactional write will 'win', overwriting the non-transactional write.

    Note this only applies to writes. Any non-transactional reads concurrent with transactions are fine, and are at a Read Committed level.

    Rollback

    If an exception is thrown, either by the application from the lambda, or by the transactions library, then that attempt is rolled back. The transaction logic may or may not be retried, depending on the exception.

    If the transaction is not retried then it will throw a transaction_failed exception, and its cause method can be used for more details on the failure. The application can use this to signal why it triggered a rollback.

    The transaction can also be explicitly rolled back:

    transactions.run([&](couchbase::transactions::attempt_context& ctx) {
        couchbase::transactions::transaction_get_result customer = ctx.get(collection, "customer-name");
    
        auto content = customer.content<nlohmann::json>();
        int balance = content["balance"].get<int>();
        if (balance < cost_of_item) {
            ctx.rollback();
        }
        // else continue transaction
    });

    In this case, if ctx.rollback() is reached, then the transaction will be regarded as successfully rolled back and no transaction_failed will be thrown.

    After a transaction is rolled back, it cannot be committed, no further operations are allowed on it, and the library will not try to automatically commit it at the end of the code block.

    Error Handling

    As discussed previously, the transactions library will attempt to resolve many errors itself, through a combination of retrying individual operations and the application’s lambda. This includes some transient server errors, and conflicts with other transactions.

    But there are situations that cannot be resolved, and total failure is indicated to the application via exceptions. These errors include:

    • Any exception thrown by your transaction lambda, either deliberately or through an application logic bug.

    • Attempting to insert a document that already exists.

    • Attempting to remove or replace a document that does not exist.

    • Calling ctx.get on a document key that does not exist.

    Once one of these errors occurs, the current attempt is irrevocably failed (though the transaction may retry the lambda). It is not possible for the application to catch the failure and continue. Once a failure has occurred, all other operations tried in this attempt (including commit) will instantly fail.

    Transactions, as they are multi-stage and multi-document, also have a concept of partial success/failure. This is signalled to the application through the TransactionResult.unstagingComplete() method, described later.

    There are three exceptions that the transactions library can raise to the application: TransactionFailed, TransactionExpired and TransactionCommitAmbiguous. All exceptions derive from TransactionFailed for backwards-compatibility purposes.

    TransactionFailed and TransactionExpired

    The transaction definitely did not reach the commit point. TransactionFailed indicates a fast-failure whereas TransactionExpired indicates that retries were made until the expiration point was reached, but this distinction is not normally important to the application and generally TransactionExpired does not need to be handled individually.

    Either way, an attempt will have been made to roll back all changes. This attempt may or may not have been successful, but the results of this will have no impact on the protocol or other actors. No changes from the transaction will be visible (presently with the potential and temporary exception of staged inserts being visible to non-transactional actors, as discussed under Inserting).

    Handling: Generally, debugging exactly why a given transaction failed requires review of the logs, so it is suggested that the application log these on failure (see Logging). The application may want to try the transaction again later. Alternatively, if transaction completion time is not a priority, then transaction expiration times (which default to 15 seconds) can be extended across the board through transaction_config:

    couchbase::transactions::transaction_config configuration;
    configuration.expiration_time(std::chrono::seconds(120));
    couchbase::transactions::transactions transactions(cluster, configuration);

    This will allow the protocol more time to get past any transient failures (for example, those caused by a cluster rebalance). The tradeoff to consider with longer expiration times is that documents that have been staged by a transaction are effectively locked from modification by other transactions until the expiration time has passed.

    Note that expiration is not guaranteed to be followed precisely. For example, if the application were to do a long blocking operation inside the lambda (which should be avoided), then expiration can only trigger after this finishes. Similarly, if the transaction attempts a key-value operation close to the expiration time, and that key-value operation times out, then the expiration time may be exceeded.
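
    Why expiration can overshoot is easier to see with a small model in which the deadline is only checked when control returns between operations. The expiry_guard type below is hypothetical and is not how the library is implemented; it only shows the arithmetic.

```cpp
#include <chrono>

using steady = std::chrono::steady_clock;

// Hypothetical model of an attempt's deadline. The check can only run
// when the checking code regains control, i.e. between operations.
struct expiry_guard {
    steady::time_point start;
    std::chrono::milliseconds limit;

    // True once `limit` has elapsed since `start`.
    bool expired(steady::time_point now) const
    {
        return now - start >= limit;
    }

    // How far past the deadline `now` is (zero if not yet expired).
    std::chrono::milliseconds overshoot(steady::time_point now) const
    {
        auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
        return elapsed > limit ? elapsed - limit : std::chrono::milliseconds(0);
    }
};
```

    With a 15 second limit, a check at 14 seconds passes. If the lambda then blocks for 6 seconds, the next check happens at 20 seconds and finds the deadline already exceeded by 5 seconds.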

    TransactionCommitAmbiguous

    As discussed previously, each transaction has a 'single point of truth' that is updated atomically to reflect whether it is committed.

    However, it is not always possible for the protocol to become 100% certain that the operation was successful, before the transaction expires. That is, the operation may have successfully completed on the cluster, or may succeed soon, but the protocol is unable to determine this (whether due to transient network failure or other reason). This is important as the transaction may or may not have reached the commit point, i.e. it may have succeeded or failed.

    The library raises TransactionCommitAmbiguous to indicate this state. It should be rare to receive this exception.

    If the transaction had in fact successfully reached the commit point, then the transaction will be fully completed ("unstaged") by the asynchronous cleanup process at some point in the future. With default settings this will usually be within a minute, but whatever underlying fault has caused the TransactionCommitAmbiguous may lead to it taking longer.

    If the transaction had not in fact reached the commit point, then the asynchronous cleanup process will instead attempt to roll it back at some point in the future. If unable to, any staged metadata from the transaction will not be visible, and will not cause problems (e.g. there are safety mechanisms to ensure it will not block writes to these documents for long).

    Handling: This error can be challenging for an application to handle. As with TransactionFailed it is recommended that it at least writes any logs from the transaction, for future debugging. It may wish to retry the transaction at a later point, or globally extend transactional expiration times to give the protocol additional time to resolve the ambiguity.
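
    Since all three exceptions share a common base, the order of catch clauses matters when an application wants to treat the ambiguous case differently. The sketch below shows the pattern with stand-in types in a demo namespace; they mirror the hierarchy described above but are not the library's classes, and the returned strings are placeholders for whatever the application would actually do.

```cpp
#include <stdexcept>
#include <string>

namespace demo {
// Stand-in exception types mirroring the hierarchy described above.
// They are NOT the library's classes.
struct transaction_failed : std::runtime_error {
    explicit transaction_failed(const std::string& what) : std::runtime_error(what) {}
};
struct transaction_expired : transaction_failed {
    explicit transaction_expired(const std::string& what) : transaction_failed(what) {}
};
struct transaction_commit_ambiguous : transaction_failed {
    explicit transaction_commit_ambiguous(const std::string& what) : transaction_failed(what) {}
};

// Maps an outcome to the action an application might take.
// The derived type is caught first; the base handler catches the rest,
// including the expired case, which rarely needs separate handling.
template <typename Fn>
std::string classify(Fn&& run_transaction)
{
    try {
        run_transaction();
        return "committed";
    } catch (const transaction_commit_ambiguous&) {
        return "unknown: log and verify later";
    } catch (const transaction_failed&) {
        return "not committed: log and maybe retry";
    }
}
} // namespace demo
```

    If the base handler were listed first, it would also catch the ambiguous case, and the application would wrongly treat a possibly committed transaction as a definite failure.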

    TransactionResult.unstagingComplete()

    This boolean flag indicates whether all documents were able to be unstaged (committed).

    For most use-cases it is not an issue if it is false. All transactional actors will still see all the changes from this transaction, as though it had committed fully. The cleanup process is asynchronously working to complete the commit, so that it will be fully visible to non-transactional actors.

    The flag is provided for those rare use-cases where the application requires the commit to be fully visible to non-transactional actors, before it may continue. In this situation the application can raise an error here, or poll all documents involved until they reflect the mutations.

    If you regularly see this flag false, consider increasing the transaction expiration time to reduce the possibility that the transaction times out during the commit.
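
    For the rare case where an application chooses to poll until the mutations are visible, a bounded polling helper along these lines may be enough. This is a generic sketch: poll_until_visible is a hypothetical name, and the is_visible callback stands in for whatever non-transactional reads the application would perform to check its documents.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Calls `is_visible` until it returns true or `max_polls` checks have
// been made, sleeping `interval` between checks. Returns whether the
// condition was observed before giving up.
inline bool poll_until_visible(const std::function<bool()>& is_visible,
                               int max_polls,
                               std::chrono::milliseconds interval)
{
    for (int i = 0; i < max_polls; ++i) {
        if (is_visible()) {
            return true;
        }
        if (i + 1 < max_polls) {
            std::this_thread::sleep_for(interval); // no sleep after the last check
        }
    }
    return false;
}
```

    Bounding the number of polls keeps the application from waiting forever if the underlying fault persists; on a false return it can raise an error, as suggested above.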

    Logging

    To aid troubleshooting, the transactions library logs information to stdout. The default logging level is INFO, but can be changed to produce more or less detailed output. For instance, to see very detailed logging:

    // Set logging level to Trace
    couchbase::transactions::set_transactions_logging_level(log_level::TRACE);

    Further Reading