DataFrames, Datasets and SQL

    Spark allows accessing the query and analytics services, as well as Capella Columnar, in powerful and convenient ways.

    While Apache’s low-level RDDs are powerful, Apache Spark now encourages users towards the higher-level DataFrames and Datasets. Let’s see how to use those with Couchbase.

    Before you can create DataFrames or Datasets with Couchbase, you need to create a SparkSession. See Getting Started (Scala) or Getting Started (PySpark).
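
    As a quick reminder, a minimal SparkSession can be created as follows (a sketch only; the connection string and credentials are placeholders for your own cluster, and the Getting Started pages cover the details):

    import org.apache.spark.sql.SparkSession

    // Placeholder connection details -- substitute your own cluster address and credentials.
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .config("spark.couchbase.connectionString", "couchbase://127.0.0.1")
      .config("spark.couchbase.username", "username")
      .config("spark.couchbase.password", "password")
      .getOrCreate()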

    Running examples

    All query examples presented on this page require at least the travel-sample dataset to be installed on your Couchbase cluster, with a primary index. If you haven’t done so already, you can create a primary index by executing this SQL++ statement: CREATE PRIMARY INDEX ON `travel-sample`.

    To use the analytics examples, corresponding datasets or collection mappings should be created.

    Capella Columnar is also supported, and is documented separately.

    DataFrames

    A read DataFrame can be created through spark.read.format(…), and which format to choose depends on the type of service you want to use. For spark.read, the available formats are couchbase.query, couchbase.analytics, and couchbase.columnar.

    (For reading from the KV service - see RDDs)

    • Scala

    • PySpark

    val queryDf = spark.read.format("couchbase.query").load()
    
    val analyticsDf = spark.read.format("couchbase.analytics").load()
    
    val columnarDf = spark.read.format("couchbase.columnar").load()
    queryDf = spark.read.format("couchbase.query").load()
    
    analyticsDf = spark.read.format("couchbase.analytics").load()
    
    columnarDf = spark.read.format("couchbase.columnar").load()

    Collections

    You’ll generally want the DataFrame to work with a specific Couchbase collection, which can be done with the following:

    • Scala

    • PySpark

    • Scala (Columnar)

    val airlines = spark.read.format("couchbase.query")
      .option(QueryOptions.Bucket, "travel-sample")
      .option(QueryOptions.Scope, "inventory")
      .option(QueryOptions.Collection, "airline")
      .load()
    airlines = (spark.read.format("couchbase.query")
                .option("bucket", "travel-sample")
                .option("scope", "inventory")
                .option("collection", "airline")
                .load())
    val airlines = spark.read.format("couchbase.columnar")
      .option(ColumnarOptions.Database, "travel-sample")
      .option(ColumnarOptions.Scope, "inventory")
      .option(ColumnarOptions.Collection, "airline")
      .load()

    Note that all query options are documented under Query options below, and all Capella Columnar options under Capella Columnar options.

    If you will usually be using the same collection, it can be more convenient to provide it in the SparkSession config instead:

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .config("spark.couchbase.connectionString", "couchbase://127.0.0.1")
      .config("spark.couchbase.username", "username")
      .config("spark.couchbase.password", "password")
      .config("spark.couchbase.implicitBucket", "travel-sample")
      .config("spark.couchbase.implicitScope", "inventory")
      .config("spark.couchbase.implicitCollection", "airline")
      .getOrCreate()

    Schemas

    Spark requires a schema for all DataFrame and Dataset operations.

    The Couchbase Spark Connector will perform automatic schema inference based on a random sampling of all documents in the chosen collection — or bucket if no collection was chosen. This automatic inference can work well in cases where the data are very similar.

    If the data are not similar, there are two options: you can either provide a manual schema, or narrow down what documents the automatic schema inference acts on by providing explicit predicates.

    For example, say the schema of the airlines collection evolves and we add a "version": 2 field to identify documents that have the new schema. We can limit both the inference and the SQL++ executed from the DataFrame to only those documents, by providing a filter:

    • Scala

    • PySpark

    val airlines = spark.read
      .format("couchbase.query")
      .option(QueryOptions.Bucket, "travel-sample")
      .option(QueryOptions.Scope, "inventory")
      .option(QueryOptions.Collection, "airline")
      .option(QueryOptions.Filter, "version = 2")
      .load()
    airlines = (spark.read
                .format("couchbase.query")
                .option("bucket", "travel-sample")
                .option("scope", "inventory")
                .option("collection", "airline")
                .option("filter", "version = 2")
                .load())

    You can call airlines.printSchema() to view the schema (either inferred or provided manually):

    root
     |-- __META_ID: string (nullable = true)
     |-- callsign: string (nullable = true)
     |-- country: string (nullable = true)
     |-- iata: string (nullable = true)
     |-- icao: string (nullable = true)
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
     |-- type: string (nullable = true)

    Not only did it automatically infer the schema, it also added a __META_ID field, which corresponds to the document ID where applicable.
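
    For instance, selecting the __META_ID column alongside a regular field shows which document each row came from (a minimal illustration using the airlines DataFrame from above):

    airlines
      .select("__META_ID", "name")
      .show(3)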

    A manual schema can also be provided, and this is the recommended practice for production applications, to protect against future data changes:

    • Scala

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val airlines = spark.read
      .format("couchbase.query")
      .schema(StructType(
        StructField("name", StringType) ::
          StructField("type", StringType) :: Nil
      ))
      .load()

    DataFrame Operations

    Now that you have a DataFrame, you can apply all the operations that Spark SQL provides. A simple example would be to load specific fields from the DataFrame and print some of those records:

    • Scala

    • PySpark

    airlines
      .select("name", "callsign")
      .sort(airlines("callsign").desc)
      .show(10)
    (airlines
     .select("name", "callsign")
     .sort(airlines["callsign"].desc())
     .show(10))

    Results in:

    +-------+--------------------+
    |   name|            callsign|
    +-------+--------------------+
    |   EASY|             easyJet|
    |   BABY|             bmibaby|
    |MIDLAND|                 bmi|
    |   null|          Yellowtail|
    |   null|               XOJET|
    |STARWAY|   XL Airways France|
    |   XAIR|            XAIR USA|
    |  WORLD|       World Airways|
    |WESTERN|    Western Airlines|
    |   RUBY|Vision Airlines (V2)|
    +-------+--------------------+

    Spark SQL

    We can use Spark’s createOrReplaceTempView to create a temporary view from a DataFrame, which we can then run Spark SQL on (which creates another DataFrame):

    • Scala

    • PySpark

    airlines.createOrReplaceTempView("airlinesView")
    val airlinesFromView = spark.sql("SELECT * FROM airlinesView")
    airlines.createOrReplaceTempView("airlinesView")
    airlinesFromView = spark.sql("SELECT * FROM airlinesView")

    Note this SQL is executed purely within Spark, and is not sent to the Couchbase cluster.
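
    As a brief illustration, an aggregation over the temporary view is computed by Spark itself once the underlying rows have been fetched from Couchbase:

    spark
      .sql("SELECT country, COUNT(*) AS airlines FROM airlinesView GROUP BY country")
      .show(10)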

    DataFrame partitioning

    By default, one DataFrame read or write with query will result in one SQL++ statement being executed.

    With very large DataFrames this can present issues, as all rows are streamed back to one Spark worker. In addition there is no parallelism.

    So in some specific situations the user may wish to provide partitioning hints so that multiple SQL++ statements are executed.

    In this example, say we are reading from an orders collection, where we know we have around 100,000 orders and want to partition into 100 SQL++ statements. Here we will partition on a numerical "id" column:

    • Scala

    • PySpark

    spark.read
      .format("couchbase.query")
      .option(QueryOptions.PartitionColumn, "id")
      .option(QueryOptions.PartitionLowerBound, "1")
      .option(QueryOptions.PartitionUpperBound, "100000")
      .option(QueryOptions.PartitionCount, "100")
      .load()
    (spark.read
     .format("couchbase.query")
     .option("partitionColumn", "id")
     .option("partitionLowerBound", "1")
     .option("partitionUpperBound", "100000")
     .option("partitionCount", "100")
     .load())

    This will result in 100 partitions with SQL++ statements along the lines of

    SELECT [...] WHERE [...] AND id < 1000
    SELECT [...] WHERE [...] AND (id >= 1000 AND id < 2000)
    SELECT [...] WHERE [...] AND (id >= 2000 AND id < 3000)
    ...
    SELECT [...] WHERE [...] AND id >= 99000

    If any of the four partitioning options is provided, then all must be.

    The chosen partitionColumn must support the SQL comparison operators "<" and ">=". Any results where partitionColumn is null, or otherwise not matched by those operators, will not be included.

    PartitionLowerBound and PartitionUpperBound do not bound or limit the results; they simply determine how the range is split across partitions, and therefore roughly how many results each partition contains. Note that the first and last queries in the example above use < and >= to include all results.

    Partitioning performance tips

    When using query partitioning, users are often interested in obtaining the highest throughput possible. Internal testing has found these tips will provide the best possible performance:

    • Duplicating or replicating the GSI indexes used by the query, so that multiple query nodes are not bottlenecked by going to a single GSI node.

    • Increasing the data service reader and writer threads.

    • Ensuring the disks used can provide sufficient throughput/IOPS.

    • Increasing the number of Couchbase nodes, and the resources (cores, memory, disk) allocated to each.

    • Increasing the count of Spark worker nodes, and the number of cores on each (each core can execute one partition at a time).

    • Ensuring sufficient network bandwidth is available.

    DataFrame persistence

    It is also possible to persist DataFrames into Couchbase.

    Couchbase documents require unique document ids, so an id needs to be available in the DataFrame. By default the connector will look for a __META_ID column, but this can be overridden with an option (see below).

    All the other fields in the DataFrame will be converted into JSON and stored as the document content.

    You can store DataFrames using both couchbase.query and couchbase.kv. We recommend using the KeyValue data source since it provides better performance out of the box if the usage pattern allows for it.

    The following example reads data from a collection and writes the first 5 results into a different collection. Also, to showcase the available options, they are used on both the read and the write side:

    • Scala

    • PySpark

    val airlines = spark.read.format("couchbase.query")
      .option(QueryOptions.Bucket, "travel-sample")
      .option(QueryOptions.Scope, "inventory")
      .option(QueryOptions.Collection, "airline")
      .load()
      .limit(5)
    
    airlines.write.format("couchbase.kv")
      .option(KeyValueOptions.Bucket, "test-bucket")
      .option(KeyValueOptions.Scope, "test-scope")
      .option(KeyValueOptions.Collection, "test-collection")
      .option(KeyValueOptions.Durability, KeyValueOptions.MajorityDurability)
      .save()
    airlines = (spark.read.format("couchbase.query")
                .option("bucket", "travel-sample")
                .option("scope", "inventory")
                .option("collection", "airline")
                .load()
                .limit(5))
    
    (airlines.write.format("couchbase.kv")
     .option("bucket", "test-bucket")
     .option("scope", "test-scope")
     .option("collection", "test-collection")
     .option("durability", "majority")
     .save())

    And to specify your own document id column for a DataFrame df:

    • Scala

    • PySpark

    df.write.format("couchbase.kv")
      .option(KeyValueOptions.Bucket, "test-bucket")
      .option(KeyValueOptions.Scope, "test-scope")
      .option(KeyValueOptions.Collection, "test-collection")
      .option(KeyValueOptions.IdFieldName, "YourIdColumn")
      .save()
    (df.write.format("couchbase.kv")
     .option("bucket", "test-bucket")
     .option("scope", "test-scope")
     .option("collection", "test-collection")
     .option("idFieldName", "YourIdColumn")
     .save())

    Note that all Key-Value options are documented under KeyValue options below, and all query options under Query options.

    DataFrame persistence performance tips

    As mentioned, we strongly recommend using the KeyValue data source for persisting DataFrames.

    But if the query data source is required, then we recommend:

    • Partition the data before writing it. (Each partition results in one INSERT or UPSERT SQL++ statement being executed by the connector; see the sketch after this list.)

    • Observe the Spark worker logs to see how long each query is taking and if timeouts are occurring. Increase the partition count and/or the timeout value as needed.

    • The multiple large SQL++ statements that can be generated by the connector with large datasets may fill the query service’s system:completed_requests table, leading to high query memory usage and potentially out-of-memory issues. Consider disabling this or increasing its threshold, following these docs.

    • And standard recommendations for writing at scale: consider using separate nodes for query and KV; use disks that can support the throughput needed; use powerful enough nodes to support the throughput needed; ensure there is sufficient network bandwidth.
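
    A minimal sketch of the first tip, assuming a DataFrame df that already contains the document id column and is written through the query data source (the partition count of 100 and the target names are illustrative only):

    df
      .repartition(100) // each partition becomes one INSERT or UPSERT SQL++ statement
      .write.format("couchbase.query")
      .option(QueryOptions.Bucket, "test-bucket")
      .option(QueryOptions.Scope, "test-scope")
      .option(QueryOptions.Collection, "test-collection")
      .save()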

    SaveMode Mapping

    SparkSQL DataFrames allow you to configure how data is written to the data source by specifying a SaveMode, of which there are four: Append, Overwrite, ErrorIfExists, and Ignore.

    Couchbase has similar names for different write semantics: Insert, Upsert, and Replace. The following tables describe the mappings for both couchbase.query and couchbase.kv:

    Note that SaveMode.Append is not supported, since the operations are always writing the full document body (and not appending to one).

    Table 1. couchbase.kv mappings

    SparkSQL                  Couchbase KV
    SaveMode.Overwrite        Upsert
    SaveMode.ErrorIfExists    Insert
    SaveMode.Ignore           Insert (ignores DocumentExistsException)
    SaveMode.Append           not supported

    Table 2. couchbase.query mappings

    SparkSQL                  Couchbase SQL++
    SaveMode.Overwrite        UPSERT INTO
    SaveMode.ErrorIfExists    INSERT INTO
    SaveMode.Ignore           INSERT INTO (ignores DocumentExistsException)
    SaveMode.Append           not supported
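
    For example, to upsert with the KeyValue data source you can select SaveMode.Overwrite on the writer (a sketch only; df and the target bucket, scope, and collection names are placeholders):

    import org.apache.spark.sql.SaveMode

    df.write.format("couchbase.kv")
      .mode(SaveMode.Overwrite) // maps to a KV Upsert, per Table 1
      .option(KeyValueOptions.Bucket, "test-bucket")
      .option(KeyValueOptions.Scope, "test-scope")
      .option(KeyValueOptions.Collection, "test-collection")
      .save()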

    Datasets

    You can call .as[Target] on your DataFrame to turn it into its typesafe counterpart, a Dataset (where Target is most of the time a case class).

    (Note that Apache Spark only supports Datasets in Scala - there is no PySpark equivalent.)

    Consider having the following case class:

    case class Airline(name: String, iata: String, icao: String, country: String)

    Make sure to import the implicits for the SparkSession:

    import spark.implicits._

    You can now create a DataFrame as usual which can be turned into a Dataset:

    val airlines = spark.read.format("couchbase.query")
      .option(QueryOptions.Bucket, "travel-sample")
      .option(QueryOptions.Scope, "inventory")
      .option(QueryOptions.Collection, "airline")
      .load()
      .as[Airline]

    If you want to print all Airlines that start with "A" you can access the properties on the case class:

    airlines
      .map(_.name)
      .filter(_.toLowerCase.startsWith("a"))
      .foreach(println(_))

    For more information on Datasets, please refer to the Spark Dataset Docs.

    Options

    Query options

    The available options for query DataFrame and Dataset operations:

    Table 3. Query Options (PySpark string / Scala & Java constant)

    "bucket" / QueryOptions.Bucket
      Name of a Couchbase bucket.
      Default: mandatory unless implicitBucket is set.

    "scope" / QueryOptions.Scope
      Name of a Couchbase scope.
      Default: the default scope on the specified bucket. Note implicitScope can be set in the Spark config.

    "collection" / QueryOptions.Collection
      Name of a Couchbase collection.
      Default: the default collection on the specified scope. Note implicitCollection can be set in the Spark config.

    "timeout" / QueryOptions.Timeout
      Overrides the default timeout. The string must be parsable into a Scala Duration.
      Default: 75 seconds.

    "idFieldName" / QueryOptions.IdFieldName
      Uses a different column for the document id when persisting a DataFrame.
      Default: the "__META_ID" column.

    "filter" / QueryOptions.Filter
      A SQL expression that is inserted directly into the generated SQL, e.g. type = 'airport'.
      Default: none.

    "partitionColumn", "partitionLowerBound", "partitionUpperBound", "partitionCount" / QueryOptions.PartitionColumn, QueryOptions.PartitionLowerBound, QueryOptions.PartitionUpperBound, QueryOptions.PartitionCount
      Control how manual DataFrame partitioning is performed.
      Default: none.

    "scanConsistency" / QueryOptions.ScanConsistency
      Valid options are "requestPlus" / QueryOptions.RequestPlusScanConsistency and "notBounded" / QueryOptions.NotBoundedScanConsistency.
      Default: "notBounded".

    "inferLimit" / QueryOptions.InferLimit
      Controls how many documents are initially sampled to perform schema inference.
      Default: 1000.

    "connectionIdentifier" / QueryOptions.ConnectionIdentifier
      Allows a different configuration to be used than the default one. See Connecting to multiple clusters.
      Default: the default connection.
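
    For example, a read that raises the timeout and requests request-plus scan consistency could look like this sketch (the values are illustrative):

    val airports = spark.read.format("couchbase.query")
      .option(QueryOptions.Bucket, "travel-sample")
      .option(QueryOptions.Scope, "inventory")
      .option(QueryOptions.Collection, "airport")
      .option(QueryOptions.Timeout, "2 minutes")
      .option(QueryOptions.ScanConsistency, QueryOptions.RequestPlusScanConsistency)
      .load()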

    KeyValue options

    The available options for Key-Value DataFrame and Dataset operations:

    Table 4. KeyValue Options (PySpark string / Scala & Java constant)

    "bucket" / KeyValueOptions.Bucket
      Name of a Couchbase bucket.
      Default: mandatory unless implicitBucket is set.

    "scope" / KeyValueOptions.Scope
      Name of a Couchbase scope.
      Default: the default scope on the specified bucket. Note implicitScope can be set in the Spark config.

    "collection" / KeyValueOptions.Collection
      Name of a Couchbase collection.
      Default: the default collection on the specified scope. Note implicitCollection can be set in the Spark config.

    "timeout" / KeyValueOptions.Timeout
      Overrides the default timeout. The string must be parsable into a Scala Duration.
      Default: 2.5 seconds for non-durable operations, 10 seconds for durable operations.

    "idFieldName" / KeyValueOptions.IdFieldName
      Uses a different column for the document id when persisting a DataFrame.
      Default: the "__META_ID" column.

    "durability" / KeyValueOptions.Durability
      Valid options are "none", "majority" / KeyValueOptions.MajorityDurability, "majorityAndPersistToActive" / KeyValueOptions.MajorityAndPersistToActiveDurability, and "persistToMajority" / KeyValueOptions.PersistToMajorityDurability.
      Default: "none".

    "connectionIdentifier" / KeyValueOptions.ConnectionIdentifier
      Allows a different configuration to be used than the default one. See Connecting to multiple clusters.
      Default: the default connection.

    Capella Columnar options

    The available options for Capella Columnar DataFrame and Dataset operations:

    Table 5. Capella Columnar Options (PySpark string / Scala & Java constant)

    "database" / ColumnarOptions.Database
      Name of a Couchbase Columnar database.
      Mandatory.

    "scope" / ColumnarOptions.Scope
      Name of a Couchbase Columnar scope.
      Mandatory.

    "collection" / ColumnarOptions.Collection
      Name of a Couchbase Columnar collection.
      Mandatory.

    "timeout" / ColumnarOptions.Timeout
      Overrides the default timeout. The string must be parsable into a Scala Duration.
      Default: 10 minutes.

    "filter" / ColumnarOptions.Filter
      A SQL expression that is inserted directly into the generated SQL, e.g. type = 'airport'.
      Default: none.

    "scanConsistency" / ColumnarOptions.ScanConsistency
      Valid options are "requestPlus" / ColumnarOptions.RequestPlusScanConsistency and "notBounded" / ColumnarOptions.NotBoundedScanConsistency.
      Default: "notBounded".

    "inferLimit" / ColumnarOptions.InferLimit
      Controls how many documents are initially sampled to perform schema inference.
      Default: 1000.

    "connectionIdentifier" / ColumnarOptions.ConnectionIdentifier
      Allows a different configuration to be used than the default one. See Connecting to multiple clusters.
      Default: the default connection.
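
    For instance, a Capella Columnar read that combines a filter with request-plus scan consistency could look like this sketch (the filter expression is illustrative):

    val usAirlines = spark.read.format("couchbase.columnar")
      .option(ColumnarOptions.Database, "travel-sample")
      .option(ColumnarOptions.Scope, "inventory")
      .option(ColumnarOptions.Collection, "airline")
      .option(ColumnarOptions.Filter, "country = 'United States'")
      .option(ColumnarOptions.ScanConsistency, ColumnarOptions.RequestPlusScanConsistency)
      .load()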

    Aggregate Push Down

    The following aggregations are pushed down to the query, analytics, and Capella Columnar engines if possible:

    • MIN(field)

    • MAX(field)

    • COUNT(field)

    • SUM(field)

    • COUNT(*)

    They are supported both with and without grouping (GROUP BY).

    For performance reasons, this feature is enabled by default, unless manual partitioning is being used. If for some reason it should be disabled, the PushDownAggregate option can be used, in which case Spark will handle the aggregations itself after receiving the results.
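
    As an illustration, a grouping aggregation such as the following can be pushed down, so only the aggregated rows travel back to Spark (using the airlines DataFrame from earlier):

    import org.apache.spark.sql.functions.{count, max}

    airlines
      .groupBy("country")
      .agg(count("*"), max("id"))
      .show(10)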