Release notes for the 2.0 version of the Kafka connector.
This is a minor GA release. It contains several bug fixes:
Allow changing the default socket connection timeout through the environment.
    DefaultCouchbaseKafkaEnvironment.Builder builder =
            (DefaultCouchbaseKafkaEnvironment.Builder) DefaultCouchbaseKafkaEnvironment
                    .builder()
                    .kafkaTopic("default")
                    .kafkaZookeeperAddress("kafka1.vagrant")
                    .couchbaseNodes("couchbase1.vagrant")
                    .couchbaseBucket("default")
                    .connectTimeout(10) // sets timeout to 10 seconds
                    .dcpEnabled(true);
    CouchbaseKafkaConnector connector = CouchbaseKafkaConnector.create(builder.build());
Fix Java 7 compatibility.
Version 2.0.0 is the general availability (GA) release of the Kafka connector. It contains important stability fixes and a new API.
New features and behavioral changes
This release contains the following features:
All these issues share a common root cause. If the Kafka consumer handles new events at a slower rate than the DCP producer emits them, all additional messages are silently dropped. Because most of these messages carry native buffers that cannot be freed by the garbage collector, the connector eventually dies from out-of-memory exceptions.
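The failure mode described above can be illustrated with a small, self-contained sketch. It is not the connector's code: the class and method names are hypothetical, and a standard bounded queue stands in for the internal buffer between the DCP producer and the Kafka consumer. It only shows how a non-blocking hand-off loses events once the consumer falls behind.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration only; not part of the connector API.
class SlowConsumerSketch {

    // Offers `produced` events to a bounded queue that nobody drains
    // (the extreme case of a slow consumer) and counts the lost events.
    static int countDropped(int capacity, int produced) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
        int dropped = 0;
        for (int event = 0; event < produced; event++) {
            // offer() returns false immediately when the queue is full,
            // so the event is lost unless the caller checks the result.
            if (!queue.offer(event)) {
                dropped++;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Capacity 4, 10 events produced: 4 are queued, 6 are dropped.
        System.out.println(countDropped(4, 10)); // prints 6
    }
}
```

A blocking hand-off (for example `put()` instead of `offer()`) would slow the producer down instead of discarding events; whether the new DCP handler works this way is not stated in these notes.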
Fixing these issues required a new implementation of the DCP handler inside the JVM Core. Some classes were removed (such as BucketStreamAggregatorState) and new, cleaner alternatives were introduced (such as StreamState). These new classes encapsulate only one point in history, in contrast to the old classes that carried a starting and an ending sequence number. These alternatives replace the unintuitive direction and run-mode API with a method that accepts two states plus some helper methods that make it easier to initialize ConnectorState. For example, the following code:

    BucketStreamAggregatorState state = connector.buildState(Direction.TO_CURRENT);
    connector.run(state, RunMode.RESUME);

can be simplified as:

    ConnectorState startState = connector.startState();
    ConnectorState currentState = connector.currentState();
    connector.run(startState, currentState);

and the snippet:

    BucketStreamAggregatorState state = connector.buildState(Direction.FROM_CURRENT);
    connector.run(state, RunMode.LOAD_AND_RESUME);

becomes:

    ConnectorState loadedState = connector.loadState();
    ConnectorState endState = connector.endState();
    connector.run(loadedState, endState);

The ZooKeeper state serializer now uses the forward slash (/) as a path separator, which makes it platform independent. Before this change, a connector running on Windows used a backslash and other platforms used a forward slash.
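To make the separator change concrete, here is a minimal sketch of the difference between a fixed separator and a platform-dependent one. The class, the helper, and the path segments are hypothetical and are not taken from the connector; the sketch assumes only that state paths are built by joining segments.

```java
import java.io.File;

// Hypothetical illustration only; not the connector's serializer.
class ZkPathSketch {

    // Joins segments with a fixed forward slash, so the resulting
    // path is identical on every platform.
    static String zkPath(String... segments) {
        StringBuilder path = new StringBuilder();
        for (String segment : segments) {
            path.append('/').append(segment);
        }
        return path.toString();
    }

    public static void main(String[] args) {
        // Same result on Windows, Linux, and macOS.
        System.out.println(zkPath("connector", "default", "42"));

        // File.separator depends on the platform (a backslash on Windows),
        // which is the kind of mismatch described above.
        System.out.println("connector" + File.separator + "default");
    }
}
```

With a fixed separator, state written by a connector on one platform can be read back at the same path by a connector on another.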
This release does not handle cluster topology changes. For example, it does not add streams for partitions that are migrated between nodes during a rebalance. The API for the necessary enhancements to handle cluster topology changes is still under discussion. Corresponding JIRA tickets are: JVMCBC-307 and KAFKAC-7.