Sink Configuration Options
Reference for the sink connector's configuration options.
Connection
couchbase.seed.nodes
Addresses of Couchbase Server nodes, delimited by commas.
If a custom port is specified, it must be the KV port (which is normally 11210 for insecure connections, or 11207 for secure connections).
- 
Type: list 
- 
Importance: high 
couchbase.password
Password of the Couchbase user.
May be overridden with the KAFKA_COUCHBASE_PASSWORD environment variable.
- 
Type: password 
- 
Importance: high 
couchbase.bucket
Name of the Couchbase bucket to use.
This property is required unless using the experimental AnalyticsSinkHandler.
- 
Type: string 
- 
Default: ""
- 
Importance: high 
couchbase.network
The network selection strategy for connecting to a Couchbase Server cluster that advertises alternate addresses.
A Couchbase node running inside a container environment (like Docker or Kubernetes) might be configured to advertise both its address within the container environment (known as its "default" address) as well as an "external" address for use by clients connecting from outside the environment.
Setting the 'couchbase.network' config property to 'default' or 'external' forces the selection of the respective addresses. Setting the value to 'auto' tells the connector to select whichever network contains the addresses specified in the 'couchbase.seed.nodes' config property.
- 
Type: string 
- 
Default: auto
- 
Importance: medium 
couchbase.bootstrap.timeout
On startup, the connector will wait this long for a Couchbase connection to be established. If a connection is not established before the timeout expires, the connector will terminate.
- 
Type: string 
- 
Default: 30s
- 
Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m 
- 
Importance: medium 
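As a rough illustration of how the connection properties above fit together, here is a minimal sketch of a connector config fragment. The host names, bucket name, user name, and password are placeholders, not values from this reference.

```properties
# Hypothetical host names; replace with your own Couchbase Server nodes.
# No port is given, so the default KV port is used.
couchbase.seed.nodes=cb1.example.com,cb2.example.com

# Placeholder credentials. The password may instead be supplied through
# the KAFKA_COUCHBASE_PASSWORD environment variable.
couchbase.username=connector-user
couchbase.password=change-me

# Placeholder bucket name.
couchbase.bucket=my-bucket

# Let the connector pick the network that contains the seed node addresses.
couchbase.network=auto

# Terminate if no connection is established within one minute.
couchbase.bootstrap.timeout=1m
```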
Security
couchbase.enable.tls
Use secure connection to Couchbase Server.
If true, you must also tell the connector which certificate to trust. Specify a certificate file with 'couchbase.trust.certificate.path', or a Java keystore file with 'couchbase.trust.store.path' and 'couchbase.trust.store.password'.
- 
Type: boolean 
- 
Default: false
- 
Importance: medium 
- 
Dependents: couchbase.trust.certificate.path, couchbase.trust.store.path, couchbase.trust.store.password, couchbase.enable.hostname.verification, couchbase.client.certificate.path, couchbase.client.certificate.password
couchbase.enable.hostname.verification
Set this to false to disable TLS hostname verification for Couchbase connections.
Less secure, but might be required if for some reason you can’t issue a certificate whose Subject Alternative Names match the hostname used to connect to the server.
Only disable if you understand the impact and can accept the risks.
- 
Type: boolean 
- 
Default: true
- 
Importance: medium 
couchbase.trust.store.path
Absolute filesystem path to the Java keystore holding the CA certificate used by Couchbase Server.
If you want to use a PEM file instead of a Java keystore, specify couchbase.trust.certificate.path instead.
- 
Type: string 
- 
Default: ""
- 
Importance: medium 
couchbase.trust.store.password
Password for accessing the trust store.
May be overridden with the KAFKA_COUCHBASE_TRUST_STORE_PASSWORD environment variable.
- 
Type: password 
- 
Default: [hidden]
- 
Importance: medium 
couchbase.trust.certificate.path
Absolute filesystem path to the PEM file containing the CA certificate used by Couchbase Server.
If you want to use a Java keystore instead of a PEM file, specify couchbase.trust.store.path instead.
- 
Type: string 
- 
Default: ""
- 
Importance: medium 
couchbase.client.certificate.path
Absolute filesystem path to a Java keystore or PKCS12 bundle holding the private key and certificate chain to use for client certificate authentication (mutual TLS).
If you supply a value for this config property, the couchbase.username and couchbase.password properties will be ignored.
- 
Type: string 
- 
Default: ""
- 
Importance: medium 
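The security properties above could be combined along the following lines. This is only a sketch: the file paths are hypothetical, and it shows the PEM variant with the keystore alternative left commented out.

```properties
# Use a secure connection to Couchbase Server.
couchbase.enable.tls=true

# Option 1: trust a CA certificate stored in a PEM file (hypothetical path).
couchbase.trust.certificate.path=/etc/couchbase/ca.pem

# Option 2: trust a CA certificate stored in a Java keystore instead.
# The password may also come from KAFKA_COUCHBASE_TRUST_STORE_PASSWORD.
#couchbase.trust.store.path=/etc/couchbase/truststore.jks
#couchbase.trust.store.password=change-me

# Keep hostname verification on (the default) unless you accept the risks.
couchbase.enable.hostname.verification=true
```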
Logging
couchbase.log.redaction
Determines which kinds of sensitive log messages from the Couchbase connector will be tagged for later redaction by the Couchbase log redaction tool. NONE = no tagging; PARTIAL = user data is tagged; FULL = user, meta, and system data is tagged.
- 
Type: string 
- 
Default: NONE
- 
Valid Values: One of [NONE, PARTIAL, FULL] 
- 
Importance: medium 
couchbase.log.document.lifecycle
If true, document lifecycle milestones will be logged at INFO level instead of DEBUG. Enabling this feature lets you watch documents flow through the connector. Disabled by default because it generates many log messages.
- 
Type: boolean 
- 
Default: false
- 
Importance: medium 
couchbase.metrics.interval
The connector writes metrics to the log at this interval.
Disable metric logging by setting this to 0.
UNCOMMITTED; this feature may change in a patch release without notice.
- 
Since: 4.2.3 
- 
Type: string 
- 
Default: 10m
- 
Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m 
- 
Importance: medium 
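For example, a config fragment for a debugging session might look something like this sketch; the chosen values are illustrative only.

```properties
# Tag user data in log messages for later redaction.
couchbase.log.redaction=PARTIAL

# Log document lifecycle milestones at INFO level. Expect many log messages.
couchbase.log.document.lifecycle=true

# Write metrics to the log every minute (set to 0 to disable metric logging).
couchbase.metrics.interval=1m
```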
Sink Behavior
couchbase.default.collection
Qualified name (scope.collection or bucket.scope.collection) of the destination collection.
If the bucket component contains a dot, escape it by enclosing it in backticks.
If the bucket component is omitted, it defaults to the value of the couchbase.bucket property.
This property supports contextual overrides. The context is the name of the Kafka topic the record came from. Specify a context by enclosing it in square brackets and appending it to the property name. When there is no matching override, the value of the base property (without brackets) is used instead.
- 
Override example: couchbase.default.collection[widgets]=inventory.widgets
- 
Type: string 
- 
Default: _default._default
- 
Valid Values: A qualified collection name like 'scope.collection' or 'bucket.scope.collection'. If the bucket component contains a dot, escape it by enclosing it in backticks. 
- 
Importance: medium 
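To illustrate how the base property and its contextual overrides interact, here is a small sketch. The topic, bucket, scope, and collection names are hypothetical.

```properties
# Placeholder bucket; used whenever a collection name omits the bucket component.
couchbase.bucket=my-bucket

# Base value: records from topics with no matching override go here.
couchbase.default.collection=inventory.misc

# Records from the "widgets" topic go to inventory.widgets in my-bucket.
couchbase.default.collection[widgets]=inventory.widgets

# Records from the "orders" topic go to a different bucket. The bucket name
# contains a dot, so it is enclosed in backticks.
couchbase.default.collection[orders]=`sales.data`.eu.orders
```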
couchbase.topic.to.collection
A map from Kafka topic to Couchbase collection.
Topic and collection are joined by an equals sign. Map entries are delimited by commas.
A collection name is of the form bucket.scope.collection or scope.collection.
If the bucket component is omitted, it defaults to the value of the couchbase.bucket property.
If the bucket component contains a dot, escape it by enclosing it in backticks.
For example, if you want to write messages from topic "topic1" to collection "scope-a.invoices" in the default bucket, and messages from topic "topic2" to collection "scope-a.widgets" in bucket "other-bucket", you would write: "topic1=scope-a.invoices,topic2=other-bucket.scope-a.widgets".
Defaults to an empty map, with all documents going to the collection specified by couchbase.default.collection.
DEPRECATED. Instead, please use couchbase.default.collection with contextual overrides.
- 
Type: list 
- 
Default: ""
- 
Valid Values: topic1=scope.collection,topic2=other-bucket.scope.collection,… If a bucket component contains a dot, escape it by enclosing it in backticks. 
- 
Importance: medium 
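Since this property is deprecated, the mapping from the example above can presumably be expressed with contextual overrides of couchbase.default.collection instead. The following sketch shows the deprecated form (commented out) next to what should be its equivalent.

```properties
# Deprecated form:
#couchbase.topic.to.collection=topic1=scope-a.invoices,topic2=other-bucket.scope-a.widgets

# Sketch of the equivalent using contextual overrides, one per topic:
couchbase.default.collection[topic1]=scope-a.invoices
couchbase.default.collection[topic2]=other-bucket.scope-a.widgets
```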
couchbase.topic.to.document.id
A map of per-topic overrides for the couchbase.document.id configuration property.
Topic and document ID format are joined by an equals sign. Map entries are delimited by commas.
For example, if documents from topic "topic1" should be assigned document IDs that match their "id" field, and documents from topic "topic2" should be assigned document IDs that match their "identifier" field, you would write: "topic1=${/id},topic2=${/identifier}".
Defaults to an empty map, which means the value of the couchbase.document.id configuration property is applied to documents from all topics.
UNCOMMITTED; this feature may change in a patch release without notice.
DEPRECATED. Instead, please use couchbase.document.id with contextual overrides.
- 
Since: 4.2.4 
- 
Type: list 
- 
Default: ""
- 
Valid Values: topic1=${/id},topic2=${/some/other/path},… 
- 
Importance: medium 
couchbase.sink.handler
The fully-qualified class name of the sink handler to use. The sink handler determines how the Kafka record is translated into actions on Couchbase documents.
The built-in handlers are: com.couchbase.connect.kafka.handler.sink.UpsertSinkHandler, com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler, and com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler.
You can customize the sink connector’s behavior by implementing your own SinkHandler.
- 
Type: class 
- 
Default: com.couchbase.connect.kafka.handler.sink.UpsertSinkHandler
- 
Importance: medium 
couchbase.document.mode
Overrides the couchbase.sink.handler property.
A value of N1QL forces the handler to com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler.
A value of SUBDOCUMENT forces the handler to com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler.
DEPRECATED. Please set the couchbase.sink.handler property instead.
- 
Type: string 
- 
Default: DOCUMENT
- 
Valid Values: One of [DOCUMENT, SUBDOCUMENT, N1QL] 
- 
Importance: medium 
couchbase.document.id
Format string to use for the Couchbase document ID. May refer to document fields via placeholders such as ${/path/to/field}.
If not specified, the Couchbase document ID is the Kafka record key.
This property supports contextual overrides. The context is the name of the Kafka topic the record came from. Specify a context by enclosing it in square brackets and appending it to the property name. When there is no matching override, the value of the base property (without brackets) is used instead.
- 
Override example: couchbase.document.id[widgets]=widget::${/widgetId}
- 
Type: string 
- 
Default: ""
- 
Importance: medium 
couchbase.remove.document.id
Whether to remove the ID identified by 'couchbase.document.id' from the document before storing it in Couchbase.
This property supports contextual overrides. The context is the name of the Kafka topic the record came from. Specify a context by enclosing it in square brackets and appending it to the property name. When there is no matching override, the value of the base property (without brackets) is used instead.
- 
Override example: couchbase.remove.document.id[widgets]=true
- 
Type: boolean 
- 
Default: false
- 
Importance: medium 
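The two ID-related properties above are often used together. The sketch below assumes a hypothetical "widgets" topic whose records carry a "widgetId" field, and records on other topics that carry an "id" field.

```properties
# Base value: build the document ID from each record's "id" field.
couchbase.document.id=${/id}

# Override for the "widgets" topic: a prefixed ID built from "widgetId".
couchbase.document.id[widgets]=widget::${/widgetId}

# For the "widgets" topic only, remove the ID field from the stored document.
couchbase.remove.document.id[widgets]=true
```

Under these assumptions, a record from the "widgets" topic with the value {"widgetId": "w42", "color": "red"} would be expected to end up under the document ID widget::w42, stored without its widgetId field.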
couchbase.document.expiration
Document expiration time specified as an integer followed by a time unit (s = seconds, m = minutes, h = hours, d = days). For example, to have documents expire after 30 minutes, set this value to "30m".
A value of "0" (the default) means documents never expire.
This property supports contextual overrides. The context is the name of the Kafka topic the record came from. Specify a context by enclosing it in square brackets and appending it to the property name. When there is no matching override, the value of the base property (without brackets) is used instead.
- 
Override example: couchbase.document.expiration[alerts]=14d
- 
Type: string 
- 
Default: 0
- 
Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m 
- 
Importance: medium 
couchbase.retry.timeout
Retry failed writes to Couchbase until this deadline is reached. If time runs out, the connector terminates.
A value of 0 (the default) means the connector will terminate immediately when a write fails.
This retry timeout is distinct from the KV timeout (which you can set via couchbase.env.*).
The KV timeout affects an individual write attempt, while the retry timeout spans multiple attempts and makes the connector resilient to more kinds of transient failures.
Do not confuse this with the Kafka Connect framework’s built-in errors.retry.timeout config property, which applies only to failures occurring before the framework delivers the record to the Couchbase connector.
- 
Since: 4.1.4 
- 
Type: string 
- 
Default: 0
- 
Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m 
- 
Importance: medium 
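As a sketch of how the two retry settings differ in practice, the fragment below sets both. The durations are illustrative; note that errors.retry.timeout belongs to the Kafka Connect framework and takes a plain number of milliseconds rather than a unit-suffixed value.

```properties
# Connector setting: keep retrying failed Couchbase writes for up to 5 minutes,
# then terminate the connector.
couchbase.retry.timeout=5m

# Kafka Connect framework setting (milliseconds): covers only failures that
# occur before the record is delivered to the Couchbase connector.
errors.retry.timeout=60000
```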
Durability
couchbase.durability
The preferred way to specify an enhanced durability requirement when using Couchbase Server 6.5 or later.
The default value of NONE means a write is considered successful as soon as it reaches the memory of the active node.
If you set this to anything other than NONE, then you must not set couchbase.persist.to or couchbase.replicate.to.
- 
Type: string 
- 
Default: NONE
- 
Valid Values: One of [NONE, MAJORITY, MAJORITY_AND_PERSIST_TO_ACTIVE, PERSIST_TO_MAJORITY] 
- 
Importance: medium 
couchbase.persist.to
For Couchbase Server versions prior to 6.5, this is how you require the connector to verify a write is persisted to disk on a certain number of replicas before considering the write successful.
If you’re using Couchbase Server 6.5 or later, we recommend using the couchbase.durability property instead.
- 
Type: string 
- 
Default: NONE
- 
Valid Values: One of [NONE, ACTIVE, ONE, TWO, THREE, FOUR] 
- 
Importance: medium 
couchbase.replicate.to
For Couchbase Server versions prior to 6.5, this is how you require the connector to verify a write has reached the memory of a certain number of replicas before considering the write successful.
If you’re using Couchbase Server 6.5 or later, we recommend using the couchbase.durability property instead.
- 
Type: string 
- 
Default: NONE
- 
Valid Values: One of [NONE, ONE, TWO, THREE] 
- 
Importance: medium 
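The two styles of durability configuration are mutually exclusive, which the following sketch tries to make concrete. The chosen levels are examples only.

```properties
# Couchbase Server 6.5 or later: use the enhanced durability setting
# and leave couchbase.persist.to and couchbase.replicate.to unset.
couchbase.durability=MAJORITY

# Couchbase Server prior to 6.5: leave couchbase.durability at NONE
# and use the legacy settings instead.
#couchbase.persist.to=ACTIVE
#couchbase.replicate.to=ONE
```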
N1ql Sink Handler
couchbase.n1ql.operation
The type of update to use when couchbase.sink.handler is set to com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler.
This property is specific to N1qlSinkHandler.
- 
Type: string 
- 
Default: UPDATE
- 
Valid Values: One of [UPDATE, UPDATE_WHERE] 
- 
Importance: medium 
couchbase.n1ql.where.fields
When using the UPDATE_WHERE operation, this is the list of document fields that must match the Kafka message in order for the document to be updated with the remaining message fields. To match against a literal value instead of a message field, use a colon to delimit the document field name and the target value. For example, "type:widget,color" matches documents whose 'type' field is 'widget' and whose 'color' field matches the 'color' field of the Kafka message.
This property is specific to N1qlSinkHandler.
- 
Type: list 
- 
Default: ""
- 
Importance: medium 
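Putting the N1qlSinkHandler properties together, a config fragment based on the "type:widget,color" example above might look like this sketch.

```properties
# Select the N1QL sink handler.
couchbase.sink.handler=com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler

# Update only documents that match the WHERE fields below.
couchbase.n1ql.operation=UPDATE_WHERE

# Match documents whose "type" field is the literal "widget" and whose
# "color" field equals the "color" field of the Kafka message.
couchbase.n1ql.where.fields=type:widget,color
```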
Analytics Sink Handler
couchbase.analytics.max.records.in.batch
Each batch consists of an UPSERT or a DELETE statement, depending on the mutations it contains. This property sets the maximum number of records in the UPSERT or DELETE statement of a batch. Tune it based on the capacity of your Analytics cluster.
This property is specific to AnalyticsSinkHandler.
UNCOMMITTED; this feature may change in a patch release without notice.
- 
Since: 4.1.14 
- 
Type: int 
- 
Default: 100
- 
Importance: medium 
couchbase.analytics.max.size.in.batch
Each batch consists of an UPSERT or a DELETE statement, depending on the mutations it contains. This property sets the maximum combined size of all documents in the UPSERT statement of a batch. Tune it based on the capacity of your Analytics cluster.
This property is specific to AnalyticsSinkHandler.
UNCOMMITTED; this feature may change in a patch release without notice.
- 
Since: 4.2.0 
- 
Type: string 
- 
Default: 5m
- 
Valid Values: An integer followed by a size unit (b = bytes, k = kilobytes, m = megabytes, g = gigabytes). For example, to specify 64 megabytes: 64m 
- 
Importance: medium 
couchbase.analytics.query.timeout
The time after which the client cancels an Analytics query request.
This property is specific to AnalyticsSinkHandler.
UNCOMMITTED; this feature may change in a patch release without notice.
- 
Since: 4.2.0 
- 
Type: string 
- 
Default: 5m
- 
Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m 
- 
Importance: medium 
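A config fragment for the experimental AnalyticsSinkHandler might look like the sketch below. The fully-qualified class name is an assumption: this reference does not state it, and the sketch simply supposes the handler lives in the same package as the other built-in handlers. The batch limits are illustrative values.

```properties
# ASSUMPTION: class name inferred from the package of the other built-in handlers.
couchbase.sink.handler=com.couchbase.connect.kafka.handler.sink.AnalyticsSinkHandler

# At most 200 records per UPSERT or DELETE statement.
couchbase.analytics.max.records.in.batch=200

# At most 10 megabytes of documents per UPSERT statement.
couchbase.analytics.max.size.in.batch=10m

# Cancel an Analytics query request after 2 minutes.
couchbase.analytics.query.timeout=2m
```

As noted under couchbase.bucket, the bucket property is not required when using this handler.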
Sub Document Sink Handler
couchbase.subdocument.path
JSON Pointer to the property of the Kafka message whose value is the subdocument path to use when modifying the Couchbase document.
This property is specific to SubDocumentSinkHandler.
- 
Type: string 
- 
Default: ""
- 
Importance: medium 
couchbase.subdocument.operation
The type of update to apply to the subdocument.
This property is specific to SubDocumentSinkHandler.
- 
Type: string 
- 
Default: UPSERT
- 
Valid Values: One of [UPSERT, ARRAY_PREPEND, ARRAY_APPEND] 
- 
Importance: medium 
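As a sketch, the SubDocumentSinkHandler properties could be combined as follows. The message field name "targetPath" is hypothetical; it stands for whichever property of your Kafka messages holds the subdocument path.

```properties
# Select the subdocument sink handler.
couchbase.sink.handler=com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler

# JSON Pointer to the (hypothetical) message property "targetPath", whose
# value names the subdocument path to modify in the Couchbase document.
couchbase.subdocument.path=/targetPath

# Append to the array at that path instead of upserting.
couchbase.subdocument.operation=ARRAY_APPEND
```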
Couchbase Java SDK Settings
couchbase.env.*
Any system property recognized by the Couchbase Java SDK may be specified in the Sink connector config if you omit the com. prefix from the system property name.
For example, the Couchbase Java SDK recognizes the system property com.couchbase.env.timeout.kvTimeout.
To specify this setting in the connector config, use the property name couchbase.env.timeout.kvTimeout.
If you’re thinking about increasing the KV timeout to handle transient error conditions, consider using the connector’s couchbase.retry.timeout config property instead.
The retry timeout is a more robust way to handle all kinds of write failures.
For a list of recognized properties, see Java SDK Client Settings.
UNCOMMITTED; this feature may change in a patch release without notice.
- 
Since: 4.0.6 
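For instance, the KV timeout mentioned above could be set as in the following sketch. The value format shown (an integer with a time unit suffix) is an assumption about what the Java SDK accepts for this property; check the Java SDK Client Settings for the authoritative format.

```properties
# Java SDK system property: com.couchbase.env.timeout.kvTimeout
# Connector config name:    couchbase.env.timeout.kvTimeout (the "com." prefix is omitted)
# ASSUMPTION: the SDK accepts a duration written like "5s".
couchbase.env.timeout.kvTimeout=5s
```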