Analytics using the SDK

Parallel data management for complex queries over many records, using a familiar N1QL-like syntax.

For complex and long-running queries, involving large ad hoc join, set, aggregation, and grouping operations, the Couchbase Data Platform offers the Couchbase Analytics Service (CBAS). This is the analytic counterpart to our operational data focussed Query Service. The analytics service is available in Couchbase Data Platform 6.0 and later.

Getting Started

After familiarizing yourself with our introductory primer, in particular creating a dataset and linking it to a bucket, try Couchbase Analytics using the Java SDK. Intentionally, the API for analytics is nearly identical to that of the query service.

Before starting, here’s all imports used in the following examples:

import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.analytics.AnalyticsResult;
import com.couchbase.client.java.analytics.AnalyticsScanConsistency;
import com.couchbase.client.java.analytics.ReactiveAnalyticsResult;
import com.couchbase.client.java.json.JsonArray;
import com.couchbase.client.java.json.JsonObject;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static com.couchbase.client.java.analytics.AnalyticsOptions.analyticsOptions;

Here’s a complete example of doing an analytics query and handling the results:

try {
  final AnalyticsResult result = cluster
    .analyticsQuery("select \"hello\" as greeting");

  for (JsonObject row : result.rowsAsObject()) {
    System.out.println("Found row: " + row);
  }

  System.out.println("Reported execution time: "
    + result.metaData().metrics().executionTime());
} catch (CouchbaseException ex) {
  ex.printStackTrace();
}

Let’s break it down. An analytics query is always performed at the Cluster level, using the analyticsQuery method. It takes the statement as a required argument and then allows to provide additional options if needed (in the example above, no options are specified).

Once a result returns you can iterate the returned rows and/or accessing the AnalyticsMetaData associated with the query. If something goes wrong during the execution of the query, a derivate of the CouchbaseException will be thrown that also provides additional context on the operation:

Exception in thread "main" com.couchbase.client.core.error.ParsingFailureException: Parsing of the input failed {"completed":true,"coreId":1,"errors":[{"code":24000,"message":"Syntax error: In line 1 >>select 1=;<< Encountered \"=\" at column 9. "}], ... }
Open Buckets and Cluster-Level Queries

If you are using a cluster older than Couchbase Server 6.5, it is required that there is at least one bucket open before performing a cluster-level query. If you fail to do so, the SDK will return a FeatureNotAvailableException with a descriptive error message asking you to open one.

Paramerterized Queries

Supplying parameters as individual arguments to the query allows the analytics engine to optimize the parsing and planning of the query. You can either supply these parameters by name or by position.

The first example shows how to provide them by name:

AnalyticsResult result = cluster.analyticsQuery(
  "select count(*) from airports where country = $country",
  analyticsOptions().parameters(JsonObject.create().put("country", "France"))
);

The second example by position:

AnalyticsResult result = cluster.analyticsQuery(
  "select count(*) from airports where country = ?",
  analyticsOptions().parameters(JsonArray.from("France"))
);

What style you choose is up to you, for readability in more complex queries we generally recommend using the named parameters.

Note that you cannot use parameters in all positions. If you put it in an unsupported place the server will respond with a ParsingFailureException.

The Analytics Result

When performing an analytics query, the response you receive is an AnalyticsResult. If no exception gets raised the request succeeded and provides access to both the rows returned and also associated AnalyticsMetaData.

Rows can be consumed either through a JsonObject directly, turned into a java collection instance (like a Map) or into your POJO of choice mapping directly to your domain logic.

AnalyticsResult result = cluster.analyticsQuery(
  "select * from `travel-sample` limit 10"
);
for (JsonObject row : result.rowsAsObject()) {
  System.out.println("Found row: " + row);
}

The AnalyticsMetaData provides insight into some basic profiling/timing information as well as information like the clientContextId.

Table 1. AnalyticsMetaData
Name Description

String requestId()

Returns the request identifer of this request.

String clientContextId()

Returns the context ID either generated by the SDK or supplied by the user.

AnalyticsStatus status()

An enum simply representing the state of the result.

AnalyticsMetrics metrics()

Returns metrics provided by analytics for the request.

Optional<JsonObject> signature()

If a signature is present, it will be available to consume in a generic fashion.

List<AnalyticsWarning> warnings()

Non-fatal errors are available to consume as warnings on this method.

For example, here is how you can print the executionTime of a query:

AnalyticsResult result = cluster.analyticsQuery("select 1=1");
System.err.println(
  "Execution time: " + result.metaData().metrics().executionTime()
);

Analytics Options

The analytics service provides an array of options to customize your query. The following table lists them all:

Table 2. Available Analytics Options
Name Description

clientContextId(String)

Sets a context ID returned by the service for debugging purposes.

parameters(JsonArray)

Allows to set positional arguments for a parameterized query.

parameters(JsonObject)

Allows to set named arguments for a parameterized query.

priority(boolean)

Assigns a different server-side priority to the query.

raw(String, Object)

Escape hatch to add arguments that are not covered by these options.

readonly(boolean)

Tells the client and server that this query is readonly.

scanConsistency(AnalyticsScanConsistency)

Sets a different scan consistency for this query.

serializer(JsonSerializer)

Allows to use a different serializer for the decoding of the rows.

Scan Consistency

By default, the analytics engine will return whatever is currently in the index at the time of query (this mode is also called AnalyticsScanConsistency.NOT_BOUNDED). If you need to include everything that has just been written, a different scan consistency must be chosen. If AnalyticsScanConsistency.REQUEST_PLUS is chosen, it will likely take a bit longer to return the results but the analytics engine will make sure that it is as up-to-date as possible.

AnalyticsResult result = cluster.analyticsQuery(
  "select ...",
  analyticsOptions().scanConsistency(AnalyticsScanConsistency.REQUEST_PLUS)
);

Client Context Id

The SDK will always send a client context ID with each query, even if none is provided by the user. By default a UUID will be generated that is mirrored back from the analytics engine and can be used for debugging purposes. A custom string can always be provided if you want to introduce application-specific semantics into it (so that for example in a network dump it shows up with a certain identifier). Whatever is chosen, we recommend making sure it is unique so different queries can be distinguished during debugging or monitoring.

AnalyticsResult result = cluster.analyticsQuery(
  "select ...",
  analyticsOptions().clientContextId("user-44" + UUID.randomUUID().toString())
);

Priority

By default, every analytics query has the same priority on the server. By setting this boolean flag to true, you are indicating that you need expedited dispatch in the analytice engine for this request.

AnalyticsResult result = cluster.analyticsQuery(
  "select ...",
  analyticsOptions().priority(true)
);

Readonly

If the query is marked as readonly, both the server and the SDK can improve processing of the operation. On the client side, the SDK can be more liberal with retries because it can be sure that there are no state-mutating side-effects happening. The query engine will ensure that actually no data is mutated when parsing and planning the query.

AnalyticsResult result = cluster.analyticsQuery(
  "select ...",
  analyticsOptions().readonly(true)
);

Custom JSON Serializer

Like with all JSON apis, it is possible to customize the JSON serializer. It allows to plug in your own library (like GSON) or custom configured mappings on your own Jackson serializer. This in turn makes it possible to serialize rows into POJOs or other structures that your application defines and the SDK has no idea about.

Please see the documentation transcoding and serialization for more information.

Reactive And Async APIs

In addition to the blocking API on Cluster, the SDK provides reactive and async APIs on ReactiveCluster or AsyncCluster respectively. If you are in doubt of which API to use, we recommend looking at the reactive first. It builds on top of reactor, a powerful library that allows you to compose reactive computations and deal with error handling and other related concerns (like retry) in an elegant manner. The async API on the other hand exposes a CompletableFuture and is more meant for lower level integration into other libraries or if you need the last drop of performance.

Also, there is another reason you want to use the reactive API: streaming large results with backpressure from the application side. Both the blocking and async APIs have no means of signalling backpressure in a good way, so if you need it the reactive API is your best option.

Advanced Reactive Concepts Ahead

Please see the guides on reactive programming for more information on the basics, this guide is diving straight into their impact on querying analytics.

A simple reactive query is similar to the blocking one:

Mono<ReactiveAnalyticsResult> result = cluster
  .reactive()
  .analyticsQuery("select 1=1");

result
  .flatMapMany(ReactiveAnalyticsResult::rowsAsObject)
  .subscribe(row -> System.out.println("Found row: " + row));

This query will stream all rows as they become available form the server. If you want to manually control the data flow (which is important if you are streaming a lot of rows which could cause a potential out of memory situation) you can do this by using explicit request() calls.

Mono<ReactiveAnalyticsResult> result = cluster
  .reactive()
  .analyticsQuery("select * from hugeDataset");

result
  .flatMapMany(ReactiveAnalyticsResult::rowsAsObject)
  .subscribe(new BaseSubscriber<JsonObject>() {
    // Number of outstanding requests
    final AtomicInteger oustanding = new AtomicInteger(0);

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
      request(10); // initially request to rows
      oustanding.set(10);
    }

    @Override
    protected void hookOnNext(JsonObject value) {
      process(value);
      if (oustanding.decrementAndGet() == 0) {
        request(10);
        oustanding.set(10);
      }
    }
});

In this example we initially request a batch size of 10 rows (so streaming can begin). Then as each row gets streamed it is written to a process() method which does whatever it needs to do to process. Then a counter is decremented, and once all of the 10 outstanding rows are processed another batch is loaded. Please note that with reactive code, if your process() method equivalent is blocking, you must move it onto another scheduler so that the I/O threads are not stalled. We always recommend not blocking in the first place in reactive code.