Sink Configuration Options

      Reference of the sink connector options.

      Connection

      couchbase.seed.nodes

      Addresses of Couchbase Server nodes, delimited by commas.

      If a custom port is specified, it must be the KV port (which is normally 11210 for insecure connections, or 11207 for secure connections).

      • Type: list

      • Importance: high

      couchbase.username

      Name of the Couchbase user to authenticate as.

      • Type: string

      • Importance: high

      couchbase.password

      Password of the Couchbase user.

      May be overridden with the KAFKA_COUCHBASE_PASSWORD environment variable.

      • Type: password

      • Importance: high

      couchbase.bucket

      Name of the Couchbase bucket to use.

      This property is required unless using the experimental AnalyticsSinkHandler.

      • Type: string

      • Default: ""

      • Importance: high
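As a rough illustration of how the connection properties fit together, the sketch below builds a connector definition as a Python dict and serializes it to JSON, the shape usually submitted to the Kafka Connect REST API. The connector name, topic, host names, credentials, and bucket are placeholders. The `connector.class` value is an assumption not stated in this reference, so check it against the connector distribution you use.

```python
import json

def build_sink_config(name, topics, seed_nodes, username, password, bucket):
    """Return a Kafka Connect connector definition as a plain dict."""
    return {
        "name": name,
        "config": {
            # Assumed class name; verify against your connector version.
            "connector.class": "com.couchbase.connect.kafka.CouchbaseSinkConnector",
            "topics": ",".join(topics),
            # Seed node addresses are delimited by commas.
            "couchbase.seed.nodes": ",".join(seed_nodes),
            "couchbase.username": username,
            "couchbase.password": password,
            "couchbase.bucket": bucket,
        },
    }

# All values below are placeholders for illustration only.
definition = build_sink_config(
    name="example-couchbase-sink",
    topics=["topic1"],
    seed_nodes=["node1.example.com", "node2.example.com"],
    username="example-user",
    password="example-password",
    bucket="example-bucket",
)
print(json.dumps(definition, indent=2))
```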

      couchbase.network

      The network selection strategy for connecting to a Couchbase Server cluster that advertises alternate addresses.

      A Couchbase node running inside a container environment (like Docker or Kubernetes) might be configured to advertise both its address within the container environment (known as its "default" address) as well as an "external" address for use by clients connecting from outside the environment.

      Setting the 'couchbase.network' config property to 'default' or 'external' forces the selection of the respective addresses. Setting the value to 'auto' tells the connector to select whichever network contains the addresses specified in the 'couchbase.seed.nodes' config property.

      • Type: string

      • Default: auto

      • Importance: medium

      couchbase.bootstrap.timeout

      On startup, the connector will wait this long for a Couchbase connection to be established. If a connection is not established before the timeout expires, the connector will terminate.

      • Type: string

      • Default: 30s

      • Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m

      • Importance: medium
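The duration format described under "Valid Values" can be sketched as a small parser. This is not the connector's own parsing code; the function name is hypothetical, and accepting a bare "0" is an assumption based on the "0" defaults that appear for other duration properties in this reference.

```python
import re

# Multipliers to milliseconds for the units listed under "Valid Values".
_UNIT_TO_MILLIS = {"ms": 1, "s": 1_000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}

def parse_duration_millis(text):
    """Convert a string such as '30s' or '30m' to milliseconds."""
    text = text.strip()
    if text == "0":
        # Assumption: a bare zero needs no unit.
        return 0
    match = re.fullmatch(r"(\d+)(ms|s|m|h|d)", text)
    if match is None:
        raise ValueError(f"not an integer followed by a time unit: {text!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_TO_MILLIS[unit]

print(parse_duration_millis("30s"))  # the default bootstrap timeout, in milliseconds
```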

      Security

      couchbase.enable.tls

      Use a secure (TLS) connection to Couchbase Server.

      If true, you must also tell the connector which certificate to trust. Specify a certificate file with 'couchbase.trust.certificate.path', or a Java keystore file with 'couchbase.trust.store.path' and 'couchbase.trust.store.password'.

      couchbase.enable.hostname.verification

      Set this to false to disable TLS hostname verification for Couchbase connections. This is less secure, but might be required if you cannot issue a certificate whose Subject Alternative Names match the hostname used to connect to the server. Disable it only if you understand the impact and can accept the risks.

      • Type: boolean

      • Default: true

      • Importance: medium

      couchbase.trust.store.path

      Absolute filesystem path to the Java keystore holding the CA certificate used by Couchbase Server.

      If you want to use a PEM file instead of a Java keystore, specify couchbase.trust.certificate.path instead.

      • Type: string

      • Default: ""

      • Importance: medium

      couchbase.trust.store.password

      Password for accessing the trust store.

      May be overridden with the KAFKA_COUCHBASE_TRUST_STORE_PASSWORD environment variable.

      • Type: password

      • Default: [hidden]

      • Importance: medium

      couchbase.trust.certificate.path

      Absolute filesystem path to the PEM file containing the CA certificate used by Couchbase Server.

      If you want to use a Java keystore instead of a PEM file, specify couchbase.trust.store.path instead.

      • Type: string

      • Default: ""

      • Importance: medium

      couchbase.client.certificate.path

      Absolute filesystem path to a Java keystore or PKCS12 bundle holding the private key and certificate chain to use for client certificate authentication (mutual TLS).

      If you supply a value for this config property, the couchbase.username and couchbase.password properties will be ignored.

      • Type: string

      • Default: ""

      • Importance: medium

      couchbase.client.certificate.password

      Password for accessing the client certificate.

      May be overridden with the KAFKA_COUCHBASE_CLIENT_CERTIFICATE_PASSWORD environment variable.

      • Type: password

      • Default: [hidden]

      • Importance: medium
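The relationships between the security properties can be checked before a configuration is deployed. The following is a minimal sketch of such a check, not part of the connector; the function name is hypothetical, and treating a missing `couchbase.enable.tls` as disabled is an assumption.

```python
def check_tls_settings(config):
    """Return a list of notes about TLS-related settings (empty if none apply)."""
    notes = []
    # Assumption: an absent couchbase.enable.tls is treated as "not enabled".
    tls_enabled = str(config.get("couchbase.enable.tls", "")).lower() == "true"
    has_pem = bool(config.get("couchbase.trust.certificate.path"))
    has_store = bool(config.get("couchbase.trust.store.path"))
    if tls_enabled and not (has_pem or has_store):
        notes.append(
            "couchbase.enable.tls is true but neither couchbase.trust.certificate.path "
            "nor couchbase.trust.store.path is set"
        )
    uses_client_cert = bool(config.get("couchbase.client.certificate.path"))
    has_credentials = bool(config.get("couchbase.username") or config.get("couchbase.password"))
    if uses_client_cert and has_credentials:
        notes.append(
            "couchbase.client.certificate.path is set, so couchbase.username and "
            "couchbase.password will be ignored"
        )
    return notes

# Placeholder path for illustration only.
print(check_tls_settings({
    "couchbase.enable.tls": "true",
    "couchbase.trust.certificate.path": "/path/to/ca.pem",
}))
```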

      Logging

      couchbase.log.redaction

      Determines which kinds of sensitive log messages from the Couchbase connector will be tagged for later redaction by the Couchbase log redaction tool. NONE = no tagging; PARTIAL = user data is tagged; FULL = user, meta, and system data is tagged.

      • Type: string

      • Default: NONE

      • Valid Values: One of [NONE, PARTIAL, FULL]

      • Importance: medium

      couchbase.log.document.lifecycle

      If true, document lifecycle milestones will be logged at INFO level instead of DEBUG. Enabling this feature lets you watch documents flow through the connector. Disabled by default because it generates many log messages.

      • Type: boolean

      • Default: false

      • Importance: medium

      Sink Behavior

      couchbase.default.collection

      Qualified name (scope.collection or bucket.scope.collection) of the destination collection for messages from topics that don’t have an entry in the couchbase.topic.to.collection map.

      If the bucket component contains a dot, escape it by enclosing it in backticks.

      If the bucket component is omitted, it defaults to the value of the couchbase.bucket property.

      • Type: string

      • Default: _default._default

      • Valid Values: A qualified collection name like 'scope.collection' or 'bucket.scope.collection'. If the bucket component contains a dot, escape it by enclosing it in backticks.

      • Importance: medium

      couchbase.topic.to.collection

      A map from Kafka topic to Couchbase collection.

      Topic and collection are joined by an equals sign. Map entries are delimited by commas.

      A collection name is of the form bucket.scope.collection or scope.collection. If the bucket component is omitted, it defaults to the value of the couchbase.bucket property. If the bucket component contains a dot, escape it by enclosing it in backticks.

      For example, if you want to write messages from topic "topic1" to collection "scope-a.invoices" in the default bucket, and messages from topic "topic2" to collection "scope-a.widgets" in bucket "other-bucket" you would write: "topic1=scope-a.invoices,topic2=other-bucket.scope-a.widgets".

      Defaults to an empty map, with all documents going to the collection specified by couchbase.default.collection.

      • Type: list

      • Default: ""

      • Valid Values: topic1=scope.collection,topic2=other-bucket.scope.collection,… If a bucket component contains a dot, escape it by enclosing it in backticks.

      • Importance: medium
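To make the map syntax concrete, here is a minimal sketch that parses a `couchbase.topic.to.collection` value into (bucket, scope, collection) tuples, including the backtick escape for bucket names that contain a dot. It illustrates the format only and is not the connector's parser; the function names and the `default_bucket` parameter (standing in for the `couchbase.bucket` value) are hypothetical.

```python
import re

# Optional bucket (backtick-quoted if it contains dots), then scope and collection.
_QUALIFIED = re.compile(r"(?:`([^`]+)`\.|([^.`]+)\.)?([^.`]+)\.([^.`]+)")

def parse_collection(name, default_bucket):
    """Split 'scope.collection' or 'bucket.scope.collection' into a 3-tuple."""
    match = _QUALIFIED.fullmatch(name)
    if match is None:
        raise ValueError(f"not a qualified collection name: {name!r}")
    quoted_bucket, plain_bucket, scope, collection = match.groups()
    # An omitted bucket falls back to the default bucket.
    bucket = quoted_bucket or plain_bucket or default_bucket
    return (bucket, scope, collection)

def parse_topic_map(value, default_bucket):
    """Parse comma-delimited 'topic=collection' entries into a dict."""
    result = {}
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        topic, _, target = entry.partition("=")
        result[topic.strip()] = parse_collection(target.strip(), default_bucket)
    return result

mapping = parse_topic_map(
    "topic1=scope-a.invoices,topic2=other-bucket.scope-a.widgets",
    default_bucket="default-bucket",
)
print(mapping)
```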

      couchbase.sink.handler

      The fully-qualified class name of the sink handler to use. The sink handler determines how the Kafka record is translated into actions on Couchbase documents.

      The built-in handlers are: com.couchbase.connect.kafka.handler.sink.UpsertSinkHandler, com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler, and com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler.

      You can customize the sink connector’s behavior by implementing your own SinkHandler.

      • Type: class

      • Default: com.couchbase.connect.kafka.handler.sink.UpsertSinkHandler

      • Importance: medium

      couchbase.document.mode

      Overrides the couchbase.sink.handler property.

      A value of N1QL forces the handler to com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler. A value of SUBDOCUMENT forces the handler to com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler.

      DEPRECATED. Please set the couchbase.sink.handler property instead.

      • Type: string

      • Default: DOCUMENT

      • Valid Values: One of [DOCUMENT, SUBDOCUMENT, N1QL]

      • Importance: medium

      couchbase.document.id

      Format string to use for the Couchbase document ID (overriding the message key). May refer to document fields via placeholders like ${/path/to/field}

      • Type: string

      • Default: ""

      • Importance: medium
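The placeholder syntax can be illustrated with a short sketch that substitutes `${/path/to/field}` placeholders using JSON Pointer lookups. This is not the connector's implementation; the function names are hypothetical, and converting non-string field values with `str()` is an assumption.

```python
import re

# Matches ${/path/to/field} and captures the JSON Pointer inside the braces.
_PLACEHOLDER = re.compile(r"\$\{(/[^}]*)\}")

def resolve_pointer(document, pointer):
    """Follow a JSON Pointer such as '/a/b' through nested dicts and lists."""
    node = document
    for token in pointer.split("/")[1:]:
        # Unescape per JSON Pointer rules: ~1 is '/', ~0 is '~'.
        token = token.replace("~1", "/").replace("~0", "~")
        node = node[int(token)] if isinstance(node, list) else node[token]
    return node

def format_document_id(template, document):
    """Replace each placeholder in the template with the referenced field value."""
    return _PLACEHOLDER.sub(
        lambda m: str(resolve_pointer(document, m.group(1))), template
    )

# Hypothetical message content for illustration.
message = {"customer": {"id": "c42"}, "number": 7}
print(format_document_id("invoice::${/customer/id}::${/number}", message))
```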

      couchbase.remove.document.id

      Whether to remove the ID identified by 'couchbase.document.id' from the document before storing it in Couchbase.

      • Type: boolean

      • Default: false

      • Importance: medium

      couchbase.document.expiration

      Document expiration time specified as an integer followed by a time unit (s = seconds, m = minutes, h = hours, d = days). For example, to have documents expire after 30 minutes, set this value to "30m".

      A value of "0" (the default) means documents never expire.

      • Type: string

      • Default: 0

      • Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m

      • Importance: medium

      couchbase.retry.timeout

      Retry failed writes to Couchbase until this deadline is reached. If time runs out, the connector terminates.

      A value of 0 (the default) means the connector will terminate immediately when a write fails.

      This retry timeout is distinct from the KV timeout (which you can set via couchbase.env.*). The KV timeout affects an individual write attempt, while the retry timeout spans multiple attempts and makes the connector resilient to more kinds of transient failures.

      Do not confuse this with the Kafka Connect framework’s built-in errors.retry.timeout config property, which applies only to failures occurring before the framework delivers the record to the Couchbase connector.

      • Since: 4.1.4

      • Type: string

      • Default: 0

      • Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m

      • Importance: medium
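The difference between a per-attempt timeout and a retry deadline that spans several attempts can be sketched as a simple loop. This is a conceptual illustration, not the connector's retry logic; the function and parameter names are hypothetical, and the fixed delay between attempts is an assumption.

```python
import time

def write_with_retry(write_once, retry_timeout_seconds, backoff_seconds=0.01,
                     clock=time.monotonic, sleep=time.sleep):
    """Call write_once() until it succeeds or the retry deadline passes."""
    deadline = clock() + retry_timeout_seconds
    while True:
        try:
            return write_once()
        except Exception:
            # With a timeout of 0 the first failure is already at the deadline,
            # so the error propagates immediately.
            if clock() >= deadline:
                raise
            sleep(backoff_seconds)

# A stand-in write that fails twice before succeeding.
attempts = []

def flaky_write():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(write_with_retry(flaky_write, retry_timeout_seconds=5, backoff_seconds=0))
```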

      Durability

      couchbase.durability

      The preferred way to specify an enhanced durability requirement when using Couchbase Server 6.5 or later.

      The default value of NONE means a write is considered successful as soon as it reaches the memory of the active node.

      If you set this to anything other than NONE, then you must not set couchbase.persist.to or couchbase.replicate.to.

      • Type: string

      • Default: NONE

      • Valid Values: One of [NONE, MAJORITY, MAJORITY_AND_PERSIST_TO_ACTIVE, PERSIST_TO_MAJORITY]

      • Importance: medium

      couchbase.persist.to

      For Couchbase Server versions prior to 6.5, this is how you require the connector to verify a write is persisted to disk on a certain number of replicas before considering the write successful.

      If you’re using Couchbase Server 6.5 or later, we recommend using the couchbase.durability property instead.

      • Type: string

      • Default: NONE

      • Valid Values: One of [NONE, ACTIVE, ONE, TWO, THREE, FOUR]

      • Importance: medium

      couchbase.replicate.to

      For Couchbase Server versions prior to 6.5, this is how you require the connector to verify a write has reached the memory of a certain number of replicas before considering the write successful.

      If you’re using Couchbase Server 6.5 or later, we recommend using the couchbase.durability property instead.

      • Type: string

      • Default: NONE

      • Valid Values: One of [NONE, ONE, TWO, THREE]

      • Importance: medium
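The constraint between the three durability properties can be expressed as a pre-deployment check. The sketch below is not part of the connector; the function name is hypothetical, and it interprets "must not set" as "must be left at the default NONE", which is an assumption.

```python
_DURABILITY = {"NONE", "MAJORITY", "MAJORITY_AND_PERSIST_TO_ACTIVE", "PERSIST_TO_MAJORITY"}
_PERSIST_TO = {"NONE", "ACTIVE", "ONE", "TWO", "THREE", "FOUR"}
_REPLICATE_TO = {"NONE", "ONE", "TWO", "THREE"}

def check_durability_settings(config):
    """Raise ValueError if the durability settings are invalid or conflict."""
    durability = config.get("couchbase.durability", "NONE")
    persist_to = config.get("couchbase.persist.to", "NONE")
    replicate_to = config.get("couchbase.replicate.to", "NONE")
    checks = (
        ("couchbase.durability", durability, _DURABILITY),
        ("couchbase.persist.to", persist_to, _PERSIST_TO),
        ("couchbase.replicate.to", replicate_to, _REPLICATE_TO),
    )
    for name, value, allowed in checks:
        if value not in allowed:
            raise ValueError(f"{name}={value!r} is not one of {sorted(allowed)}")
    # Enhanced durability excludes the legacy persist/replicate settings.
    if durability != "NONE" and (persist_to != "NONE" or replicate_to != "NONE"):
        raise ValueError(
            "couchbase.durability cannot be combined with "
            "couchbase.persist.to or couchbase.replicate.to"
        )

check_durability_settings({"couchbase.durability": "MAJORITY"})  # passes silently
```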

      N1ql Sink Handler

      couchbase.n1ql.operation

      The type of update to use when couchbase.sink.handler is set to com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler.

      This property is specific to N1qlSinkHandler.

      • Type: string

      • Default: UPDATE

      • Valid Values: One of [UPDATE, UPDATE_WHERE]

      • Importance: medium

      couchbase.n1ql.where.fields

      When using the UPDATE_WHERE operation, this is the list of document fields that must match the Kafka message in order for the document to be updated with the remaining message fields. To match against a literal value instead of a message field, use a colon to delimit the document field name and the target value. For example, "type:widget,color" matches documents whose 'type' field is 'widget' and whose 'color' field matches the 'color' field of the Kafka message.

      This property is specific to N1qlSinkHandler.

      • Type: list

      • Default: ""

      • Importance: medium
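To illustrate the `couchbase.n1ql.where.fields` syntax, the sketch below splits a value into literal conditions and fields matched against the Kafka message, then evaluates the match in plain Python. The real handler issues a N1QL statement instead; the function names are hypothetical, and comparing literals as strings is an assumption.

```python
def parse_where_fields(value):
    """Split 'type:widget,color' into literal conditions and message fields."""
    literals, from_message = {}, []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        field, separator, literal = entry.partition(":")
        if separator:
            literals[field] = literal      # match against a fixed value
        else:
            from_message.append(field)     # match against the message's field
    return literals, from_message

def document_matches(document, message, where_fields):
    """Return True if the document satisfies every parsed condition."""
    literals, from_message = parse_where_fields(where_fields)
    literal_ok = all(str(document.get(f)) == v for f, v in literals.items())
    message_ok = all(
        f in message and document.get(f) == message[f] for f in from_message
    )
    return literal_ok and message_ok

# Hypothetical document and message for illustration.
doc = {"type": "widget", "color": "red", "price": 3}
msg = {"color": "red", "price": 5}
print(document_matches(doc, msg, "type:widget,color"))
```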

      couchbase.n1ql.create.document

      Controls whether to create the document if it does not exist.

      This property is specific to N1qlSinkHandler.

      • Type: boolean

      • Default: true

      • Importance: medium

      Analytics Sink Handler

      couchbase.analytics.max.records.in.batch

      Each batch consists of an UPSERT or a DELETE statement, depending on the mutations. This property sets the maximum number of records in the UPSERT or DELETE statement of a batch. Tune it to the capacity of your Analytics cluster.

      This property is specific to AnalyticsSinkHandler.

      UNCOMMITTED; this feature may change in a patch release without notice.

      • Since: 4.1.14

      • Type: int

      • Default: 100

      • Importance: medium

      couchbase.analytics.max.size.in.batch

      Each batch consists of an UPSERT or a DELETE statement, depending on the mutations. This property sets the maximum total size, in bytes, of all documents in the UPSERT statement of a batch. Tune it to the capacity of your Analytics cluster.

      This property is specific to AnalyticsSinkHandler.

      UNCOMMITTED; this feature may change in a patch release without notice.

      • Since: 4.2.0

      • Type: string

      • Default: 5m

      • Valid Values: An integer followed by a size unit (b = bytes, k = kilobytes, m = megabytes, g = gigabytes). For example, to specify 64 megabytes: 64m

      • Importance: medium
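The interaction of the two batch limits can be sketched as follows: a batch is closed when adding another document would exceed either the record count or the total size. This is a conceptual sketch, not the handler's batching code; the function name is hypothetical, measuring size as the UTF-8 length of the serialized JSON is an assumption, and the split between UPSERT and DELETE statements is left out.

```python
import json

def split_into_batches(documents, max_records, max_bytes):
    """Group documents so each batch respects both limits where possible."""
    batches, current, current_bytes = [], [], 0
    for doc in documents:
        size = len(json.dumps(doc).encode("utf-8"))
        # Close the current batch if this document would break either limit.
        if current and (len(current) >= max_records or current_bytes + size > max_bytes):
            batches.append(current)
            current, current_bytes = [], 0
        # A single oversized document still gets a batch of its own.
        current.append(doc)
        current_bytes += size
    if current:
        batches.append(current)
    return batches

docs = [{"id": i} for i in range(5)]
print([len(batch) for batch in split_into_batches(docs, max_records=2, max_bytes=5 * 1024 * 1024)])
```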

      couchbase.analytics.query.timeout

      The length of time after which the client cancels an Analytics query request.

      This property is specific to AnalyticsSinkHandler.

      UNCOMMITTED; this feature may change in a patch release without notice.

      • Since: 4.2.0

      • Type: string

      • Default: 5m

      • Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m

      • Importance: medium

      Sub Document Sink Handler

      couchbase.subdocument.path

      JSON Pointer to the property of the Kafka message whose value is the subdocument path to use when modifying the Couchbase document.

      This property is specific to SubDocumentSinkHandler.

      • Type: string

      • Default: ""

      • Importance: medium

      couchbase.subdocument.operation

      The type of update to apply to the sub-document.

      This property is specific to SubDocumentSinkHandler.

      • Type: string

      • Default: UPSERT

      • Valid Values: One of [UPSERT, ARRAY_PREPEND, ARRAY_APPEND]

      • Importance: medium

      couchbase.subdocument.create.path

      Whether to add the parent paths if they are missing in the document.

      This property is specific to SubDocumentSinkHandler.

      • Type: boolean

      • Default: true

      • Importance: medium

      couchbase.subdocument.create.document

      This property controls whether to create the document if it does not exist.

      This property is specific to SubDocumentSinkHandler.

      • Type: boolean

      • Default: true

      • Importance: medium

      Couchbase Java SDK Settings

      couchbase.env.*

      Any system property recognized by the Couchbase Java SDK may be specified in the Sink connector config if you omit the com. prefix from the system property name.

      For example, the Couchbase Java SDK recognizes the system property com.couchbase.env.timeout.kvTimeout. To specify this setting in the connector config, use the property name couchbase.env.timeout.kvTimeout.

      If you’re thinking about increasing the KV timeout to handle transient error conditions, consider using the connector’s couchbase.retry.timeout config property instead. The retry timeout is a more robust way to handle all kinds of write failures.

      For a list of recognized properties, see Java SDK Client Settings.

      UNCOMMITTED; this feature may change in a patch release without notice.

      • Since: 4.0.6
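The naming rule for `couchbase.env.*` amounts to dropping the leading `com.` from the Java SDK system property name. A minimal sketch, with a hypothetical helper name:

```python
def to_connector_property(system_property):
    """Map a Couchbase Java SDK system property name to its connector config name."""
    if not system_property.startswith("com.couchbase.env."):
        raise ValueError(f"not a com.couchbase.env.* property: {system_property!r}")
    # Drop only the leading "com." prefix.
    return system_property[len("com."):]

print(to_connector_property("com.couchbase.env.timeout.kvTimeout"))
```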
