public class CouchbaseKafkaConnector extends Object implements Runnable
CouchbaseKafkaConnector is the entry point of the library. It sets up connections to both the Couchbase and Kafka clusters and relays all events from Couchbase to Kafka.
The example below will transfer all mutations from Couchbase bucket "my-bucket" as JSON to Kafka topic "my-topic".
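    // Configure serialization and enable DCP streaming.
    DefaultCouchbaseKafkaEnvironment.Builder builder =
        (DefaultCouchbaseKafkaEnvironment.Builder) DefaultCouchbaseKafkaEnvironment.builder()
            .kafkaFilterClass("kafka.serializer.StringEncoder")
            .kafkaValueSerializerClass("com.couchbase.kafka.coder.JsonEncoder")
            .dcpEnabled(true);
    CouchbaseKafkaEnvironment env = builder.build();
    // Argument order follows the non-deprecated create(...) overload listed below:
    // Couchbase nodes, bucket, password, Zookeeper address, topic, environment.
    CouchbaseKafkaConnector connector = CouchbaseKafkaConnector.create(
        java.util.Collections.singletonList("couchbase.example.com"), "my-bucket", "pass",
        "kafka.example.com", "my-topic", env);
    // Relay events from Couchbase to Kafka.
    connector.run();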
Modifier and Type | Method and Description |
---|---|
static CouchbaseKafkaConnector | create(): Creates CouchbaseKafkaConnector with default settings. |
static CouchbaseKafkaConnector | create(CouchbaseKafkaEnvironment environment): Create CouchbaseKafkaConnector with specified settings. |
static CouchbaseKafkaConnector | create(List<String> couchbaseNodes, String couchbaseBucket, String couchbasePassword, String kafkaZookeeper, String kafkaTopic, CouchbaseKafkaEnvironment environment): Create CouchbaseKafkaConnector with specified settings. |
static CouchbaseKafkaConnector | create(String couchbaseNode, String couchbaseBucket, String couchbasePassword, String kafkaZookeeper, String kafkaTopic): Deprecated. Use CouchbaseKafkaEnvironment to initialize connector settings. |
ConnectorState | currentState(int... partitions): Initialize connector state for given partitions with current vbucketUUID and sequence number. |
ConnectorState | endState(int... partitions): Initialize connector state for given partitions with current vbucketUUID and maximum sequence number. |
ConnectorState | loadState(int... partitions): Initialize connector state for given partitions using configured serializer. |
void | run(): Executes worker reading loop, which relays all events from Couchbase to Kafka. |
void | run(ConnectorState fromState, ConnectorState toState): Executes worker reading loop, which relays events from Couchbase to Kafka in specified range. |
ConnectorState | startState(int... partitions): Initialize connector state for given partitions with current vbucketUUID and zero sequence number. |
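The state methods above differ only in which sequence number they pair with each partition's vbucketUUID, and run(fromState, toState) relays the events that fall between two such states. The sketch below models that idea with stand-in types. It is not the library's ConnectorState implementation: the class ConnectorStateSketch, its Position type, the use of Long.MAX_VALUE as the "maximum sequence number", and the boundary rule (after the start position, up to and including the end position) are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of per-partition stream positions.
// None of these types belong to the connector library.
final class ConnectorStateSketch {
    /** Position within one partition: a vbucket UUID plus a sequence number. */
    static final class Position {
        final long vbucketUuid;
        final long sequenceNumber;

        Position(long vbucketUuid, long sequenceNumber) {
            this.vbucketUuid = vbucketUuid;
            this.sequenceNumber = sequenceNumber;
        }
    }

    /** Copies the requested partitions from current, replacing the sequence number. */
    private static Map<Integer, Position> withSequence(
            Map<Integer, Position> current, long sequenceNumber, int... partitions) {
        Map<Integer, Position> state = new LinkedHashMap<>();
        for (int partition : partitions) {
            state.put(partition, new Position(current.get(partition).vbucketUuid, sequenceNumber));
        }
        return state;
    }

    /** Current vbucket UUID with sequence number zero. */
    static Map<Integer, Position> startState(Map<Integer, Position> current, int... partitions) {
        return withSequence(current, 0L, partitions);
    }

    /** Current vbucket UUID with the largest sequence number (assumed: Long.MAX_VALUE). */
    static Map<Integer, Position> endState(Map<Integer, Position> current, int... partitions) {
        return withSequence(current, Long.MAX_VALUE, partitions);
    }

    /** Assumed range rule: relay events after the start position, up to the end position. */
    static boolean inRange(Map<Integer, Position> from, Map<Integer, Position> to,
                           int partition, long sequenceNumber) {
        Position lower = from.get(partition);
        Position upper = to.get(partition);
        if (lower == null || upper == null) {
            return false; // partition is not covered by both states
        }
        return sequenceNumber > lower.sequenceNumber && sequenceNumber <= upper.sequenceNumber;
    }

    public static void main(String[] args) {
        Map<Integer, Position> current = new LinkedHashMap<>();
        current.put(0, new Position(111L, 42L));
        current.put(1, new Position(222L, 7L));

        // Range "from the beginning up to now" for partitions 0 and 1.
        Map<Integer, Position> from = startState(current, 0, 1);
        System.out.println(inRange(from, current, 0, 42L)); // true
        System.out.println(inRange(from, current, 0, 43L)); // false
        System.out.println(inRange(from, current, 2, 1L));  // false
    }
}
```

With the real connector, the corresponding call would pass two ConnectorState values obtained from the methods listed above to run(ConnectorState fromState, ConnectorState toState); the exact boundary semantics should be checked against the library itself.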

public static CouchbaseKafkaConnector create()
Creates CouchbaseKafkaConnector with default settings, such as "localhost" for the endpoints and "default" for the Couchbase bucket and Kafka topic.
Returns:
CouchbaseKafkaConnector with default settings

public static CouchbaseKafkaConnector create(CouchbaseKafkaEnvironment environment)
Creates CouchbaseKafkaConnector with specified settings.
Parameters:
environment - custom environment object
Returns:
CouchbaseKafkaConnector

public static CouchbaseKafkaConnector create(List<String> couchbaseNodes, String couchbaseBucket, String couchbasePassword, String kafkaZookeeper, String kafkaTopic, CouchbaseKafkaEnvironment environment)
Creates CouchbaseKafkaConnector with specified settings.
Parameters:
couchbaseNodes - addresses of Couchbase nodes, overriding CouchbaseKafkaEnvironment.couchbaseNodes().
couchbaseBucket - name of Couchbase bucket, overriding CouchbaseKafkaEnvironment.couchbaseBucket().
couchbasePassword - password for Couchbase bucket, overriding CouchbaseKafkaEnvironment.couchbasePassword().
kafkaZookeeper - address of Zookeeper, overriding CouchbaseKafkaEnvironment.kafkaZookeeperAddress().
kafkaTopic - name of Kafka topic, overriding CouchbaseKafkaEnvironment.kafkaTopic().
environment - environment object
Returns:
CouchbaseKafkaConnector

public static CouchbaseKafkaConnector create(String couchbaseNode, String couchbaseBucket, String couchbasePassword, String kafkaZookeeper, String kafkaTopic)
Deprecated. Use CouchbaseKafkaEnvironment to initialize connector settings.
Creates CouchbaseKafkaConnector with specified settings.
Parameters:
couchbaseNode - address of Couchbase node.
couchbaseBucket - name of Couchbase bucket.
couchbasePassword - password for Couchbase bucket.
kafkaZookeeper - address of Zookeeper.
kafkaTopic - name of Kafka topic.
Returns:
CouchbaseKafkaConnector

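public ConnectorState currentState(int... partitions)
Initialize connector state for given partitions with current vbucketUUID and sequence number.
Parameters:
partitions - list of partitions related to the state.

public ConnectorState startState(int... partitions)
Initialize connector state for given partitions with current vbucketUUID and zero sequence number.
Parameters:
partitions - list of partitions related to the state.

public ConnectorState endState(int... partitions)
Initialize connector state for given partitions with current vbucketUUID and maximum sequence number.
Parameters:
partitions - list of partitions related to the state.

public ConnectorState loadState(int... partitions)
Initialize connector state for given partitions using configured serializer.
Parameters:
partitions - list of partitions related to the state.

public void run()
Executes worker reading loop, which relays all events from Couchbase to Kafka.
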
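public void run(ConnectorState fromState, ConnectorState toState)
Executes worker reading loop, which relays events from Couchbase to Kafka in specified range.
Parameters:
fromState - starting state
toState - finishing state

Copyright © 2015 Couchbase, Inc.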