Replication


    Description: Couchbase Lite database replication and synchronization
    Related Content: Conflicts | Inter-Database Replication | Certificate Pinning

    Couchbase Mobile uses a replication protocol based on WebSockets.

    The replicator is designed to send documents from a source to a target database. The target can be one of the following:

    URLEndpoint

    To replicate data between a local Couchbase Lite database and remote Sync Gateway database or passive peer listener.

    DatabaseEndpoint

    To replicate data between two local Couchbase Lite databases to store data on secondary storage.

    MessageEndpoint

    To replicate with another Couchbase Lite database via a custom transport protocol such as iOS Multipeer Connectivity, Android WiFi Direct, Android NearByConnection, or socket-based transport.

    Compatibility

    The new protocol is incompatible with CouchDB-based databases. Because Couchbase Lite 2 supports only the new protocol, you will need to run a version of Sync Gateway that supports it (see the Sync Gateway Compatibility Matrix).

    To use this protocol with Couchbase Lite 2.0, the replication URL should specify a WebSocket URL scheme, ws: or wss: (see the Starting a Replication section below). Mobile clients using Couchbase Lite 1.x can continue to use http as the URL scheme. Sync Gateway 2.0 will automatically use the 1.x replication protocol when a Couchbase Lite 1.x client connects through http://localhost:4984/db and the 2.0 replication protocol when a Couchbase Lite 2.0 client connects through ws://localhost:4984/db.
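
    The scheme rule above can be captured in a small helper. This is only a sketch: the class and method names (ReplicationUrls, forSyncGateway) are hypothetical and not part of Couchbase Lite, and the code relies solely on java.net.URI.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper (not part of Couchbase Lite) that builds a 2.0-style replication URL. */
final class ReplicationUrls {
    private ReplicationUrls() { }

    /**
     * Builds ws://host:port/dbName, or wss://host:port/dbName when tls is true.
     * Couchbase Lite 2.0 expects a WebSocket scheme rather than http or https.
     */
    static URI forSyncGateway(String host, int port, String dbName, boolean tls) {
        String scheme = tls ? "wss" : "ws";
        try {
            // The multi-argument constructor quotes illegal characters in the path.
            return new URI(scheme, null, host, port, "/" + dbName, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid replication URL parts", e);
        }
    }
}
```

    For example, ReplicationUrls.forSyncGateway("localhost", 4984, "db", false) yields ws://localhost:4984/db, which can then be passed to new URLEndpoint(...).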

    Starting Sync Gateway

    Download Sync Gateway and start it from the command line with the configuration file created above.

    ~/Downloads/couchbase-sync-gateway/bin/sync_gateway

    For platform specific installation instructions, refer to the Sync Gateway installation guide.

    Starting a Replication

    Replication can be bidirectional, which means you can start a push/pull replication with a single replicator instance. The replication's parameters are specified through the ReplicatorConfiguration object; for example, you can use it to start a push-only or pull-only replication.

    Example 1. Pull replication

    This example creates a pull replication with Sync Gateway.

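    import com.couchbase.lite.Database;
    import com.couchbase.lite.Endpoint;
    import com.couchbase.lite.Replicator;
    import com.couchbase.lite.ReplicatorConfiguration;
    import com.couchbase.lite.URLEndpoint;
    
    import java.net.URI;
    import java.net.URISyntaxException;
    
    class MyClass {
        Database database;
        Replicator replicator; // (1)
    
        void startReplication() {
            URI uri;
            try {
                uri = new URI("wss://10.0.2.2:4984/db"); // (2)
            } catch (URISyntaxException e) {
                // Without a valid URL there is nothing to replicate with.
                e.printStackTrace();
                return;
            }
            Endpoint endpoint = new URLEndpoint(uri);
            ReplicatorConfiguration config = new ReplicatorConfiguration(database, endpoint);
            // Pull only: fetch changes from Sync Gateway without pushing local changes.
            config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PULL);
            this.replicator = new Replicator(config);
            this.replicator.start();
        }
    }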
    1 A replication is an asynchronous operation. To keep a reference to the replicator object, you can set it as an instance property.
    2 The URL scheme for remote database URLs has changed in Couchbase Lite 2.0. You should now use ws:, or wss: for SSL/TLS connections. In this example the hostname is 10.0.2.2.

    Verify the Replication

    To verify that documents have been replicated, you can:

    • Monitor the Sync Gateway sequence number returned by the database endpoint (GET /{tkn-db}/). The sequence number increments for every change that happens on the Sync Gateway database.

    • Query a document by ID on the Sync Gateway REST API (GET /{tkn-db}/{id}).

    • Query a document from the Query Workbench on the Couchbase Server Console.
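
    The first check in the list above can be automated by comparing the sequence number before and after a replication. The sketch below only parses response bodies that you have already fetched from GET /{tkn-db}/; it assumes the sequence number is reported in a numeric field named update_seq, and the class and method names (SequenceProbe, parseUpdateSeq, hasAdvanced) are hypothetical.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that reads the sequence number from a database endpoint response. */
final class SequenceProbe {
    // Assumption: the response body carries the sequence in a numeric "update_seq" field.
    private static final Pattern UPDATE_SEQ = Pattern.compile("\"update_seq\"\\s*:\\s*(\\d+)");

    private SequenceProbe() { }

    /** Returns the sequence number, or empty if the field is missing. */
    static OptionalLong parseUpdateSeq(String responseBody) {
        Matcher m = UPDATE_SEQ.matcher(responseBody);
        return m.find() ? OptionalLong.of(Long.parseLong(m.group(1))) : OptionalLong.empty();
    }

    /** True when both bodies carry a sequence number and the second one is higher. */
    static boolean hasAdvanced(String bodyBefore, String bodyAfter) {
        OptionalLong before = parseUpdateSeq(bodyBefore);
        OptionalLong after = parseUpdateSeq(bodyAfter);
        return before.isPresent() && after.isPresent() && after.getAsLong() > before.getAsLong();
    }
}
```

    A regular expression keeps the sketch dependency-free; in an application that already uses a JSON library, parsing the body properly would be the more robust choice.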

    Couchbase Lite 2.0 uses WebSockets as the communication protocol to transmit data. Some load balancers (NGINX, for example) are not configured for WebSocket connections by default, so it might be necessary to explicitly enable them in the load balancer's configuration (see Load Balancers).

    By default, the WebSocket protocol uses compression to optimize for speed and bandwidth utilization. The level of compression is set on Sync Gateway and can be tuned in the configuration file (replicator_compression).

    Replication Ordering

    To optimize for speed, the replication protocol doesn't guarantee that documents will be received in a particular order. Do not rely on receiving replicated data in a specific sequence, for example when using the replication or database change listeners.

    Replicator Notifications on a Custom Executor

    Prior to version 2.6, Couchbase Lite spun up multiple executors. This policy could result in too many threads being spun up.

    An executor manages a pool of threads and, optionally, a queue in front of them, to handle asynchronous callbacks. The Couchbase Lite API calls that are processed by an executor are listed below.

    Query.addChangeListener
    MessageEndpointListener.addChangeListener
    LiveQuery.addChangeListener
    AbstractReplicator.addDocumentReplicationListener
    AbstractReplicator.addChangeListener
    Database.addChangeListener
    Database.addDocumentChangeListener
    Database.addDatabaseChangeListener

    As of version 2.6, Couchbase Lite sometimes uses its own internal executor to run asynchronous client code. While this is fine for small tasks, larger tasks (those that take significant compute time or perform I/O) can block Couchbase Lite processing. If this happens, your application will fail with a RejectedExecutionException, and it may be necessary to create a separate executor on which to run the large tasks.

    The following examples show how to specify a separate executor in the client code. The client code executor can enforce an application policy for delivery ordering and the number of threads.

    Guaranteed Order Delivery

    /**
     * This version guarantees in-order delivery and is parsimonious with space.
     * The listener does not need to be thread safe (at least as far as this code is concerned).
     * It will run on only one thread (the Executor's thread) and must return from a given call
     * before the next call commences.  Events may be delivered arbitrarily late, though,
     * depending on how long it takes the listener to run.
     */
    public class InOrderExample {
        private static final ExecutorService IN_ORDER_EXEC = Executors.newSingleThreadExecutor();
    
        public Replicator runReplicator(Database db1, Database db2, ReplicatorChangeListener listener)
            throws CouchbaseLiteException {
            ReplicatorConfiguration config = new ReplicatorConfiguration(db1, new DatabaseEndpoint(db2));
            config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
            config.setContinuous(false);
    
            Replicator repl = new Replicator(config);
            ListenerToken token = repl.addChangeListener(IN_ORDER_EXEC, listener::changed);
    
            repl.start();
    
            return repl;
        }
    }
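
    The ordering guarantee in the example above comes from the executor itself rather than from Couchbase Lite. The following standalone sketch uses only java.util.concurrent to show that a single-thread executor runs tasks in submission order; the class name SingleThreadOrderDemo and its deliver method are hypothetical, and the integers merely stand in for change notifications.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: a single-thread executor handles tasks one at a time, in order. */
final class SingleThreadOrderDemo {
    private SingleThreadOrderDemo() { }

    /** Submits count "events" and returns them in the order the listener saw them. */
    static List<Integer> deliver(int count) {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        // Only the executor thread touches this list until the executor has terminated,
        // so the "listener" below does not need to be thread safe.
        List<Integer> received = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            final int event = i;
            exec.execute(() -> received.add(event));
        }
        exec.shutdown();
        try {
            if (!exec.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listener tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }
}
```

    The trade-off is the one noted in the comment of the example above: a slow listener delays every notification queued behind it.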

    Maximum Throughput

    /**
     * This version maximizes throughput.  It will deliver change notifications as quickly
     * as CPU availability allows. It may deliver change notifications out of order.
     * Listeners must be thread safe because they may be called from multiple threads.
     * In fact, they must be re-entrant because a given listener may be running on multiple threads
     * simultaneously.  In addition, when notifications swamp the processors, notifications awaiting
     * a processor will be queued as Threads, (instead of as Runnables) with accompanying memory
     * and GC impact.
     */
    public class MaxThroughputExample {
        private static final ExecutorService MAX_THROUGHPUT_EXEC = Executors.newCachedThreadPool();
    
        public Replicator runReplicator(Database db1, Database db2, ReplicatorChangeListener listener)
            throws CouchbaseLiteException {
            ReplicatorConfiguration config = new ReplicatorConfiguration(db1, new DatabaseEndpoint(db2));
            config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
            config.setContinuous(false);
    
            Replicator repl = new Replicator(config);
            ListenerToken token = repl.addChangeListener(MAX_THROUGHPUT_EXEC, listener::changed);
    
            repl.start();
    
            return repl;
        }
    }

    Extreme Configurability

    /**
     * This version demonstrates the extreme configurability of the Couchbase Lite replicator callback system.
     * It may deliver updates out of order and does require thread-safe and re-entrant listeners
     * (though it does correctly synchronize tasks passed to it using a SynchronousQueue).
     * The thread pool executor shown here is configured for the sweet spot for number of threads per CPU.
     * In a real system, this single executor might be used by the entire application and be passed to
     * this module, thus establishing a reasonable app-wide threading policy.
     * In an emergency (Rejected Execution) it lazily creates a backup executor with an unbounded queue
     * in front of it.  It, thus, may deliver notifications late, as well as out of order.
     */
    public class PolicyExample {
        private static final int CPUS = Runtime.getRuntime().availableProcessors();
    
        private static ThreadPoolExecutor BACKUP_EXEC;
    
        private static final RejectedExecutionHandler BACKUP_EXECUTION
            = new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                synchronized (this) {
                    if (BACKUP_EXEC == null) { BACKUP_EXEC = createBackupExecutor(); }
                }
                BACKUP_EXEC.execute(r);
            }
        };
    
        private static ThreadPoolExecutor createBackupExecutor() {
            ThreadPoolExecutor exec
                = new ThreadPoolExecutor(CPUS + 1, 2 * CPUS + 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
            exec.allowCoreThreadTimeOut(true);
            return exec;
        }
    
        private static final ThreadPoolExecutor STANDARD_EXEC
            = new ThreadPoolExecutor(CPUS + 1, 2 * CPUS + 1, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    
        static { STANDARD_EXEC.setRejectedExecutionHandler(BACKUP_EXECUTION); }
    
        public Replicator runReplicator(Database db1, Database db2, ReplicatorChangeListener listener)
            throws CouchbaseLiteException {
            ReplicatorConfiguration config = new ReplicatorConfiguration(db1, new DatabaseEndpoint(db2));
            config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
            config.setContinuous(false);
    
            Replicator repl = new Replicator(config);
            ListenerToken token = repl.addChangeListener(STANDARD_EXEC, listener::changed);
    
            repl.start();
    
            return repl;
        }
    }
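
    The rejection fallback used in the example above can be observed without Couchbase Lite. The sketch below is a deliberately tiny, hypothetical setup (class RejectionFallbackDemo, one primary thread, a SynchronousQueue) that forces rejections and routes the rejected tasks to a backup executor with an unbounded queue. The pool sizes are chosen to make the behavior deterministic, not as a recommendation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo of a RejectedExecutionHandler that falls back to a second executor. */
final class RejectionFallbackDemo {
    private RejectionFallbackDemo() { }

    /** Returns {tasks completed, tasks rejected by the primary executor}. */
    static int[] run(int extraTasks) {
        ExecutorService backup = Executors.newSingleThreadExecutor(); // unbounded queue
        AtomicInteger rejected = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();

        // One thread and no queue capacity: any task submitted while the thread is busy is rejected.
        ThreadPoolExecutor primary = new ThreadPoolExecutor(
            1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
            (task, pool) -> { rejected.incrementAndGet(); backup.execute(task); });

        CountDownLatch release = new CountDownLatch(1);
        primary.execute(() -> { // occupies the only primary thread until released
            try { release.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            completed.incrementAndGet();
        });
        for (int i = 0; i < extraTasks; i++) {
            primary.execute(completed::incrementAndGet); // rejected, then run by the backup
        }
        release.countDown();
        primary.shutdown();
        backup.shutdown();
        try {
            if (!primary.awaitTermination(5, TimeUnit.SECONDS) || !backup.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executors did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {completed.get(), rejected.get()};
    }
}
```

    With run(3), all four tasks complete and three of them are counted as rejected by the primary executor, which illustrates why notifications handled this way may arrive late and out of order.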

    Troubleshooting

    As always, when there is a problem with replication, logging is your friend. The following example increases the log output for activity related to replication with Sync Gateway.

    Database.setLogLevel(LogDomain.REPLICATOR, LogLevel.VERBOSE);

    Authentication

    By default, Sync Gateway does not enable authentication. This is to make it easier to get up and running with synchronization. You can enable authentication with the following properties in the configuration file:

    {
      "databases": {
        "mydatabase": {
          "users": {
            "GUEST": {"disabled": true}
          }
        }
      }
    }

    To authenticate with Sync Gateway, an associated user must first be created. Sync Gateway users can be created through the POST /{tkn-db}/_user endpoint on the Admin REST API. Provided that the user exists on Sync Gateway, there are two ways to authenticate from a Couchbase Lite client: Basic Authentication or Session Authentication.

    Basic Authentication

    You can provide a user name and password to the basic authenticator class method. Under the hood, the replicator will send the credentials in the first request to retrieve a SyncGatewaySession cookie and use it for all subsequent requests during the replication. This is the recommended way of using basic authentication. The following example initiates a one-shot replication as the user username with the password password.

    URLEndpoint target = new URLEndpoint(new URI("ws://localhost:4984/mydatabase"));
    
    ReplicatorConfiguration config = new ReplicatorConfiguration(database, target);
    config.setAuthenticator(new BasicAuthenticator("username", "password"));
    
    // Create replicator (be sure to hold a reference somewhere that will prevent the Replicator from being GCed)
    replicator = new Replicator(config);
    replicator.start();

    Session Authentication

    Session authentication is another way to authenticate with Sync Gateway. A user session must first be created through the POST /{tkn-db}/_session endpoint on the Public REST API. The HTTP response contains a session ID which can then be used to authenticate as the user it was created for. The following example initiates a one-shot replication with the session ID that is returned from the POST /{tkn-db}/_session endpoint.

    URLEndpoint target = new URLEndpoint(new URI("ws://localhost:4984/mydatabase"));
    
    ReplicatorConfiguration config = new ReplicatorConfiguration(database, target);
    config.setAuthenticator(new SessionAuthenticator("904ac010862f37c8dd99015a33ab5a3565fd8447"));
    
    // Create replicator (be sure to hold a reference somewhere that will prevent the Replicator from being GCed)
    replicator = new Replicator(config);
    replicator.start();

    Replication Status

    The replication.Status.Activity property can be used to check the status of a replication, for example whether the replication is actively transferring data or has stopped.

    replicator.addChangeListener(change -> {
        if (change.getStatus().getActivityLevel() == Replicator.ActivityLevel.STOPPED) {
            Log.i(TAG, "Replication stopped");
        }
    });

    The following table lists the different activity levels in the API and the meaning of each one.

    State        Meaning
    STOPPED      The replication is finished or hit a fatal error.
    OFFLINE      The replicator is offline as the remote host is unreachable.
    CONNECTING   The replicator is connecting to the remote host.
    IDLE         The replication caught up with all the changes available from the server. The IDLE state is only used in continuous replications.
    BUSY         The replication is actively transferring data.

    The replication change object also has properties to track the progress (change.status.completed and change.status.total). But since the replication occurs in batches and the total count can vary through the course of a replication, those progress indicators are not very useful from the standpoint of an app user. Hence, these should not be used for tracking the actual progress of the replication.

    Handling Network Errors

    When the replicator detects a network error, it updates its status depending on the error type (permanent or temporary) and returns an appropriate HTTP error code.

    The following code snippet adds a change listener, which monitors a replication for errors and logs the returned error code.

    Monitoring for network errors
    replicator.addChangeListener(change -> {
        CouchbaseLiteException error = change.getStatus().getError();
        if (error != null) { Log.w(TAG, "Error code: " + error.getCode(), error); }
    });
    replicator.start();

    For permanent network errors (for example, 404 Not Found or 401 Unauthorized), the replicator stops permanently, whether setContinuous is true or false, and sets its status to STOPPED.

    For recoverable or temporary errors: Replicator sets its status to OFFLINE, then:

    • If setContinuous=true it retries the connection indefinitely

    • If setContinuous=false (one-shot) it retries the connection a limited number of times.

    The following error codes are considered temporary by the Couchbase Lite replicator and thus will trigger a connection retry.

    • 408: Request Timeout

    • 429: Too Many Requests

    • 500: Internal Server Error

    • 502: Bad Gateway

    • 503: Service Unavailable

    • 504: Gateway Timeout

    • 1001: DNS resolution error
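
    The classification above can be mirrored in application code, for instance to decide whether to show a "retrying" or a "failed" message. The sketch below is not the replicator's implementation: the class RetryPolicy and its methods are hypothetical, and treating every code outside the documented list as permanent is an assumption made for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical mirror of the documented list of temporary error codes. */
final class RetryPolicy {
    // Codes the documentation lists as temporary (they trigger a connection retry).
    private static final Set<Integer> TEMPORARY = Collections.unmodifiableSet(
        new HashSet<>(Arrays.asList(408, 429, 500, 502, 503, 504, 1001)));

    private RetryPolicy() { }

    static boolean isTemporary(int code) {
        return TEMPORARY.contains(code);
    }

    /**
     * Activity level to expect after an error with this code.
     * Assumption: anything not listed as temporary is treated as permanent.
     */
    static String expectedActivityLevel(int code) {
        return isTemporary(code) ? "OFFLINE" : "STOPPED";
    }
}
```

    For example, RetryPolicy.expectedActivityLevel(503) returns "OFFLINE", while a 401 maps to "STOPPED".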

    Replication Events

    You can choose to register for document updates during a replication.

    For example, the code snippet below registers a listener to monitor document replication performed by the replicator referenced by the variable replicator. It prints the document ID of each document received and sent.

    ListenerToken token = replicator.addDocumentReplicationListener(replication -> {
    
        Log.i(TAG, "Replication type: " + ((replication.isPush()) ? "Push" : "Pull"));
        for (ReplicatedDocument document : replication.getDocuments()) {
            Log.i(TAG, "Doc ID: " + document.getID());
    
            CouchbaseLiteException err = document.getError();
            if (err != null) {
                // There was an error
                Log.e(TAG, "Error replicating document: ", err);
                return;
            }
    
            if (document.flags().contains(DocumentFlag.DocumentFlagsDeleted)) {
                Log.i(TAG, "Successfully replicated a deleted document");
            }
        }
    });
    
    replicator.start();

    The following example removes the listener using the token from the previous example.

    replicator.removeChangeListener(token);

    Document Access Removal Behavior

    When access to a document is removed on Sync Gateway, the document replication listener sends a notification with the AccessRemoved flag set to true and subsequently purges the document from the database.

    Custom Headers

    Custom headers can be set on the configuration object, and the replicator will send those headers in every request. For example, this feature can be used to pass additional credentials when a proxy server between Couchbase Lite and Sync Gateway performs an authentication or authorization step.

    ReplicatorConfiguration config = new ReplicatorConfiguration(database, endpoint);
    Map<String, String> headers = new HashMap<>();
    headers.put("CustomHeaderName", "Value");
    config.setHeaders(headers);

    Replication Checkpoint Reset

    Replicators use checkpoints to keep track of documents sent to the target database. Without checkpoints, Couchbase Lite would replicate the entire database content to the target database on each connection, even though previous replications may already have replicated some or all of that content.

    This functionality is generally not a concern to application developers. However, if you do want to force the replication to start again from zero, use the checkpoint reset method replicator.resetCheckpoint() before starting the replicator.

    replicator.resetCheckpoint();
    replicator.start();
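
    To make the role of a checkpoint concrete, here is a toy model in plain Java. It is a conceptual illustration only and does not reflect how Couchbase Lite stores or exchanges checkpoints; the class CheckpointModel and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Conceptual model of checkpoint-based replication; not Couchbase Lite's implementation. */
final class CheckpointModel {
    // Local change feed: the change at index i has sequence number i + 1.
    private final List<String> changes = new ArrayList<>();
    // Highest sequence number already sent to the target.
    private int checkpoint = 0;

    void addChange(String docId) {
        changes.add(docId);
    }

    /** Sends only the changes recorded after the checkpoint, then advances the checkpoint. */
    List<String> replicate() {
        List<String> sent = new ArrayList<>(changes.subList(checkpoint, changes.size()));
        checkpoint = changes.size();
        return sent;
    }

    /** Forgets the checkpoint so that the next replication starts again from zero. */
    void resetCheckpoint() {
        checkpoint = 0;
    }
}
```

    In this model, a second call to replicate() sends only what changed since the first call, while calling resetCheckpoint() first causes the whole change list to be sent again.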

    Replication Filters

    Replication Filters allow you to have quick control over which documents are stored as the result of a push and/or pull replication.

    Push Filter

    A push filter allows an app to push a subset of a database to the server, which can be very useful in some circumstances. For instance, high-priority documents could be pushed first, or documents in a "draft" state could be skipped.

    The following example filters out documents whose type property is equal to draft.

    URLEndpoint target = new URLEndpoint(new URI("ws://localhost:4984/mydatabase"));
    
    ReplicatorConfiguration config = new ReplicatorConfiguration(database, target);
    // Return true to push a document; documents whose type is "draft" are skipped.
    config.setPushFilter((document, flags) -> !"draft".equals(document.getString("type"))); // (1)
    
    // Create replicator (be sure to hold a reference somewhere that will prevent the Replicator from being GCed)
    replicator = new Replicator(config);
    replicator.start();
    1 The callback should follow the semantics of a pure function. Otherwise, long running functions would slow down the replicator considerably. Furthermore, your callback should not make assumptions about what thread it is being called on.

    Pull Filter

    A pull filter gives an app the ability to validate documents being pulled, and skip ones that fail. This is an important security mechanism in a peer-to-peer topology with peers that are not fully trusted.

    Pull replication filters are not a substitute for channels. Sync Gateway channels are designed to be scalable (documents are filtered on the server) whereas a pull replication filter is applied to a document once it has been downloaded.

    URLEndpoint target = new URLEndpoint(new URI("ws://localhost:4984/mydatabase"));
    
    ReplicatorConfiguration config = new ReplicatorConfiguration(database, target);
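    // Return true to accept a pulled document; this example accepts only documents whose type is "draft".
    config.setPullFilter((document, flags) -> "draft".equals(document.getString("type"))); // (1)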
    
    // Create replicator (be sure to hold a reference somewhere that will prevent the Replicator from being GCed)
    replicator = new Replicator(config);
    replicator.start();
    1 The callback should follow the semantics of a pure function. Otherwise, long running functions would slow down the replicator considerably. Furthermore, your callback should not make assumptions about what thread it is being called on.

    Losing access to a document (via the Sync Function) also triggers the pull replication filter. Filtering out such an event would retain the document locally. As a result, there would be a local copy of the document disconnected from the one that resides on Couchbase Server. Further updates to the document stored on Couchbase Server would not be received in pull replications, and further local edits could still be pushed, which would result in 409 errors since access has been revoked.
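
    One way to avoid retaining such orphaned copies is to let access-removal events through before applying any validation. The sketch below models that decision with stand-in types so that it runs without Couchbase Lite: the enum Flag, the class PullFilterModel, and the validation rule (a string "type" property) are all hypothetical. In a real filter the same check would be made against the flags argument passed to the pull filter callback.

```java
import java.util.EnumSet;
import java.util.Map;

/** Stand-in model of a pull filter decision; the types here are not Couchbase Lite classes. */
final class PullFilterModel {
    /** Stand-in for the replicator's document flags. */
    enum Flag { DELETED, ACCESS_REMOVED }

    private PullFilterModel() { }

    /** Returns true to accept the incoming revision, false to skip it. */
    static boolean accept(Map<String, Object> document, EnumSet<Flag> flags) {
        // Always accept access removals so the local copy is purged rather than retained.
        if (flags.contains(Flag.ACCESS_REMOVED)) {
            return true;
        }
        // Hypothetical validation rule: accept only documents with a string "type" property.
        return document.get("type") instanceof String;
    }
}
```

    The function is pure and fast, in line with the callout above: it inspects only its arguments and does no I/O.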