Working with Replications

    Description — Couchbase database replication and synchronization concepts
    Related Content — Conflicts | Inter-Database Replication | Certificate Pinning

    Replication Protocol

    Couchbase Mobile uses a replication protocol based on WebSockets.

    The replicator is designed to send documents from a source to a target database. The target can be one of the following:

    URLEndpoint

    To replicate data between a local Couchbase Lite database and a remote Sync Gateway database or passive peer listener.

    DatabaseEndpoint

    To replicate data between two local Couchbase Lite databases, for example to store data on secondary storage.

    MessageEndpoint

    To replicate with another Couchbase Lite database via a custom transport protocol such as iOS Multipeer Connectivity, Android WiFi Direct, Android NearByConnection, or a socket-based transport.

    Protocol Compatibility

    To use this protocol the replication URL should specify WebSockets as the URL scheme (see the Starting a Replication section below).
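
    The scheme requirement can be checked before a URL is handed to the replicator. The following is a minimal sketch that uses only java.net.URI; the class and method names (ReplicationUrlCheck, isWebSocketUrl) are hypothetical helpers and not part of the Couchbase Lite API.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not part of Couchbase Lite.
final class ReplicationUrlCheck {
    private ReplicationUrlCheck() { }

    // Returns true when the URL parses and uses the ws: or wss: scheme.
    static boolean isWebSocketUrl(String url) {
        try {
            String scheme = new URI(url).getScheme();
            // equalsIgnoreCase(null) is false, so scheme-less URLs are rejected too
            return "ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme);
        } catch (URISyntaxException e) {
            return false;
        }
    }
}
```

    A check like this could run when the application reads its configuration, so that an http: URL left over from a 1.x client is caught early rather than at connection time.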

    Incompatibilities

    Couchbase Lite’s replication protocol is incompatible with CouchDB-based databases. Because Couchbase Lite 2.x and later support only the new protocol, you need to run a version of Sync Gateway that supports it (see Compatibility).

    Legacy Compatibility

    Clients using Couchbase Lite 1.x can continue to use http as the URL scheme. Sync Gateway 2.0 will automatically use:

    • The 1.x replication protocol when a Couchbase Lite 1.x client connects through http://localhost:4984/db.

    • The 2.0 replication protocol when a Couchbase Lite 2.0 client connects through ws://localhost:4984/db.

    Starting Sync Gateway

    Download Sync Gateway and start it from the command line with the configuration file created above.

    ~/Downloads/couchbase-sync-gateway/bin/sync_gateway

    For platform-specific installation instructions, refer to the Sync Gateway installation guide.

    Starting a Replication

    Replication can be bidirectional: you can start a push/pull replication with a single instance. The replication’s parameters are specified through the ReplicatorConfiguration object, for example to start a push-only or pull-only replication.

    The following example creates a pull replication with Sync Gateway.

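    import com.couchbase.lite.Database;
    import com.couchbase.lite.Endpoint;
    import com.couchbase.lite.Replicator;
    import com.couchbase.lite.ReplicatorConfiguration;
    import com.couchbase.lite.URLEndpoint;

    import java.net.URI;
    import java.net.URISyntaxException;

    class MyClass {
        Database database;
        Replicator replicator; // (1)

        void startReplication() {
            URI uri;
            try {
                uri = new URI("wss://10.0.2.2:4984/db"); // (2)
            } catch (URISyntaxException e) {
                // Without a valid URI there is no endpoint to replicate with
                e.printStackTrace();
                return;
            }
            Endpoint endpoint = new URLEndpoint(uri);
            ReplicatorConfiguration config = new ReplicatorConfiguration(database, endpoint);
            // Pull only: fetch changes from Sync Gateway without pushing local ones
            config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PULL);
            this.replicator = new Replicator(config);
            this.replicator.start();
        }
    }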
    1 A replication is an asynchronous operation. To keep a reference to the replicator object, you can set it as an instance property.
    2 The URL scheme for remote database URLs is ws:, or wss: for SSL/TLS connections over WebSockets. In this example the hostname is 10.0.2.2 because, from inside the Android emulator, that address is an alias for the loopback interface of the host machine (see Android Emulator networking documentation).
    As of Android Pie (version 9, API 28), cleartext support is disabled by default. wss: URLs are not affected, but to use the ws: protocol an application must either target API 27 or lower or supply a network security configuration that permits cleartext traffic.

    To verify that documents have been replicated, you can:

    • Monitor the Sync Gateway sequence number returned by the database endpoint (GET /{tkn-db}/). The sequence number increments for every change that happens on the Sync Gateway database.

    • Query a document by ID on the Sync Gateway REST API (GET /{tkn-db}/{id}).

    • Query a document from the Query Workbench on the Couchbase Server Console.
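
    The two REST checks in the list above can be sketched with the JDK's java.net.http client (Java 11 or later). The helper class SyncGatewayChecks and its method names are hypothetical, the base URL is an assumption taken from the examples in this guide, and document IDs are assumed to be URL-safe; this is an illustration, not a definitive client.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds the two verification requests.
final class SyncGatewayChecks {
    private final URI base; // for example http://localhost:4984/

    SyncGatewayChecks(String baseUrl) {
        // Normalise to a trailing slash so relative paths resolve under the root
        this.base = URI.create(baseUrl.endsWith("/") ? baseUrl : baseUrl + "/");
    }

    // GET /{db}/ : the database endpoint that reports the sequence number
    HttpRequest databaseInfo(String db) {
        return HttpRequest.newBuilder(base.resolve(db + "/")).GET().build();
    }

    // GET /{db}/{id} : fetch a single document by ID (ID assumed URL-safe)
    HttpRequest document(String db, String docId) {
        return HttpRequest.newBuilder(base.resolve(db + "/" + docId)).GET().build();
    }
}
```

    Either request can then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and the JSON body inspected. Comparing the database response before and after a replication is one way to confirm that the sequence number has advanced.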

    Couchbase Lite uses WebSockets as the communication protocol to transmit data. Some load balancers (NGINX, for example) are not configured for WebSocket connections by default, so it might be necessary to enable them explicitly in the load balancer’s configuration (see Load Balancers).

    By default, the WebSocket protocol uses compression to optimize for speed and bandwidth utilization. The level of compression is set on Sync Gateway and can be tuned in the configuration file (replicator_compression).

    Replication Ordering

    To optimize for speed, the replication protocol doesn’t guarantee that documents will be received in a particular order. Do not rely on arrival order when using, for example, replication or database change listeners.

    Replicator Notifications on a Custom Executor

    Prior to version 2.6, Couchbase Lite spun up multiple executors. This policy could result in too many threads being spun up.

    If no listeners are registered to listen to a replicator at the time of the most recent start(. . .), then no subsequently registered listeners will receive notifications.

    An executor manages a pool of threads and, optionally, a queue in front of them, to handle the asynchronous callbacks. Couchbase Lite API calls which are processed by an executor are listed below.

    Query.addChangeListener
    MessageEndpointListener.addChangeListener
    LiveQuery.addChangeListener
    AbstractReplicator.addDocumentReplicationListener
    AbstractReplicator.addChangeListener
    Database.addChangeListener
    Database.addDocumentChangeListener
    Database.addDatabaseChangeListener

    As of version 2.6, Couchbase Lite sometimes uses its own internal executor to run asynchronous client code. While this is fine for small tasks, larger tasks (those that take significant compute time or perform I/O) can block Couchbase Lite processing. If this happens, your application will fail with a RejectedExecutionException and it may be necessary to create a separate executor on which to run the large tasks.
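
    The failure mode can be reproduced with plain java.util.concurrent classes. The sketch below is an illustration only: the class name RejectionDemo is hypothetical, and the one-thread, zero-capacity executor is a deliberately small stand-in that makes no claim about how Couchbase Lite sizes its internal executor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Couchbase Lite.
final class RejectionDemo {
    private RejectionDemo() { }

    // Returns true if a second task is rejected while the only worker is busy.
    static boolean secondTaskIsRejected() {
        // One thread and no queue capacity: a stand-in for a saturated executor
        ThreadPoolExecutor exec = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // A "large task": it occupies the only worker until released
            exec.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            try {
                exec.execute(() -> { }); // no free worker and no queue slot
                return false;
            } catch (RejectedExecutionException e) {
                return true;
            }
        } finally {
            release.countDown(); // let the blocked task finish
            exec.shutdown();
        }
    }
}
```

    Moving the long-running work to an executor owned by the application keeps the shared executor free for short callbacks.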

    The following examples show how to specify a separate executor in the client code. The client code executor can enforce an application policy for delivery ordering and the number of threads.

    Guaranteed Order Delivery

    /**
     * This version guarantees in-order delivery and is parsimonious with space.
     * The listener does not need to be thread safe (at least as far as this code is concerned).
     * It will run on only one thread (the Executor's thread) and must return from a given call
     * before the next call commences.  Events may be delivered arbitrarily late, though,
     * depending on how long it takes the listener to run.
     */
    public class InOrderExample {
        private static final ExecutorService IN_ORDER_EXEC = Executors.newSingleThreadExecutor();
    
        public Replicator runReplicator(Database db1, Database db2, ReplicatorChangeListener listener)
            throws CouchbaseLiteException {
            ReplicatorConfiguration config = new ReplicatorConfiguration(db1, new DatabaseEndpoint(db2));
            config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
            config.setContinuous(false);
    
            Replicator repl = new Replicator(config);
            ListenerToken token = repl.addChangeListener(IN_ORDER_EXEC, listener::changed);
    
            repl.start();
    
            return repl;
        }
    }
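
    The ordering guarantee comes from the single-thread executor itself and can be observed without a replicator. The following sketch uses only the JDK; InOrderDeliveryDemo and its numbered "events" are hypothetical stand-ins for change notifications.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Couchbase Lite.
final class InOrderDeliveryDemo {
    private InOrderDeliveryDemo() { }

    // Submits `count` numbered events and returns the order in which they ran.
    static List<Integer> deliver(int count) {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        // Only the executor's single thread writes to this list while tasks run
        List<Integer> seen = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            final int event = i;
            exec.execute(() -> seen.add(event));
        }
        exec.shutdown(); // already-submitted tasks still run
        try {
            if (!exec.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

    Because one thread drains the queue in submission order, a slow listener delays every later event rather than reordering them.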

    Maximum Throughput

    /**
     * This version maximizes throughput.  It will deliver change notifications as quickly
     * as CPU availability allows. It may deliver change notifications out of order.
     * Listeners must be thread safe because they may be called from multiple threads.
     * In fact, they must be re-entrant because a given listener may be running on multiple threads
     * simultaneously.  In addition, when notifications swamp the processors, notifications awaiting
     * a processor will be queued as Threads, (instead of as Runnables) with accompanying memory
     * and GC impact.
     */
    public class MaxThroughputExample {
        private static final ExecutorService MAX_THROUGHPUT_EXEC = Executors.newCachedThreadPool();
    
        public Replicator runReplicator(Database db1, Database db2, ReplicatorChangeListener listener)
            throws CouchbaseLiteException {
            ReplicatorConfiguration config = new ReplicatorConfiguration(db1, new DatabaseEndpoint(db2));
            config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
            config.setContinuous(false);
    
            Replicator repl = new Replicator(config);
            ListenerToken token = repl.addChangeListener(MAX_THROUGHPUT_EXEC, listener::changed);
    
            repl.start();
    
            return repl;
        }
    }

    Extreme Configurability

    /**
     * This version demonstrates the extreme configurability of the Couchbase Lite replicator callback system.
     * It may deliver updates out of order and does require thread-safe and re-entrant listeners
     * (though it does correctly synchronize tasks passed to it using a SynchronousQueue).
     * The thread pool executor shown here is configured for the sweet spot for number of threads per CPU.
     * In a real system, this single executor might be used by the entire application and be passed to
     * this module, thus establishing a reasonable app-wide threading policy.
     * In an emergency (Rejected Execution) it lazily creates a backup executor with an unbounded queue
     * in front of it.  It, thus, may deliver notifications late, as well as out of order.
     */
    public class PolicyExample {
        private static final int CPUS = Runtime.getRuntime().availableProcessors();
    
        private static ThreadPoolExecutor BACKUP_EXEC;
    
        private static final RejectedExecutionHandler BACKUP_EXECUTION
            = new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                synchronized (this) {
                    if (BACKUP_EXEC == null) { BACKUP_EXEC = createBackupExecutor(); }
                }
                BACKUP_EXEC.execute(r);
            }
        };
    
        private static ThreadPoolExecutor createBackupExecutor() {
            ThreadPoolExecutor exec
                = new ThreadPoolExecutor(CPUS + 1, 2 * CPUS + 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
            exec.allowCoreThreadTimeOut(true);
            return exec;
        }
    
        private static final ThreadPoolExecutor STANDARD_EXEC
            = new ThreadPoolExecutor(CPUS + 1, 2 * CPUS + 1, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    
        static { STANDARD_EXEC.setRejectedExecutionHandler(BACKUP_EXECUTION); }
    
        public Replicator runReplicator(Database db1, Database db2, ReplicatorChangeListener listener)
            throws CouchbaseLiteException {
            ReplicatorConfiguration config = new ReplicatorConfiguration(db1, new DatabaseEndpoint(db2));
            config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
            config.setContinuous(false);
    
            Replicator repl = new Replicator(config);
            ListenerToken token = repl.addChangeListener(STANDARD_EXEC, listener::changed);
    
            repl.start();
    
            return repl;
        }
    }

    Delta Sync

    Applies only to

    With Delta Sync, only the changed parts of a Couchbase document are replicated. This can result in significant savings in bandwidth consumption as well as throughput improvements, especially where network bandwidth is constrained.

    Replications to a URLEndpoint (i.e., Sync Gateway) automatically use delta sync if the databases.$db.delta_sync.enabled property is set to true in Sync Gateway’s configuration file.

    Replications to a DatabaseEndpoint automatically disable delta sync and replications to a MessageEndpoint automatically enable delta sync.

    Troubleshooting

    As always, when there is a problem with replication, logging is your friend. The following example increases the log output for activity related to replication with Sync Gateway.

    Database.setLogLevel(LogDomain.REPLICATOR, LogLevel.VERBOSE);

    Authentication

    By default, Sync Gateway does not enable authentication. This is to make it easier to get up and running with synchronization. You can enable authentication with the following properties in the configuration file:

    {
      "databases": {
        "mydatabase": {
          "users": {
            "GUEST": {"disabled": true}
          }
        }
      }
    }

    To authenticate with Sync Gateway, an associated user must first be created. Sync Gateway users can be created through the POST /{tkn-db}/_user endpoint on the Admin REST API. Provided that the user exists on Sync Gateway, there are two ways to authenticate from a Couchbase Lite client: Basic Authentication or Session Authentication.

    Basic Authentication

    You can provide a user name and password to the basic authenticator class method. Under the hood, the replicator will send the credentials in the first request to retrieve a SyncGatewaySession cookie and use it for all subsequent requests during the replication. This is the recommended way of using basic authentication. The following example initiates a one-shot replication as the user username with the password password.

    URLEndpoint target = new URLEndpoint(new URI("ws://localhost:4984/mydatabase"));
    
    ReplicatorConfiguration config = new ReplicatorConfiguration(database, target);
    config.setAuthenticator(new BasicAuthenticator("username", "password"));
    
    // Create replicator (be sure to hold a reference somewhere that will prevent the Replicator from being GCed)
    replicator = new Replicator(config);
    replicator.start();

    Session Authentication

    Session authentication is another way to authenticate with Sync Gateway. A user session must first be created through the POST /{tkn-db}/_session endpoint on the Public REST API. The HTTP response contains a session ID which can then be used to authenticate as the user it was created for. The following example initiates a one-shot replication with the session ID that is returned from the POST /{tkn-db}/_session endpoint.

    URLEndpoint target = new URLEndpoint(new URI("ws://localhost:4984/mydatabase"));
    
    ReplicatorConfiguration config = new ReplicatorConfiguration(database, target);
    config.setAuthenticator(new SessionAuthenticator("904ac010862f37c8dd99015a33ab5a3565fd8447"));
    
    // Create replicator (be sure to hold a reference somewhere that will prevent the Replicator from being GCed)
    replicator = new Replicator(config);
    replicator.start();
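
    The session-creation request itself can be sketched with the JDK's java.net.http client (Java 11 or later). The helper name SessionRequests is hypothetical, and the JSON body shape (name and password fields) is an assumption about the Public REST API that should be checked against the Sync Gateway REST reference.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not part of Couchbase Lite.
final class SessionRequests {
    private SessionRequests() { }

    // Builds POST {baseUrl}/{db}/_session with a JSON credentials body.
    static HttpRequest createSession(String baseUrl, String db, String name, String password) {
        // Assumed body shape: {"name": ..., "password": ...}
        String body = "{\"name\":" + quote(name) + ",\"password\":" + quote(password) + "}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + db + "/_session"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }

    // Minimal JSON string quoting: escapes backslashes and double quotes only.
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}
```

    After sending the request with HttpClient, the session ID read from the response would be passed to SessionAuthenticator as in the example above.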

    Replication Status

    The replication.Status.Activity property (read with getStatus().getActivityLevel() in the Java API) reports the state of a replication, for example whether it is actively transferring data or has stopped.

    replicator.addChangeListener(change -> {
        if (change.getStatus().getActivityLevel() == Replicator.ActivityLevel.STOPPED) {
            Log.i(TAG, "Replication stopped");
        }
    });

    The following table lists the different activity levels in the API and the meaning of each one.

    State | Meaning
    STOPPED | The replication is finished or hit a fatal error.
    OFFLINE | The replicator is offline as the remote host is unreachable.
    CONNECTING | The replicator is connecting to the remote host.
    IDLE | The replication caught up with all the changes available from the server. The IDLE state is only used in continuous replications.
    BUSY | The replication is actively transferring data.
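
    One use for these levels is to drive a status label in the UI. The sketch below uses a local stand-in enum (ActivityMessages.Level) whose names mirror the table above, so that it compiles without the library; the label strings are illustrative assumptions. In an application the switch would be applied to the value returned by change.getStatus().getActivityLevel().

```java
// Hypothetical helper, not part of Couchbase Lite.
final class ActivityMessages {
    private ActivityMessages() { }

    // Stand-in for the library's activity level enum; names mirror the table.
    enum Level { STOPPED, OFFLINE, CONNECTING, IDLE, BUSY }

    // Maps an activity level to a short user-facing label.
    static String describe(Level level, boolean hasError) {
        switch (level) {
            case STOPPED:
                // STOPPED covers both normal completion and a fatal error
                return hasError ? "Sync failed" : "Sync finished";
            case OFFLINE:
                return "Waiting for network";
            case CONNECTING:
                return "Connecting";
            case IDLE:
                return "Up to date";
            case BUSY:
                return "Syncing";
            default:
                throw new IllegalArgumentException("Unknown level: " + level);
        }
    }
}
```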

    The replication change object also has properties to track the progress (change.status.completed and change.status.total). But since the replication occurs in batches and the total count can vary through the course of a replication, those progress indicators are not very useful from the standpoint of an app user. Hence, these should not be used for tracking the actual progress of the replication.

    Replication Status and App Lifecycle

    Couchbase Lite replications will continue running until the app terminates, unless the remote system, or the application, terminates the connection.

    Recall that the Android OS may kill an application without warning. You should explicitly stop replication processes when they are no longer useful (for example, when they are suspended or idle) to avoid socket connections being closed by the OS, which may interfere with the replication process.

    Handling Network Errors

    If an error occurs, the replication status will be updated with an Error which follows the standard HTTP error codes. The following example monitors the replication for errors and logs the error code to the console.

    replicator.addChangeListener(change -> {
        CouchbaseLiteException error = change.getStatus().getError();
        if (error != null) { Log.w(TAG, "Error code: " + error.getCode()); }
    });
    replicator.start();

    When a permanent error occurs (e.g., 404: not found, 401: unauthorized), the replicator (continuous or one-shot) will stop permanently. If the error is temporary (e.g., waiting for the network to recover), a continuous replication will retry the connection indefinitely, whereas a one-shot replication will retry a limited number of times. The following error codes are considered temporary by the Couchbase Lite replicator and thus will trigger a connection retry.

    • 408: Request Timeout

    • 429: Too Many Requests

    • 500: Internal Server Error

    • 502: Bad Gateway

    • 503: Service Unavailable

    • 504: Gateway Timeout

    • 1001: DNS resolution error
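
    The replicator applies this classification itself, but application code may want the same distinction, for example to decide between a "retrying" and a "failed" message. The following is a minimal sketch with a hypothetical RetryPolicy helper whose code set simply mirrors the list above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of Couchbase Lite.
final class RetryPolicy {
    private RetryPolicy() { }

    // Codes the list above describes as temporary.
    private static final Set<Integer> TRANSIENT = Collections.unmodifiableSet(
        new HashSet<>(Arrays.asList(408, 429, 500, 502, 503, 504, 1001)));

    // Returns true if the code is one the replicator is expected to retry.
    static boolean isTransient(int code) {
        return TRANSIENT.contains(code);
    }
}
```

    In a change listener, the argument would typically be the value of error.getCode().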

    Replication Events

    You can choose to register for document updates during a replication.

    For example, the code snippet below registers a listener to monitor document replication performed by the replicator referenced by the variable replicator. It prints the document ID of each document received and sent.

    ListenerToken token = replicator.addDocumentReplicationListener(replication -> {
    
        Log.i(TAG, "Replication type: " + ((replication.isPush()) ? "Push" : "Pull"));
        for (ReplicatedDocument document : replication.getDocuments()) {
            Log.i(TAG, "Doc ID: " + document.getID());
    
            CouchbaseLiteException err = document.getError();
            if (err != null) {
                // There was an error
                Log.e(TAG, "Error replicating document: ", err);
                return;
            }
    
            if (document.flags().contains(DocumentFlag.DocumentFlagsDeleted)) {
                Log.i(TAG, "Successfully replicated a deleted document");
            }
        }
    });
    
    replicator.start();

    The following example removes the document replication listener using the token from the previous example.

    replicator.removeChangeListener(token);

    Document Access Removal Behavior

    When access to a document is removed on Sync Gateway, the document replication listener receives a notification with the AccessRemoved flag set, and the document is subsequently purged from the local database.

    Custom Headers

    Custom headers can be set on the configuration object, and the replicator will send those headers in every request. For example, this feature can be used to pass additional credentials when an authentication or authorization step is performed by a proxy server between Couchbase Lite and Sync Gateway.

    ReplicatorConfiguration config = new ReplicatorConfiguration(database, endpoint);
    Map<String, String> headers = new HashMap<>();
    headers.put("CustomHeaderName", "Value");
    config.setHeaders(headers);

    Replication Checkpoint Reset

    Replicators use checkpoints to keep track of documents sent to the target database. Without checkpoints, Couchbase Lite would replicate the entire database content to the target database on each connection, even though previous replications may already have replicated some or all of that content.

    This functionality is generally not a concern to application developers. However, if you do want to force the replication to start again from zero, use the checkpoint reset method replicator.resetCheckpoint() before starting the replicator.

    replicator.resetCheckpoint();
    replicator.start();

    Replication Filters

    Replication Filters allow you to have quick control over which documents are stored as the result of a push and/or pull replication.

    Push Filter

    A push filter allows an app to push a subset of a database to the server, which can be very useful in some circumstances. For instance, high-priority documents could be pushed first, or documents in a "draft" state could be skipped.

    The following example filters out documents whose type property is equal to draft.

    URLEndpoint target = new URLEndpoint(new URI("ws://localhost:4984/mydatabase"));
    
    ReplicatorConfiguration config = new ReplicatorConfiguration(database, target);
    config.setPushFilter((document, flags) -> !"draft".equals(document.getString("type"))); // (1)
    
    // Create replicator (be sure to hold a reference somewhere that will prevent the Replicator from being GCed)
    replicator = new Replicator(config);
    replicator.start();
    1 The callback should follow the semantics of a pure function and return quickly; a long-running callback slows down the replicator considerably. Furthermore, your callback should not make assumptions about what thread it is being called on.
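
    Keeping the decision in a small pure function also makes it easy to test without a database. The sketch below is one way to do that, assuming the document's properties are available as a Map; DraftFilter and shouldReplicate are hypothetical names.

```java
import java.util.Map;

// Hypothetical helper, not part of Couchbase Lite.
final class DraftFilter {
    private DraftFilter() { }

    // Pure: the result depends only on the argument, and nothing else is read or written.
    // Returns false for documents whose "type" property is "draft".
    static boolean shouldReplicate(Map<String, Object> properties) {
        return !"draft".equals(properties.get("type"));
    }
}
```

    A filter callback could delegate to it, for instance with DraftFilter.shouldReplicate(document.toMap()), so the lambda stays short and thread-agnostic.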

    Pull Filter

    A pull filter gives an app the ability to validate documents being pulled, and skip ones that fail. This is an important security mechanism in a peer-to-peer topology with peers that are not fully trusted.

    Pull replication filters are not a substitute for channels. Sync Gateway channels are designed to be scalable (documents are filtered on the server) whereas a pull replication filter is applied to a document once it has been downloaded.

    The following example pulls only documents whose type property is equal to draft.

    URLEndpoint target = new URLEndpoint(new URI("ws://localhost:4984/mydatabase"));
    
    ReplicatorConfiguration config = new ReplicatorConfiguration(database, target);
    config.setPullFilter((document, flags) -> "draft".equals(document.getString("type"))); // (1)
    
    // Create replicator (be sure to hold a reference somewhere that will prevent the Replicator from being GCed)
    replicator = new Replicator(config);
    replicator.start();
    1 The callback should follow the semantics of a pure function and return quickly; a long-running callback slows down the replicator considerably. Furthermore, your callback should not make assumptions about what thread it is being called on.
    Losing access to a document (via the Sync Function) also triggers the pull replication filter. Filtering out such an event would retain the document locally. As a result, there would be a local copy of the document disconnected from the one that resides on Couchbase Server. Further updates to the document stored on Couchbase Server would not be received in pull replications, and further local edits could still be pushed, which would result in 409 errors since access has been revoked.
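
    One way to avoid retaining such orphaned copies is to let access-removal events through before applying any content check. The sketch below shows the idea with stand-in types so that it compiles without the library: PullFilterSketch and its Flag enum are hypothetical, and Flag.ACCESS_REMOVED is assumed to correspond to the access-removed document flag delivered to the real filter callback.

```java
import java.util.EnumSet;
import java.util.Map;

// Hypothetical helper, not part of Couchbase Lite.
final class PullFilterSketch {
    private PullFilterSketch() { }

    // Stand-in for the library's document flags.
    enum Flag { DELETED, ACCESS_REMOVED }

    // Returns true if the pulled revision should be applied locally.
    static boolean accept(Map<String, Object> properties, EnumSet<Flag> flags) {
        // Never filter out an access removal, so the local copy is not left behind
        if (flags.contains(Flag.ACCESS_REMOVED)) {
            return true;
        }
        // Otherwise apply the content rule: keep only drafts
        return "draft".equals(properties.get("type"));
    }
}
```

    The content rule is the same one used in the pull filter example above; only the early return for access removal is new.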