Quickstart

    +

    This section shows how to add the connector to your application.

    Download

    The connector is distributed as a ZIP archive which you can download from the Release Notes page.

    The connector is also available on Confluent Hub.

    If you prefer to build the connector from source:

    1. Clone the GitHub repository

    2. Run mvn package to generate the connector archive

    3. Look for couchbase-kafka-connect-couchbase-<version>.zip under the target directory.

    Getting Started

    However you obtained the connector archive, go ahead and unzip it now. The result should be a directory called couchbase-kafka-connect-couchbase-<version>. The rest of this guide will refer to this directory as $KAFKA_CONNECT_COUCHBASE_HOME.

    This guide assumes you have already installed Couchbase Server locally and have loaded the sample bucket called travel-sample. (It’s fine if you want to use a different bucket; neither the connector nor this guide depends on the contents of the documents in the bucket.)

    You’ll also need a local installation of Apache Kafka or Confluent Platform Kafka.

    Set Up Kafka

    If you already have an installation of Kafka and know how to start the servers, feel free to skip this section.

    Still reading? Don’t worry, setting up a basic installation is pretty easy. Download either Apache Kafka or Confluent Platform Kafka. For simplicity, this guide assumes you’re installing from a ZIP or TAR archive, so steer clear of the deb/rpm packages for now.

    Decompress the Apache Kafka or Confluent Platform archive and move the resulting directory under ~/opt (or wherever you like to keep this kind of software). The rest of this guide refers to the root of the installation directory as $KAFKA_HOME or $CONFLUENT_HOME. Be aware some config files are in different relative locations depending on whether you’re using Apache Kafka or Confluent Platform Kafka.

    Make sure the Kafka command line tools are in your path:

    export PATH=<path-to-apache-kafka-or-confluent>/bin:$PATH

    Start the Kafka Servers

    If you’re using Confluent Platform Kafka, you can use the confluent command to start the servers in development mode:

    confluent local start

    If you’re not using Confluent, the commands are slightly different, but the idea is the same. Start the servers by running these commands, each in a separate terminal:

    zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
    kafka-server-start.sh $KAFKA_HOME/config/server.properties

    Couchbase Source Connector

    The source connector listens for changes to Couchbase documents and publishes them to a Kafka topic.

    If you’re more interested in saving Kafka messages to Couchbase, skip ahead to the Couchbase Sink Connector section.

    Configure the Source Connector

    The Couchbase connector distribution includes sample config files. Look inside $KAFKA_CONNECT_COUCHBASE_HOME/etc and edit the quickstart-couchbase-source.properties file.

    Take a moment to peruse the configuration options specified here. Some are standard options available to all connectors. The rest are specific to the Couchbase connector.

    For this exercise, change the value of couchbase.bucket to travel-sample (or whichever bucket you want to stream from). For couchbase.username and couchbase.password, supply the credentials of a Couchbase user who has the "Data DCP Reader" role for the bucket. If you have not yet created such a user, now is a good time to read about Creating and Managing Users with the UI.

    Run the Source Connector

    Kafka connectors can be run in standalone or distributed mode. For now let’s run the connector in standalone mode, using the CLASSPATH environment variable to include the Couchbase connector JAR in the class path.

    For Confluent Platform Kafka:

    cd $KAFKA_CONNECT_COUCHBASE_HOME
    env CLASSPATH=lib/* \
        connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
                           etc/quickstart-couchbase-source.properties

    Or for Apache Kafka:

    cd $KAFKA_CONNECT_COUCHBASE_HOME
    env CLASSPATH=lib/* \
        connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties \
                           etc/quickstart-couchbase-source.properties

    Alternatively, Run the Connector with Class Loader Isolation

    Class loader isolation prevents plugins that use different versions of the same Java library from interfering with one another. Instead of adding the plugin JARs to the class path when running the connect worker, you can install them in a location called the "plugin path."

    There are multiple ways to do this. You can choose whichever method is most convenient. All of these methods involve the plugin.path worker config property which is defined in the connect-*.properties file referenced by the above commands.

    Once the connector is in the plugin path (using any of the methods described below), it is no longer necessary to set the CLASSPATH environment variable when running the connect worker.

    Option 1: Use Confluent Platform tools

    Confluent Platform users can install the connector by running the confluent-hub install command:

    To install from a ZIP archive on the local filesystem:

    confluent-hub install couchbase-kafka-connect-couchbase-<version>.zip

    Alternatively, you can download and install the connector directly from the Confluent Hub component repository:

    confluent-hub install couchbase/kafka-connect-couchbase:<version>

    Option 2: Modify the plugin.path config property

    Alternatively, Edit the connect worker config file (connect-*.properties) and search for the plugin.path property. Change the value of this property to include the full path to the couchbase-kafka-connect-couchbase-<version> directory (which we have been calling $KAFKA_CONNECT_COUCHBASE_HOME).

    Option 3: Copy the connector to an existing plugin.path location

    Alternatively, copy the $KAFKA_CONNECT_COUCHBASE_HOME/lib directory to one of the directories already listed in the plugin path. You may also wish to rename the directory from lib to kafka-connect-couchbase.

    For example, if the plugin.path property is /opt/connectors, you want to end up with a directory structure like:

    opt
    `-- connectors
        `-- kafka-connect-couchbase
            |-- kafka-connect-couchbase-<version>.jar
            |-- java-client-<version>.jar
            |-- core-io-<version>.jar
            `-- (and the other JARs too)

    Observe Messages Published by Couchbase Source Connector

    The sample config file tells the source connector to publish to a topic called test-default. Let’s use the Kafka command-line tools to spy on the contents of the topic.

    For Confluent Platform Kafka:

    kafka-console-consumer --bootstrap-server localhost:9092 \
                           --property print.key=true \
                           --topic test-default --from-beginning
    When a topic contains messages in Avro format, Confluent users should view the messages by running kafka-avro-console-consumer instead of kafka-console-consumer.

    Or for Apache Kafka:

    kafka-console-consumer.sh --bootstrap-server localhost:9092 \
                              --property print.key=true \
                              --topic test-default --from-beginning

    Each line of the output represents a document in Couchbase. Every time a Couchbase document is created, modified, or deleted, the console consumer prints another line containing the updated version of the document.

    Once the consumer catches up to the current state of the bucket, try creating, updating, or deleting a document via the Couchbase Web Console and observe how the change is propagated to the Kafka topic.

    Changing the format of published messages

    A component called a "source handler" determines the content of the published messages.

    The sample config uses RawJsonSourceHandler, which publishes JSON messages identical to the Couchbase documents. This section describes how to use this source handler, and then discusses alternate source handlers.

    RawJsonSourceHandler

    This handler always publishes records in JSON format, and requires the value converter be set to ByteArrayConverter which acts as a pass-through for the output byte array containing the JSON.

    key.converter=org.apache.kafka.connect.storage.StringConverter
    couchbase.source.handler=com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

    When a Couchbase document is deleted, RawJsonSourceHandler sends a Kafka message with a null value. If instead you wish to ignore deletion events, filter them out with the DropIfNullValue transform:

    transforms=ignoreDeletes
    transforms.ignoreDeletes.type=com.couchbase.connect.kafka.transform.DropIfNullValue

    As a performance optimization, RawJsonSourceHandler and its cousin RawJsonWithMetadataSourceHandler create Kafka Connect records whose values are byte arrays. If you wish to use these handlers together with transforms that modify document content, the record value must be converted from a byte array to a compatible format. To do this, include the DeserializeJson transform as the first in the chain and set value.converter to JsonConverter instead of ByteArrayConverter like so:

    couchbase.source.handler=com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler
    
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    
    transforms=deserializeJson,someOtherTransform
    transforms.deserializeJson.type=com.couchbase.connect.kafka.transform.DeserializeJson
    transforms.someOtherTransform.type=...

    RawJsonSourceHandlerWithMetadata

    This source handler is similar to RawJsonSourceHandler, but it wraps the Couchbase document content in an envelope that includes document metadata. Like RawJsonSourceHandler, it requires the ByteArrayConverter value converter (unless you are using Single Message Transforms, in which case you should use JsonConverter and execute DeserializeJson as the first transform).

    key.converter=org.apache.kafka.connect.storage.StringConverter
    couchbase.source.handler=com.couchbase.connect.kafka.handler.source.RawJsonSourceHandlerWithMetadata
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

    When you use this source handler, each record has an event field whose value indicates the type of change represented by the message. The possible values are:

    • mutation: A change to document content, including creation and changes made via subdocument commands.

    • deletion: Removal or expiration of the document.

    • expiration: Reserved for document expiration (Couchbase Server does not currently send this event type, but may in future versions).

    For mutation messages, the entire content of the Couchbase document is present as the value of the content field.

    DefaultSchemaSourceHandler

    This source handler generates records whose values contain the same kind of metadata envelope as RawJsonSourceHandlerWithMetadata. It differs in that it defines a schema for the envelope. You can use this source handler with any value converter; specify whichever converter matches your desired publication format.

    couchbase.source.handler=com.couchbase.connect.kafka.handler.source.DefaultSchemaSourceHandler
    The schema used by this source handler defines the Couchbase document content to be a byte array. If you use JsonConverter, this byte array will be serialized as a single Base64-encoded string. If this is not the behavior you want, consider using one of the raw JSON source handlers instead.

    For reference, the Avro schema for this payload format is shown below.

    {
      "type": "record",
      "name": "DcpMessage",
      "namespace": "com.couchbase",
      "fields": [
        {
          "name": "event",
          "type": "string"
        },
        {
          "name": "partition",
          "type": {
            "type": "int",
            "connect.type": "int16"
          }
        },
        {
          "name": "key",
          "type": "string"
        },
        {
          "name": "cas",
          "type": "long"
        },
        {
          "name": "bySeqno",
          "type": "long"
        },
        {
          "name": "revSeqno",
          "type": "long"
        },
        {
          "name": "expiration",
          "type": [
            "null",
            "int"
          ]
        },
        {
          "name": "flags",
          "type": [
            "null",
            "int"
          ]
        },
        {
          "name": "lockTime",
          "type": [
            "null",
            "int"
          ]
        },
        {
          "name": "content",
          "type": [
            "null",
            "bytes"
          ]
        }
      ],
      "connect.name": "com.couchbase.DcpMessage"
    }

    Writing a custom SourceHandler

    If none of the existing source handlers meet your requirements, you can write your own. The connector’s GitHub repository includes an example project you can use as a template for creating your own source handlers and Single Message Transforms.

    Couchbase Sink Connector

    Now let’s talk about the sink connector, which reads messages from one or more Kafka topics and writes them to Couchbase Server.

    The sink connector will attempt to convert message values to JSON. If the conversion fails, the connector will fall back to treating the value as a String BLOB.

    If the Kafka key is a primitive type, the connector will use it as the document ID. If the Kafka key is absent or of complex type (array or struct), the document ID will be generated as topic/partition/offset.

    Alternatively, the document ID can come from the body of the Kafka message. Provide a couchbase.document.id property whose value is a JSON Pointer identifying the document ID node. If you want the connector to remove this node before persisting the document to Couchbase, provide a couchbase.remove.document.id property with value true. If the connector fails to locate the document ID node, it will fall back to using the Kafka key or topic/partition/offset as described above.

    As of version 3.2.2, if the Kafka message body is null, the sink connector will delete the Couchbase document whose ID matches the Kafka message key.

    Configure and Run the Sink Connector

    In the $KAFKA_CONNECT_COUCHBASE_HOME/etc directory there is a file called quickstart-couchbase-sink.properties. Customize this file as described in Configure the Source Connector, only now the bucket will receive messages, and the user must have write access to the bucket.

    Note: Make sure to specify an existing bucket, otherwise the sink connector will fail. You may wish to create a new bucket to receive the messages.

    To run the sink connector, use the same command as described in Run the Source Connector, but pass quickstart-couchbase-sink.properties as the second argument to connect-standalone instead of quickstart-couchbase-source.properties.

    Send Test Messages

    Now that the Couchbase Sink Connector is running, let’s give it some messages to import:

    git clone https://github.com/couchbase/kafka-connect-couchbase.git
    cd kafka-connect-couchbase/examples/json-producer
    mvn compile exec:java

    The producer will send some messages and then terminate. If all goes well, the messages will appear in the Couchbase bucket you specified in the sink connector config.

    If you wish to see how the Couchbase Sink Connector behaves in the absence of message keys, modify the publishMessage method in the example source code to set the message keys to null, then rerun the producer.

    Alternatively, if you want the Couchbase document ID to be the airport code, edit quickstart-couchbase-sink.properties and set couchbase.document.id=/airport, restart the sink connector, and run the producer again.

    Modify Documents Before Writing to Couchbase

    Kafka Connect supports Single Message Transforms that let you change the structure or content of a message. To experiment with this feature, try adding these lines to your sink connector configuration:

    transforms=addMagicWord
    transforms.addMagicWord.type=org.apache.kafka.connect.transforms.InsertField$Value
    transforms.addMagicWord.static.field=magicWord
    transforms.addMagicWord.static.value=xyzzy

    Now if you restart the sink connector and send some more test messages, each new Couchbase document should have a "magicWord" field with value "xyzzy".

    If the built-in transforms are not sufficient, you can write Java code to implement more complex logic. The custom-extensions project on GitHub includes a sample CustomTransform which you can use as a starting point for creating your own transforms.

    Parent topic: Kafka Connector