Spark SQL Integration

    Spark SQL allows you to access the Query and Analytics services in powerful and convenient ways.

    All query examples on this page require at least a primary index on the travel-sample data set - or on each collection, respectively. If you haven’t done so already, you can create a primary index by executing this N1QL statement: CREATE PRIMARY INDEX ON `travel-sample`.

    To use the analytics examples, the corresponding datasets or collection mappings need to be created first.

    DataFrame creation

    Before you can create a DataFrame with Couchbase, you need to create a SparkSession.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("Spark SQL")
      .config("spark.couchbase.connectionString", "127.0.0.1")
      .config("spark.couchbase.username", "Administrator")
      .config("spark.couchbase.password", "password")
      .config("spark.couchbase.implicitBucket", "travel-sample")
      .getOrCreate()

    A DataFrame can be created through spark.read.format(…); which format to choose depends on the service you want to use. Both couchbase.query and couchbase.analytics are available.

    val queryDf = spark.read.format("couchbase.query").load()
    
    val analyticsDf = spark.read.format("couchbase.analytics").load()

    While this is the easiest way to create a DataFrame, it has an important property to keep in mind:

    It will try to perform automatic schema inference based on the full data set, which is unlikely to produce the right schema if your data set is large or diverse - unless you are using a specific collection for each document type.

    There are two options to change this: you can either provide a manual schema or narrow down the automatic schema inference by providing an explicit predicate. The latter approach has the added benefit that the predicate is applied to every query, which also improves performance.

    If you want to restrict automatic schema inference to the airline documents, you can specify a predicate like this:

    import com.couchbase.spark.query.QueryOptions

    val airlines = spark.read
      .format("couchbase.query")
      .option(QueryOptions.Filter, "type = 'airline'")
      .load()

    If you call airlines.printSchema(), it will print:

    root
     |-- __META_ID: string (nullable = true)
     |-- callsign: string (nullable = true)
     |-- country: string (nullable = true)
     |-- iata: string (nullable = true)
     |-- icao: string (nullable = true)
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
     |-- type: string (nullable = true)

    Not only did it automatically infer the schema, it also added a __META_ID field which corresponds to the document ID where applicable.
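
    Since __META_ID behaves like any other column, you can select it alongside regular fields - a minimal sketch:

    airlines
      .select("__META_ID", "name")
      .show(3)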

    A manual schema can also be provided if the automatic inference does not work properly:

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val airlines = spark.read
      .format("couchbase.query")
      .schema(StructType(
        StructField("name", StringType) ::
          StructField("type", StringType) :: Nil
      ))
      .load()

    Now that you have a DataFrame, you can apply all the operations that Spark SQL provides. A simple example would be to load specific fields from the DataFrame and print some of those records:

    airlines
      .select("name", "callsign")
      .sort(airlines("callsign").desc)
      .show(10)
    +-------+--------------------+
    |   name|            callsign|
    +-------+--------------------+
    |   EASY|             easyJet|
    |   BABY|             bmibaby|
    |MIDLAND|                 bmi|
    |   null|          Yellowtail|
    |   null|               XOJET|
    |STARWAY|   XL Airways France|
    |   XAIR|            XAIR USA|
    |  WORLD|       World Airways|
    |WESTERN|    Western Airlines|
    |   RUBY|Vision Airlines (V2)|
    +-------+--------------------+
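
    The usual Spark SQL operators work here as well. As a small sketch, the following counts airlines per country (assuming the DataFrame with the inferred schema from above, which includes the country field):

    airlines
      .groupBy("country")
      .count()
      .orderBy("country")
      .show(5)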

    You can also combine a custom schema with a predicate, which gives you maximum flexibility when describing your data layout while also optimizing performance when retrieving the data.
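
    A minimal sketch of combining both, reusing the imports from the earlier examples:

    val airlines = spark.read
      .format("couchbase.query")
      .schema(StructType(
        StructField("name", StringType) ::
          StructField("country", StringType) :: Nil
      ))
      .option(QueryOptions.Filter, "type = 'airline'")
      .load()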

    DataFrame persistence

    It is also possible to persist DataFrames into Couchbase. The important part is that a __META_ID field (or a differently named field, if configured) exists which can be mapped to the unique document ID. All other fields in the DataFrame will be converted into JSON and stored as the document content.

    You can store DataFrames using both couchbase.query and couchbase.kv. We recommend the KeyValue data source since it provides better out-of-the-box performance, if your usage pattern allows for it.
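
    For comparison, a write through the Query service might look like the following minimal sketch; it assumes the implicit bucket configured on the SparkSession is used as the write target:

    // A sketch only: each row is converted to JSON and written through
    // the Query service into the implicitBucket from the SparkSession config.
    airlines.write
      .format("couchbase.query")
      .save()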

    The following example reads data from a bucket and writes the first 5 results into a different bucket. To showcase the available options, they are used on both the read and the write side:

    import com.couchbase.spark.kv.KeyValueOptions

    // Read the first five airline documents through the Query service.
    val airlines = spark.read.format("couchbase.query")
      .option(QueryOptions.Filter, "type = 'airline'")
      .load()
      .limit(5)

    // Write them into the "test" bucket through the KeyValue service,
    // waiting for majority durability on each write.
    airlines.write.format("couchbase.kv")
      .option(KeyValueOptions.Bucket, "test")
      .option(KeyValueOptions.Durability, KeyValueOptions.MajorityDurability)
      .save()

    Working with Datasets

    You can call .as[Target] on your DataFrame to turn it into its typesafe counterpart (most of the time a case class). Consider the following case class:

    case class Airline(name: String, iata: String, icao: String, country: String)

    Make sure to import the implicits for the SparkSession:

    import spark.implicits._

    You can now create a DataFrame as usual which can be turned into a Dataset:

    val airlines = spark.read.format("couchbase.query")
      .option(QueryOptions.Filter, "type = 'airline'")
      .load()
      .as[Airline]

    If you want to print all airlines whose name starts with "A", you can access the properties of the case class:

    airlines
      .map(_.name)
      .filter(_.toLowerCase.startsWith("a"))
      .foreach(println(_))
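
    Because the Dataset is typed, you can also filter on the case class fields directly before extracting values. A small sketch, assuming "United States" appears as a country value in the data set:

    airlines
      .filter(_.country == "United States")
      .map(a => s"${a.name} (${a.iata})")
      .show(5)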

    For more information on Datasets, please refer to the Spark Dataset Docs.