N1QL Queries from the SDK

    +
    You can query for documents in Couchbase using the N1QL query language, a language based on SQL, but designed for structured and flexible JSON documents. Querying can solve typical programming tasks such as finding a user profile by email address, facebook login, or user ID.

    Getting Started

    After familiarizing yourself with the basics on how the N1QL query language works and how to query it from the UI you can use it from the Java SDK.

    Before starting, here’s all imports used in the following examples:

    import com.couchbase.client.core.error.CouchbaseException;
    import com.couchbase.client.java.*;
    import com.couchbase.client.java.json.*;
    import com.couchbase.client.java.query.*;
    import org.reactivestreams.Subscription;
    import reactor.core.publisher.*;
    
    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import static com.couchbase.client.java.query.QueryOptions.queryOptions;

    Here’s a complete example of doing a query and handling the results:

    try {
      final QueryResult result = cluster
        .query("select * from `travel-sample` limit 10", queryOptions().metrics(true));
    
      for (JsonObject row : result.rowsAsObject()) {
        System.out.println("Found row: " + row);
      }
    
      System.out.println("Reported execution time: "
        + result.metaData().metrics().get().executionTime());
    } catch (CouchbaseException ex) {
      ex.printStackTrace();
    }

    Let’s break it down. A query is always performed at the Cluster level, using the query method. It takes the statement as a required argument and then allows to provide additional options if needed (in the example above, no options are specified).

    Once a result returns you can iterate the returned rows and/or accessing the QueryMetaData associated with the query. If something goes wrong during the execution of the query, a derivate of the CouchbaseException will be thrown that also provides additional context on the operation:

    Exception in thread "main" com.couchbase.client.core.error.ParsingFailureException: Parsing of the input failed {"completed":true,"coreId":1,"errors":[{"code":3000,"message":"syntax error - at end of input"}],"idempotent":false,"lastDispatchedFrom":"127.0.0.1:56279","lastDispatchedTo":"127.0.0.1:8093","requestId":3,"requestType":"QueryRequest","service":{"operationId":"eee9b796-bfff-42dc-941d-1a985e019ff8","statement":"select 1=","type":"query"},"timeoutMs":75000,"timings":{"dispatchMicros":14381,"totalMicros":1365348}}
    Open Buckets and Cluster-Level Queries

    If you are using a cluster older than Couchbase Server 6.5, it is required that there is at least one bucket open before performing a cluster-level query. If you fail to do so, the SDK will return a FeatureNotAvailableException with a descriptive error message asking you to open one.

    Paramerterized Queries

    Supplying parameters as individual arguments to the query allows the query engine to optimize the parsing and planning of the query. You can either supply these parameters by name or by position.

    The first example shows how to provide them by name:

    QueryResult result = cluster.query(
      "select count(*) from `travel-sample` where type = \"airport\" and country = $country",
      queryOptions().parameters(JsonObject.create().put("country", "France"))
    );

    The second example by position:

    QueryResult result = cluster.query(
      "select count(*) from `travel-sample` where type = \"airport\" and country = ?",
      queryOptions().parameters(JsonArray.from("France"))
    );

    What style you choose is up to you, for readability in more complex queries we generally recommend using the named parameters.

    Note that you cannot use parameters in all positions. If you put it in an unsupported place the server will respond with a PlanningFailureException or similar.

    The Query Result

    When performing a query, the response you receive is a QueryResult. If no exception gets raised the request succeeded and provides access to both the rows returned and also associated QueryMetaData.

    Rows can be consumed either through a JsonObject directly, turned into a java collection instance (like a Map) or into your POJO of choice mapping directly to your domain logic.

    QueryResult result = cluster.query(
      "select * from `travel-sample` limit 10"
    );
    for (JsonObject row : result.rowsAsObject()) {
      System.out.println("Found row: " + row);
    }

    The QueryMetaData provides insight into some basic profiling/timing information as well as information like the clientContextId.

    Table 1. QueryMetaData
    Name Description

    String requestId()

    Returns the request identifer of this request.

    String clientContextId()

    Returns the context ID either generated by the SDK or supplied by the user.

    QueryStatus status()

    An enum simply representing the state of the result.

    Optional<QueryMetrics> metrics()

    Returns metrics provided by the query for the request if enabled.

    Optional<JsonObject> signature()

    If a signature is present, it will be available to consume in a generic fashion.

    List<QueryWarning> warnings()

    Non-fatal errors are available to consume as warnings on this method.

    Optional<JsonObject> profile()

    If enabled returns additional profiling information of the query.

    For example, here is how you can print the executionTime of a query:

    QueryResult result = cluster.query("select 1=1", queryOptions().metrics(true));
    System.err.println(
      "Execution time: " + result.metaData().metrics().get().executionTime()
    );

    Query Options

    The query service provides an array of options to customize your query. The following table lists them all:

    Table 2. Available Query Options
    Name Description

    clientContextId(String)

    Sets a context ID returned by the service for debugging purposes.

    parameters(JsonArray)

    Allows to set positional arguments for a parameterized query.

    parameters(JsonObject)

    Allows to set named arguments for a parameterized query.

    priority(boolean)

    Assigns a different server-side priority to the query.

    raw(String, Object)

    Escape hatch to add arguments that are not covered by these options.

    readonly(boolean)

    Tells the client and server that this query is readonly.

    adhoc(boolean)

    If set to false will prepare the query and later execute the prepared statement.

    consistentWith(MutationState)

    Allows to be consistent with previously written mutations ("read your own writes").

    maxParallelism(int)

    Tunes the maximum parallelism on the server.

    metrics(boolean)

    Enables the server to send metrics back to the client as part of the response.

    pipelineBatch(int)

    Sets the batch size for the query pipeline.

    pipelineCap(int)

    Sets the cap for the query pipeline.

    profile(QueryProfile)

    Allows to enable additional query profiling as part of the response.

    scanWait(Duration)

    Allows to specify a maximum scan wait time.

    scanCap(int)

    Specifies a maximum cap on the query scan size.

    scanConsistency(QueryScanConsistency)

    Sets a different scan consistency for this query.

    serializer(JsonSerializer)

    Allows to use a different serializer for the decoding of the rows.

    Scan Consistency

    By default, the query engine will return whatever is currently in the index at the time of query (this mode is also called QueryScanConsistency.NOT_BOUNDED). If you need to include everything that has just been written, a different scan consistency must be chosen. If QueryScanConsistency.REQUEST_PLUS is chosen, it will likely take a bit longer to return the results but the query engine will make sure that it is as up-to-date as possible.

    QueryResult result = cluster.query(
      "select ...",
      queryOptions().scanConsistency(QueryScanConsistency.REQUEST_PLUS)
    );

    You can also use consistentWith(MutationState) for a more narrowed-down scan consistency. Construct the MutationState from individual `MutationToken`s that are returned from KV `MutationResult`s to make sure at least those mutations are visible. Depending on the index update rate this might provide a speedier response.

    Client Context Id

    The SDK will always send a client context ID with each query, even if none is provided by the user. By default a UUID will be generated that is mirrored back from the query engine and can be used for debugging purposes. A custom string can always be provided if you want to introduce application-specific semantics into it (so that for example in a network dump it shows up with a certain identifier). Whatever is chosen, we recommend making sure it is unique so different queries can be distinguished during debugging or monitoring.

    QueryResult result = cluster.query(
      "select ...",
      queryOptions().clientContextId("user-44" + UUID.randomUUID().toString())
    );

    Readonly

    If the query is marked as readonly, both the server and the SDK can improve processing of the operation. On the client side, the SDK can be more liberal with retries because it can be sure that there are no state-mutating side-effects happening. The query engine will ensure that actually no data is mutated when parsing and planning the query.

    QueryResult result = cluster.query(
      "select ...",
      queryOptions().readonly(true)
    );

    Custom JSON Serializer

    Like with all JSON apis, it is possible to customize the JSON serializer. It allows to plug in your own library (like GSON) or custom configured mappings on your own Jackson serializer. This in turn makes it possible to serialize rows into POJOs or other structures that your application defines and the SDK has no idea about.

    Please see the documentation on transcoding and serialization for more information.

    Reactive And Async APIs

    In addition to the blocking API on Cluster, the SDK provides reactive and async APIs on ReactiveCluster or AsyncCluster respectively. If you are in doubt of which API to use, we recommend looking at the reactive first. It builds on top of reactor, a powerful library that allows you to compose reactive computations and deal with error handling and other related concerns (like retry) in an elegant manner. The async API on the other hand exposes a CompletableFuture and is more meant for lower level integration into other libraries or if you need the last drop of performance.

    Also, there is another reason you want to use the reactive API: streaming large results with backpressure from the application side. Both the blocking and async APIs have no means of signalling backpressure in a good way, so if you need it the reactive API is your best option.

    Advanced Reactive Concepts Ahead

    Please see the guides on reactive programming for more information on the basics, this guide is diving straight into their impact on querying.

    A simple reactive query is similar to the blocking one:

    Mono<ReactiveQueryResult> result = cluster
      .reactive()
      .query("select 1=1");
    
    result
      .flatMapMany(ReactiveQueryResult::rowsAsObject)
      .subscribe(row -> System.out.println("Found row: " + row));

    This query will stream all rows as they become available form the server. If you want to manually control the data flow (which is important if you are streaming a lot of rows which could cause a potential out of memory situation) you can do this by using explicit request() calls.

    Mono<ReactiveQueryResult> result = cluster
      .reactive()
      .query("select * from hugeBucket");
    
    result
      .flatMapMany(ReactiveQueryResult::rowsAsObject)
      .subscribe(new BaseSubscriber<JsonObject>() {
        // Number of outstanding requests
        final AtomicInteger oustanding = new AtomicInteger(0);
    
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
          request(10); // initially request to rows
          oustanding.set(10);
        }
    
        @Override
        protected void hookOnNext(JsonObject value) {
          process(value);
          if (oustanding.decrementAndGet() == 0) {
            request(10);
          }
        }
      });

    In this example we initially request a batch size of 10 rows (so streaming can begin). Then as each row gets streamed it is written to a process() method which does whatever it needs to do to process. Then a counter is decremented and once all of the 10 outstanding rows are processed another batch is loaded. Please note that if your process() method equivalent is blocking, like always with reactive code, you must move it onto another scheduler so that the I/O threads are not stalled. As always we recommend not blocking in the first place in reactive code.