Description — Couchbase Lite’s Peer-to-Peer Synchronization enables edge devices to synchronize securely without consuming centralized cloud-server resources
Abstract — How to set up a Replicator to connect with a Listener and replicate changes using peer-to-peer sync
Related Content — API Reference | Passive Peer | Active Peer
Enterprise Edition only
This an Enterprise Edition feature.
Purchase the Enterprise License, which includes official Couchbase Support, to use it in production (see the license and support https://www.couchbase.com/licensing-and-support-faq).
|
Code Snippets
The code examples are indicative only.
They demonstrate basic concepts and approaches to using a feature.
Use them as inspiration and adapt these examples to best practice when developing applications for your platform.
|
Introduction
This content provides sample code and configuration examples covering the implementation of Peer-to-Peer Sync over websockets. Specifically it covers the implementation of an Active Peer.
This active peer (also referred to as a client and-or a replicator) will initiate connection with a Passive Peer (also referred to as a server and-or listener) and participate in the replication of database changes to bring both databases into sync.
Subsequent sections provide additional details and examples for the main configuration options.
Secure Storage
The use of TLS, its associated keys and certificates requires using secure storage to minimize the chances of a security breach.
The implementation of this storage differs from platform to platform — see Using secure storage.
|
Configuration Summary
You should configure and initialize a replicator for each Couchbase Lite database instance you want to sync. Example 1 shows the initialization and configuration process.
let validUser = "syncuser"
let validPassword = "sync9455"
let cert:SecCertificate?
let passivePeerEndpoint = "10.1.1.12:8920"
let passivePeerPort = "8920"
let passiveDbName = "userdb"
var actDb:Database?
var thisReplicator:Replicator?
var replicatorListener:ListenerToken?
let tgtUrl = URL(string: "wss://10.1.1.12:8092/actDb")!
let targetEndpoint = URLEndpoint(url: tgtUrl)
var config = ReplicatorConfiguration(database: actDb!, target: targetEndpoint) (1)
config.replicatorType = .pushAndPull
// Configure Sync Mode
config.continuous = true
// Configure Server Security -- only accept self-signed certs
config.acceptOnlySelfSignedServerCertificate = true; (2)
// Configure Client Security (3)
// Set Authentication Mode
let thisAuthenticator = BasicAuthenticator(username: "Our Username", password: "Our Password")
config.authenticator = thisAuthenticator
/* Optionally set custom conflict resolver call back */
config.conflictResolver = ( /* define resolver function */); (4)
// Apply configuration settings to the replicator
thisReplicator = Replicator.init( config: config) (5)
// Optionally add a change listener
// Retain token for use in deletion
let pushPullReplListener:ListenerToken? = thisReplicator?.addChangeListener({ (change) in (6)
if change.status.activity == .stopped {
print("Replication stopped")
}
else {
print("Replicator is currently ", thisReplicator?.status.activity)
}
})
// Run the replicator using the config settings
thisReplicator?.start() (7)
Notes on Example
1 | Use the ReplicatorConfiguration class’s constructor — init(database:, target:) — to initialize the replicator configuration with the local database — see also: [configure-target] |
2 | Configure how the client will authenticate the server. Here we say connect only to servers presenting a self-signed certificate. By default, clients accept only servers presenting certificates that can be verified using the OS bundled Root CA Certificates — see: [authenticating-the-listener]. |
3 | Configure the credentials the client will present to the server. Here we say to provide Basic Authentication credentials. Other options are available — see: [client-authentication]. |
4 | Configure how the replication should perform Conflict Resolution. |
5 | Initialize the replicator using your configuration object. |
6 | Register an observer, which will notify you of changes to the replication status. |
7 | Start the replicator. |
API References
You can find Swift API References here.
Device Discovery
This phase is optional: If the listener is initialized on a well known URL endpoint (for example, a static IP Address or well known DNS address) then you can configure active peers to connect to those.
Prior to connecting with a listener you may execute a peer discovery phase to dynamically discover peers.
For the active peer this involves browsing-for and selecting the appropriate service using a zero-config protocol such as Bonjour-- see: https://developer.apple.com/bonjour/.
Configure Replicator
- In this section
-
Configure Target | Sync Mode | Heartbeat | Authenticating the Listener | Client Authentication
Configure Target
Use the ReplicatorConfiguration class and init(database:, target:) constructor to initialize the replication configuration with local and remote database locations.
The constructor provides:
-
the name of the local database to be sync’d
-
the server’s URL (including the port number and the name of the remote database to sync with)
It is expected that the app will identify the IP address and URL and append the remote database name to the URL endpoint, producing for example:
wss://10.0.2.2:4984/travel-sample
The URL scheme for web socket URLs uses
ws:
(non-TLS) orwss:
(SSL/TLS) prefixes.
let tgtUrl = URL(string: "wss://10.1.1.12:8092/travel-sample")!
let targetEndpoint = URLEndpoint(url: tgtUrl)
var config = ReplicatorConfiguration(database: actDb!, target: targetEndpoint) (1)
Notes on Example
1 | Note use of the wss:// prefix to ensure TLS encryption (strongly recommended in production) |
Sync Mode
Here we define the direction and type of replication we want to initiate.
We use ReplicatorConfiguration
class’s replicatorType and
continuous
parameters, to tell the replicator:
-
The direction of the replication:
pushAndPull
;pull
;push
-
The type of replication, that is:
-
Continuous — remaining active indefinitely to replicate changed documents (
continuous=true
). -
Ad-hoc — a one-shot replication of changed documents (
continuous=false
).
-
config.replicatorType = .pushAndPull
// Configure Sync Mode
config.continuous = true
Heartbeat
A point to consider when initiating a replication, particularly a continuous replication, is keeping the connection alive. Couchbase Lite minimizes the chance of dropped connections by having the replicator maintain a heartbeat; essentially pinging the listener at a configurable interval.
When necessary you can adjust this interval using heartbeat() as shown in — Example 4.
The default heartbeat value is 300 (5 minutes).
let target =
URLEndpoint(url: URL(string: "ws://foo.couchbase.com/db")!)
let config = ReplicatorConfiguration(database: database, target: targetDatabase)
config.type = .pushAndPull
config.continuous = true
config.heartbeat = 60 (1)
repl = Replicator(config: config)
1 | The heartbeat value sets the interval (in seconds) between the heartbeat pulses. |
Authenticating the Listener
Define the credentials the your app (the client) is expecting to receive from the server (listener) in order to ensure that the server is one it is prepared to interact with.
Note that the client cannot authenticate the server if TLS is turned off. When TLS is enabled (Sync Gateway’s default) the client must authenticate the server. If the server cannot provide acceptable credentials then the connection will fail.
Use ReplicatorConfiguration
properties acceptOnlySelfSignedServerCertificate and setPinnedServerCertificate(), to tell the replicator how to verify server-supplied TLS server certificates.
-
If there is a pinned certificate, nothing else matters, the server cert must exactly match the pinned certificate.
-
If there are no pinned certs and acceptOnlySelfSignedServerCertificate is
true
then any self-signed certificate is accepted. Certificates that are not self signed are rejected, no matter who signed them. -
If there are no pinned certificates and acceptOnlySelfSignedServerCertificate is
false
(default), the client validates the server’s certificates against the system CA certificates. The server must supply a chain of certificates whose root is signed by one of the certificates in the system CA bundle.
-
CA Cert
-
Self Signed Cert
-
Pinned Certificate
Set the client to expect and accept only CA attested certificates.
// Configure Server Security -- only accept CA Certs
config.acceptOnlySelfSignedServerCertificate = false (1)
Notes on Example
1 | This is the default. Only certificate chains with roots signed by a trusted CA are allowed. Self signed certificates are not allowed. |
Set the client to expect and accept only self-signed certificates
// Configure Server Security -- only accept self-signed certs
config.acceptOnlySelfSignedServerCertificate = true; (1)
// Configure Server Security -- only accept self-signed certs
config.acceptOnlySelfSignedServerCertificate = true; (2)
Notes on Example
1 | Set this to true to accept any self signed cert.
Any certificates that are not self-signed are rejected. |
Set the client to expect and accept only a pinned certificate.
// Get bundled resource and read into localcert
guard let pathToCert = Bundle.main.path(forResource: "listener-pinned-cert", ofType: "cer")
else { /* process error */ }
guard let localCertificate:NSData =
NSData(contentsOfFile: pathToCert)
else { /* process error */ }
// Create certificate
// using its DER representation as a CFData
guard let pinnedCert = SecCertificateCreateWithData(nil, localCertificate)
else { /* process error */ }
// Add `pinnedCert` and `acceptOnlySelfSignedServerCertificate=false` to `ReplicatorConfiguration`
config.acceptOnlySelfSignedServerCertificate = false
config.pinnedServerCertificate = pinnedCert
Client Authentication
Here we define the credentials that the client can present to the server if prompted to do so in order that the server can authenticate it.
We use ReplicatorConfiguration's authenticator method to define the authentication method to the replicator - see Example 7.
Basic Authentication
Use the BasicAuthenticator
to supply basic authentication credentials (username and password).
This example shows basic authentication using user name and password:
// Set Authentication Mode
let thisAuthenticator = BasicAuthenticator(username: "Our Username", password: "Our Password")
config.authenticator = thisAuthenticator
// Set Authentication Mode
let thisAuthenticator = BasicAuthenticator(username: validUser, password: validPassword)
config.authenticator = thisAuthenticator
Certificate Authentication
Use the ClientCertificateAuthenticator
to configure the client TLS certificates to be presented to the server, on connection.
This applies only to the URLEndpointListener.
The server (listener) must have disableTLS set false and have a ClientCertificateAuthenticator configured, or it will never ask for this client’s certificate.
|
The certificate to be presented to the server will need to be signed by the root certificates or be valid based on the authentication callback set to the listener via ListenerCertificateAuthenticator.
This example shows client certificate authentication using an identity from secure storage.
// Check if Id exists in keychain and if so, use that Id
if let thisIdentity =
(try? TLSIdentity.identity(withLabel: "doco-sync-server")) ?? nil { (1)
print("An identity with label : doco-sync-server already exists in keychain")
thisAuthenticator = ClientCertificateAuthenticator(identity: thisIdentity) (2)
config.authenticator = thisAuthenticator
}
Notes on Example
1 | Get an identity from secure storage and create a TLS Identity object |
2 | Set the authenticator to ClientCertificateAuthenticator and configure it to use the retrieved identity |
Initialize Replicator
Use the Replicator
class’s init(config:) constructor, to initialize the replicator with the configuration you have defined.
You can, optionally, add a change listener (see Monitor Sync) before starting the replicator running using start().
// Apply configuration settings to the replicator
thisReplicator = Replicator.init( config: config) (1)
// Run the replicator using the config settings
thisReplicator?.start() (2)
Notes on Example
1 | Initialize the replicator with the configuration |
2 | Start the replicator |
Monitor Sync
- In this section
-
Change Listeners | Replicator Status | Documents Pending Push
- In this section
-
Change Listeners | Replicator Status | [lbl-repl-evnts] | Documents Pending Push
You can monitor a replication’s status by using a combination of Change Listeners and the replication.status.activity
property — see; Activity.
This enables you to know, for example, when the replication is actively transferring data and when it has stopped.
You can also choose to monitor document changes — see: [lbl-repl-evnts].
Change Listeners
Use this to monitor changes and to inform on sync progress; this is an optional step.
Best Practice
You should register the listener before starting your replication, to avoid having to do a restart to activate it … and don’t forget to save the token so you can remove the listener later
|
Use the Replicator class to add a change listener as a callback to the Replicator (addChangeListener(_:)) — see: Example 9. You will then be asynchronously notified of state changes.
Remove your change listener before stopping the replicator — use the removeChangeListener(withToken:) method to do this.
Replicator Status
You can use the Replicator class’s Status property to check the replicator status. That is, whether it is actively transferring data or if it has stopped — see: Example 9.
The returned ReplicationStatus structure comprises:
-
Adding a Change Listener
-
Using replicator.status
// Optionally add a change listener
// Retain token for use in deletion
let pushPullReplListener:ListenerToken? = thisReplicator?.addChangeListener({ (change) in (1)
if change.status.activity == .stopped {
print("Replication stopped")
}
else {
print("Replicator is currently ", thisReplicator?.status.activity)
}
})
print("Replicator is currently ", thisReplicator?.status.activity)
}
})
Replication States
Table 1 shows the different states, or activity levels, reported in the API; and the meaning of each.
State |
Meaning |
---|---|
|
The replication is finished or hit a fatal error. |
|
The replicator is offline as the remote host is unreachable. |
|
The replicator is connecting to the remote host. |
|
The replication caught up with all the changes available from the server.
The |
|
The replication is actively transferring data. |
The replication change object also has properties to track the progress (change.status.completed and change.status.total ).
Since the replication occurs in batches the total count can vary through the course of a replication.
|
Replication Status and App Life Cycle
The following diagram describes the status changes when the application starts a replication, and when the application is being backgrounded or foregrounded by the OS. It applies to iOS only.
Additionally, on iOS, an app already in the background may be terminated.
In this case, the Database
and Replicator
instances will be null
when the app returns to the foreground.
Therefore, as preventive measure, it is recommended to do a null
check when the app enters the foreground, and to re-initialize the database and replicator if any of those is null
.
On other platforms, Couchbase Lite doesn’t react to OS backgrounding or foregrounding events and replication(s) will continue running as long as the remote system does not terminate the connection and the app does not terminate. It is generally recommended to stop replications before going into the background otherwise socket connections may be closed by the OS and this may interfere with the replication process.
Documents Pending Push
Replicator.isDocumentPending() is quicker and more efficient. Use it in preference to returning a list of pending document IDs, where possible. |
You can check whether documents are waiting to be pushed in any forthcoming sync by using either of the following API methods:
-
Use the Replicator.pendingDocumentIds() method, which returns a list of document IDs that have local changes, but which have not yet been pushed to the server.
This can be very useful in tracking the progress of a push sync, enabling the app to provide a visual indicator to the end user on its status, or decide when it is safe to exit.
-
Use the Replicator.isDocumentPending() method to quickly check whether an individual document is pending a push.
let url = URL(string: "ws://localhost:4984/mydatabase")!
let target = URLEndpoint(url: url)
let config = ReplicatorConfiguration(database: database, target: target)
config.replicatorType = .push
self.replicator = Replicator(config: config)
let mydocids:Set = self.replicator.pendingDocumentIds() (1)
if(!mydocids.isEmpty) {
print("There are \(mydocids.count) documents pending")
self.replicator.addChangeListener { (change) in
print("Replicator activity level is \(change.status.activity.toString())")
// iterate and report-on previously
// retrieved pending docids 'list'
for thisId in mydocids.sorted() {
if(!self.replicator.isDocumentPending(thisid)) { (2)
print("Doc ID \(thisId) now pushed")
}
}
}
self.replicator.start()
1 | Replicator.pendingDocumentIds() returns a list of the document IDs for all documents waiting to be pushed. This is a snapshot and may have changed by the time the response is received and processed. |
2 | Replicator.isDocumentPending() returns true if the document is waiting to be pushed, and false otherwise. |
Stop Sync
Stopping a replication is straightforward. It is done using stop(). This initiates an asynchronous operation and so is not necessarily immediate. Your app should account for this potential delay before attempting any subsequent operations, for example closing the database.
You can find further information on database operations in Databases.
Best Practice
|
// Remove the change listener
thisReplicator?.removeChangeListener(withToken: pushPullReplListener)
// Stop the replicator
thisReplicator?.stop()
1 | Here we initiate the stopping of the replication using the stop() method. We can then remove any active change listener. |
Conflict Resolution
Unless you specify otherwise, Couchbase Lite’s default conflict resolution policy is applied — see Automatic Conflict Resolution.
To use a different policy, specify a conflict resolver using conflictResolver as shown in Example 12.
For more complex solutions you can provide a custom conflict resolver - see: Custom Conflict Resolution.
-
Local Wins
-
Remote Wins
-
Merge
class LocalWinConflictResolver: ConflictResolverProtocol {
func resolve(conflict: Conflict) -> Document? {
return conflict.localDocument
}
}
class RemoteWinConflictResolver: ConflictResolverProtocol {
func resolve(conflict: Conflict) -> Document? {
return conflict.remoteDocument
}
}
class MergeConflictResolver: ConflictResolverProtocol {
func resolve(conflict: Conflict) -> Document? {
let localDict = conflict.localDocument!.toDictionary()
let remoteDict = conflict.remoteDocument!.toDictionary()
let result = localDict.merging(remoteDict) { (current, new) -> Any in
return current // return current value in case of duplicate keys
}
return MutableDocument(id: conflict.documentID, data: result)
}
}
Just as a replicator may observe a conflict — when updating a document that has changed both in the local database and in a remote database — any attempt to save a document may also observe a conflict, if a replication has taken place since the local app retrieved the document from the database. To address that possibility, a version of the Database.save()
method also takes a conflict resolver as shown in [merging-document-properties].
The following code snippet shows an example of merging properties from the existing document (current
) into the one being saved (new
).
In the event of conflicting keys, it will pick the key value from new
.
guard let document = database.document(withID: "xyz") else { return }
let mutableDocument = document.toMutable()
mutableDocument.setString("apples", forKey: "name")
try database.saveDocument(mutableDocument, conflictHandler: { (new, current) -> Bool in
let currentDict = current!.toDictionary()
let newDict = new.toDictionary()
let result = newDict.merging(currentDict, uniquingKeysWith: { (first, _) in first })
new.setData(result)
return true
})