N1QL Queries from the SDK

You can query for documents in Couchbase using the N1QL query language, a language based on SQL, but designed for structured and flexible JSON documents.

These pages cover the sixth alpha release of the Couchbase Scala SDK — 1.0.0-alpha.6. As such they are likely to change without notice. This alpha code should not be used in production.

Documentation is incomplete, subject to change, and likely to contain broken links.

Our query service uses N1QL, which will be fairly familiar to anyone who’s used any dialect of SQL. Additional resources for learning about N1QL are listed at the bottom of the page. Before you get started, you may wish to check out the N1QL intro page.

N1QL Compared to Key-Value

N1QL is excellent for performing queries against multiple documents, but if you only need to access or mutate a single document and you know its unique ID, it will be much more efficient to use the Key-Value API. We strongly recommend using both APIs to create a flexible, performant application.
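To illustrate the difference, here is a sketch of fetching a single document by ID through the Key-Value API, which avoids the query service entirely. It assumes the `cluster` and `bucket` connected in the examples below, and that the document ID `"airline_10"` exists (it does in the travel-sample bucket):

```scala
import com.couchbase.client.scala._
import com.couchbase.client.scala.json._
import scala.util.{Failure, Success}

val cluster = Cluster.connect("localhost", "username", "password")
val bucket = cluster.bucket("travel-sample")
val collection = bucket.defaultCollection

// A direct key-value lookup: no index or query parsing involved
collection.get("airline_10") match {
  case Success(result) => println(result.contentAs[JsonObject])
  case Failure(err)    => println(s"Error: $err")
}
```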

Getting Started

Let’s get started by pulling in all the imports needed in the examples below:

import com.couchbase.client.scala._
import com.couchbase.client.scala.codec.Conversions.Codec
import com.couchbase.client.scala.implicits.Codecs
import com.couchbase.client.scala.json._
import com.couchbase.client.scala.query._
import reactor.core.scala.publisher._

import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

Then we connect to a Couchbase cluster, as usual (of course, change the address and credentials to match your own cluster’s):

val cluster = Cluster.connect("localhost", "username", "password")
val bucket = cluster.bucket("travel-sample")

The examples below will use the travel-sample example bucket. This can be installed through the Couchbase Admin UI in Settings → Sample Buckets.

In order to use query on a bucket, at least one index must be created on it; the easiest to start with is a primary index. The simplest way to create this is through the Couchbase Admin UI: visit the Query tab, enter the following in the Query Editor, and hit Execute:

CREATE PRIMARY INDEX ON `travel-sample`
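Alternatively, the same index can be created from the SDK by executing the N1QL statement directly. This is a sketch assuming the `cluster` connected above:

```scala
import scala.util.{Failure, Success}

// DDL statements such as CREATE PRIMARY INDEX can be run through
// cluster.query just like any other N1QL statement
cluster.query("""CREATE PRIMARY INDEX ON `travel-sample`""") match {
  case Success(_)   => println("Primary index created")
  case Failure(err) => println(s"Index creation failed: $err")
}
```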

A Simple Query

Here are the basics of how to run a simple query that fetches 10 rows from travel-sample and prints the results:

val statement = """select * from `travel-sample` limit 10;"""
val result: Try[QueryResult] = cluster.query(statement)

(Note that we won’t be covering the N1QL language itself in any detail here, but if you’re familiar with SQL you’ll see it’s very similar.)

The Scala SDK returns Try rather than throwing exceptions, to allow you to handle errors in a functional way. A Try can either be a Success(QueryResult) if the N1QL statement was successfully executed, or Failure(Throwable) if something went wrong. It can be pattern matched on like this:

result match {
  case Success(result: QueryResult) =>
    result.allRowsAs[JsonObject] match {
      case Success(rows) =>
        println(s"Got ${rows.size} rows")
      case Failure(err) => println(s"Error: $err")
    }
  case Failure(err) => println(s"Error: $err")
}

The returned QueryResult provides an allRowsAs[T] method, allowing the results to be converted into something useful. The example above returns the results as JsonObject, the JSON library built into the Scala SDK.

allRowsAs can also convert the rows to:

  • io.circe.Json from the popular Scala JSON library Circe

  • Similar support is included for other excellent JSON libraries: uPickle/uJson, Play Json, Jawn, and Json4s.

  • Scala case classes. A tiny amount of boilerplate is needed to support this: see Key-Value Operations for details.

  • String

  • Array[Byte]

Please see this guide for more information on the supported ways of working with JSON.
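For example, each row can be retrieved as a raw JSON String rather than a JsonObject. This is a sketch assuming the `cluster` connected earlier:

```scala
import scala.util.{Failure, Success}

// allRowsAs[String] returns each row as its raw JSON text,
// useful for passing rows straight to another JSON library
cluster.query("""select * from `travel-sample` limit 10;""")
  .flatMap(_.allRowsAs[String]) match {
  case Success(rows) => rows.foreach(println)
  case Failure(err)  => println(s"Error: $err")
}
```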

Of course, it wouldn’t be Scala if we couldn’t combine the operations above more concisely:

cluster.query("""select * from `travel-sample` limit 10;""")
  .flatMap(_.allRowsAs[JsonObject]) match {
  case Success(rows: Seq[JsonObject]) =>
    rows.foreach(row => println(row))
  case Failure(err) =>
    println(s"Error: $err")
}

Most of the examples here use the simplest of the three APIs provided by the Scala SDK, which blocks until the operation is performed. There’s also an asynchronous API that is based around Scala Future, and a streaming reactive API, for which we’ll see an example later.

Placeholder and Named Arguments

Placeholders allow you to specify variable constraints for a query.

There are two variants of placeholders: positional and named parameters. Both are used as placeholders for values in the WHERE, LIMIT, or OFFSET clauses of a query.

Positional parameters use an ordinal placeholder for substitution and can be used like this:

val stmt = """select * from `travel-sample` where type=$1 and country=$2 limit 10;"""
val result = cluster.query(stmt,
  QueryOptions().positionalParameters("airline", "United States"))

Whereas named parameters can be used like this:

val stmt = """select * from `travel-sample` where type=$type and country=$country limit 10;"""
val result = cluster.query(stmt,
  QueryOptions().namedParameters("type" -> "airline", "country" -> "United States"))

Scan Consistency

Queries take an optional scanConsistency parameter that enables a tradeoff between latency and (eventual) consistency.

  • A N1QL query using the default NotBounded scan consistency will not wait for any indexes to finish updating before running the query and returning results, meaning that results are returned quickly, but the query will not return any documents that are yet to be indexed.

  • With scan consistency set to RequestPlus, all outstanding document changes and index updates are processed before the query is run. Select this when consistency is always more important than performance.

  • For a middle ground, AtPlus is a "read your own writes" (RYOW) option, which means it waits only for the documents that you specify to be indexed.

AtPlus is not supported in this initial alpha release, but will be available in the next release.

Here’s how to specify the RequestPlus scan consistency level:

val result = cluster.query("""select * from `travel-sample` limit 10;""",
  QueryOptions().scanConsistency(ScanConsistency.RequestPlus()))

Returning Results as Case Classes

The Scala SDK supports returning N1QL results directly as Scala case classes.

A small amount of boilerplate is required to tell the SDK how to convert your case class to/from JSON. There are more details available here, but the short version is to add a Codec in the case class’s companion object like this:

case class Address(line1: String)
case class User(name: String, age: Int, addresses: Seq[Address])
object User {
  implicit val codec: Codec[User] = Codecs.codec[User]
}

Now you’re free to pull out the results directly as your case class:

val statement = """select * from `travel-sample` limit 10;"""

cluster.query(statement)
  .flatMap(_.allRowsAs[User]) match {
  case Success(rows: Seq[User]) =>
    rows.foreach(row => println(row))
  case Failure(err) =>
    println(s"Error: $err")
}

Streaming Large Result Sets

As mentioned earlier, the Scala SDK provides three APIs (documented further on howtos:multiple-apis.adoc):

  • The blocking API you’ve seen so far, that returns a QueryResult containing all rows.

  • An async API that returns a Future[QueryResult], which also contains all rows. This can be accessed like this:

// When we work with Scala Futures an ExecutionContext must be provided.
// For this example we'll just use the global default
import scala.concurrent.ExecutionContext.Implicits.global

val stmt = """select * from `travel-sample` limit 10;"""
val future: Future[QueryResult] = cluster.async.query(stmt)

future onComplete {
  case Success(result) =>
    result.allRowsAs[JsonObject] match {
      case Success(rows) => rows.foreach(println(_))
      case Failure(err) => println(s"Error: $err")
    }
  case Failure(err)    => println(s"Error: $err")
}
  • A reactive API, that can be used to stream rows.

The first two APIs buffer all rows in memory until they can be returned to you. With smaller queries this is likely to be fine, but for large result sets it could lead to Java OutOfMemoryError exceptions.

The recommended solution is to use the reactive API. Reactive programming is a sophisticated paradigm that is rapidly gaining popularity for its ability to handle, amongst other things, streaming large amounts of data over fallible networks, while allowing error handling and backpressure.

The Scala SDK exposes primitives from the Project Reactor library, most notably Mono and Flux. We strongly recommend learning a little of this library first, and the following examples will assume basic familiarity with Reactor.

You’ll see both reactor.core.scala.publisher and reactor.core.publisher imports available for Reactor. Use the former: it is the Scala-optimized variant, and the one the Scala SDK returns.

Here’s how to perform a query and stream the results using the reactive API:

val stmt = """select * from `travel-sample`;"""
val mono: Mono[ReactiveQueryResult] = cluster.reactive.query(stmt)

val rows: Flux[JsonObject] = mono
  // ReactiveQueryResult streams its rows as a Flux, converted here to JsonObject
  .flatMapMany(result => result.rowsAs[JsonObject])

// Just for example, block on the rows.  This is not best practice and apps
// should generally not block.
val allRows: Seq[JsonObject] = rows
  .doOnNext(row => println(row))
  .doOnError(err => println(s"Error: $err"))
  .collectSeq()
  .block()

Additional Resources

The N1QL interactive tutorial is a good introduction to the basics of N1QL use.