DataFrames, Datasets and SQL
The Couchbase Spark Connector lets Spark access the query and analytics services, as well as Capella Columnar, in powerful and convenient ways.
While Apache Spark’s low-level RDDs are powerful, Spark now encourages users towards the higher-level DataFrames and Datasets. Let’s see how to use those with Couchbase.
Before you can create DataFrames or Datasets with Couchbase, you need to create a SparkSession. See Getting Started (Scala) or Getting Started (PySpark).
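For convenience, a minimal Scala SparkSession along the lines of those guides looks like the following (a sketch only — the connection string and the username/password placeholders are assumptions; substitute your own cluster details):
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.master("local[*]")
.config("spark.couchbase.connectionString", "couchbase://127.0.0.1")
.config("spark.couchbase.username", "username")
.config("spark.couchbase.password", "password")
.getOrCreate()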
Running examples
All query examples presented on this page require at least the travel-sample dataset to be installed on your Couchbase cluster, with a primary index.
If you haven’t done so already, you can create a primary index by executing this SQL++ statement: CREATE PRIMARY INDEX ON `travel-sample`.
To use the analytics examples, corresponding datasets or collection mappings should be created.
Capella Columnar is also supported, and is documented separately.
DataFrames
A read DataFrame can be created through spark.read.format(…); which format to choose depends on the type of service you want to use.
For spark.read, the formats couchbase.query, couchbase.analytics, and couchbase.columnar are available.
(For reading from the KV service, see RDDs.)
-
Scala
-
PySpark
val queryDf = spark.read.format("couchbase.query").load()
val analyticsDf = spark.read.format("couchbase.analytics").load()
val columnarDf = spark.read.format("couchbase.columnar").load()
queryDf = spark.read.format("couchbase.query").load()
analyticsDf = spark.read.format("couchbase.analytics").load()
columnarDf = spark.read.format("couchbase.columnar").load()
Collections
You’ll generally want the DataFrame to work with a specific Couchbase collection, which can be done with the following:
-
Scala
-
PySpark
-
Scala (Columnar)
val airlines = spark.read.format("couchbase.query")
.option(QueryOptions.Bucket, "travel-sample")
.option(QueryOptions.Scope, "inventory")
.option(QueryOptions.Collection, "airline")
.load()
airlines = (spark.read.format("couchbase.query")
.option("bucket", "travel-sample")
.option("scope", "inventory")
.option("collection", "airline")
.load())
val airlines = spark.read.format("couchbase.columnar")
.option(ColumnarOptions.Database, "travel-sample")
.option(ColumnarOptions.Scope, "inventory")
.option(ColumnarOptions.Collection, "airline")
.load()
Note that all query options are documented under Query Options, and all Capella Columnar options under Capella Columnar options.
If you will usually be using the same collection, it can be more convenient to provide it in the SparkSession config instead:
val spark = SparkSession
.builder()
.master("local[*]")
.config("spark.couchbase.connectionString", "couchbase://127.0.0.1")
.config("spark.couchbase.username", "username")
.config("spark.couchbase.password", "password")
.config("spark.couchbase.implicitBucket", "travel-sample")
.config("spark.couchbase.implicitScope", "inventory")
.config("spark.couchbase.implicitCollection", "airline")
.getOrCreate()
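With the implicit bucket, scope, and collection configured on the SparkSession as above, a read no longer needs the per-DataFrame options; a minimal sketch:
val airlines = spark.read.format("couchbase.query").load()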
Schemas
Spark requires a schema for all DataFrame and Dataset operations.
The Couchbase Spark Connector will perform automatic schema inference based on a random sampling of all documents in the chosen collection — or bucket if no collection was chosen. This automatic inference can work well in cases where the data are very similar.
If the data are not similar, there are two options: you can either provide a manual schema, or narrow down what documents the automatic schema inference acts on by providing explicit predicates.
For example, say the schema of the airline collection evolves and we add a "version": 2 field to identify documents that have the new schema.
We can limit both the inference and the SQL++ executed from the DataFrame to only those documents, by providing a filter:
-
Scala
-
PySpark
val airlines = spark.read
.format("couchbase.query")
.option(QueryOptions.Bucket, "travel-sample")
.option(QueryOptions.Scope, "inventory")
.option(QueryOptions.Collection, "airline")
.option(QueryOptions.Filter, "version = 2")
.load()
airlines = (spark.read
.format("couchbase.query")
.option("bucket", "travel-sample")
.option("scope", "inventory")
.option("collection", "airline")
.option("filter", "version = 2")
.load())
You can call airlines.printSchema() to view the schema (either inferred or provided manually):
root
 |-- __META_ID: string (nullable = true)
 |-- callsign: string (nullable = true)
 |-- country: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- icao: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)
Not only did it automatically infer the schema, it also added a __META_ID field, which corresponds to the document ID where applicable.
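That column behaves like any other; for example, a quick sketch that shows which document each row came from:
airlines.select("__META_ID", "name").show(3)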
A manual schema can also be provided, and this is the recommended practice for production applications, to protect against future data changes:
-
Scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val airlines = spark.read
.format("couchbase.query")
.schema(StructType(
StructField("name", StringType) ::
StructField("type", StringType) :: Nil
))
.load()
DataFrame Operations
Now that you have a DataFrame, you can apply all the operations that Spark SQL provides. A simple example would be to load specific fields from the DataFrame and print some of those records:
-
Scala
-
PySpark
airlines
.select("name", "callsign")
.sort(airlines("callsign").desc)
.show(10)
(airlines
.select("name", "callsign")
.sort(airlines("callsign").desc)
.show(10))
Results in:
+-------+--------------------+
|   name|            callsign|
+-------+--------------------+
|   EASY|             easyJet|
|   BABY|             bmibaby|
|MIDLAND|                 bmi|
|   null|          Yellowtail|
|   null|               XOJET|
|STARWAY|   XL Airways France|
|   XAIR|            XAIR USA|
|  WORLD|       World Airways|
|WESTERN|    Western Airlines|
|   RUBY|Vision Airlines (V2)|
+-------+--------------------+
Spark SQL
We can use Spark’s createOrReplaceTempView to create a temporary view from a DataFrame, which we can then run Spark SQL on (which creates another DataFrame):
-
Scala
-
PySpark
airlines.createOrReplaceTempView("airlinesView")
val airlinesFromView = spark.sql("SELECT * FROM airlinesView")
airlines.createOrReplaceTempView("airlinesView")
airlinesFromView = spark.sql("SELECT * FROM airlinesView")
Note this SQL is executed purely within Spark, and is not sent to the Couchbase cluster.
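Any Spark SQL can now be run over the registered view; for instance, a small sketch that filters on the country field seen in the inferred schema (the view name comes from the example above):
val ukAirlines = spark.sql(
"SELECT name, callsign FROM airlinesView WHERE country = 'United Kingdom'")
ukAirlines.show(5)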
DataFrame partitioning
By default, one DataFrame read or write through the query data source results in one SQL++ statement being executed.
With very large DataFrames this can present issues, as all rows are streamed back to a single Spark worker, and there is no parallelism.
So in some situations the user may wish to provide partitioning hints so that multiple SQL++ statements are executed.
In this example, say we are reading from an orders collection where we know we have around 100,000 orders and want to partition the read into 100 SQL++ statements. Here we will partition on a numerical "id" column:
-
Scala
-
PySpark
spark.read
.format("couchbase.query")
.option(QueryOptions.PartitionColumn, "id")
.option(QueryOptions.PartitionLowerBound, "1")
.option(QueryOptions.PartitionUpperBound, "100000")
.option(QueryOptions.PartitionCount, "100")
.load()
(spark.read
.format("couchbase.query")
.option("partitionColumn", "id")
.option("partitionLowerBound", "1")
.option("partitionUpperBound", "100000")
.option("partitionCount", "100")
.load())
This will result in 100 partitions with SQL++ statements along the lines of
SELECT [...] WHERE [...] AND id < 1000
SELECT [...] WHERE [...] AND (id >= 1000 AND id < 2000)
SELECT [...] WHERE [...] AND (id >= 2000 AND id < 3000)
...
SELECT [...] WHERE [...] AND id >= 99000
If any of the four partitioning options is provided, then all must be.
The chosen partitionColumn must support the SQL comparison operators "<" and ">=". Any results where the partitionColumn is null, or otherwise not matched by those operators, will not be included.
PartitionLowerBound and PartitionUpperBound do not bound or limit the results; they simply determine how many results fall into each partition.
Note that the first and last queries in the example above use < and >= to include all results.
Partitioning performance tips
When using query partitioning, users are often interested in obtaining the highest throughput possible. Internal testing has found that these tips provide the best performance:
-
Duplicating or replicating the GSI indexes used by the query, so that multiple query nodes are not bottlenecked by going to a single GSI node.
-
Increasing the data service reader and writer threads.
-
Ensuring the disks used can provide sufficient throughput/IOPS.
-
Increasing the number of Couchbase nodes, and the resources (cores, memory, disk) allocated to each.
-
Increasing the count of Spark worker nodes, and the number of cores on each (each core can execute one partition at a time).
-
Ensuring sufficient network bandwidth is available.
DataFrame persistence
It is also possible to persist DataFrames into Couchbase.
Couchbase documents require a unique document id, so this needs to be available in the DataFrame.
By default the connector will look for a __META_ID column, but this can be overridden with an option (see below).
All the other fields in the DataFrame will be converted into JSON and stored as the document content.
You can store DataFrames using both couchbase.query and couchbase.kv.
We recommend using the KeyValue data source since it provides better performance out of the box, if the usage pattern allows for it.
The following example reads data from a collection and writes the first 5 results into a different collection. To showcase the options, they are provided on both the read and the write side:
-
Scala
-
PySpark
val airlines = spark.read.format("couchbase.query")
.option(QueryOptions.Bucket, "travel-sample")
.option(QueryOptions.Scope, "inventory")
.option(QueryOptions.Collection, "airline")
.load()
.limit(5)
airlines.write.format("couchbase.kv")
.option(KeyValueOptions.Bucket, "test-bucket")
.option(KeyValueOptions.Scope, "test-scope")
.option(KeyValueOptions.Collection, "test-collection")
.option(KeyValueOptions.Durability, KeyValueOptions.MajorityDurability)
.save()
airlines = (spark.read.format("couchbase.query")
.option("bucket", "travel-sample")
.option("scope", "inventory")
.option("scope", "airline")
.load()
.limit(5))
(airlines.write.format("couchbase.kv")
.option("bucket", "test-bucket")
.option("scope", "test-scope")
.option("collection", "test-collection")
.save())
And to specify your own document id column for a DataFrame df:
-
Scala
-
PySpark
df.write.format("couchbase.kv")
.option(KeyValueOptions.Bucket, "test-bucket")
.option(KeyValueOptions.Scope, "test-scope")
.option(KeyValueOptions.Collection, "test-collection")
.option(KeyValueOptions.IdFieldName, "YourIdColumn")
.save()
(df.write.format("couchbase.kv")
.option("bucket", "test-bucket")
.option("scope", "test-scope")
.option("collection", "test-collection")
.option("idFieldName", "YourIdColumn")
.save())
Note that all key-value options are documented under Key-Value options, and all query options under Query Options.
DataFrame persistence performance tips
As mentioned, we strongly recommend using the KeyValue data source for persisting DataFrames.
But if the query data source is required, then we recommend:
-
Partition the data before writing it; each partition results in one INSERT or UPSERT SQL++ statement being executed by the connector (see the sketch after this list).
-
Observe the Spark worker logs to see how long each query is taking and if timeouts are occurring. Increase the partition count and/or the timeout value as needed.
-
The multiple large SQL++ statements that can be generated by the connector with large datasets may fill query service’s system:completed_requests table, leading to high query memory usage and potentially out-of-memory issues. Consider disabling or increasing the threshold for this, following these docs.
-
And standard recommendations for writing at scale: consider using separate nodes for query and KV; use disks that can support the throughput needed; use powerful enough nodes to support the throughput needed; ensure there is sufficient network bandwidth.
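As a minimal sketch of the first tip, repartitioning before a query-based write spreads the work across several SQL++ statements (this assumes a DataFrame df with a __META_ID column and reuses the test collection from the persistence example; the partition count of 16 is arbitrary):
df.repartition(16)
.write.format("couchbase.query")
.option(QueryOptions.Bucket, "test-bucket")
.option(QueryOptions.Scope, "test-scope")
.option(QueryOptions.Collection, "test-collection")
.save()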
SaveMode Mapping
SparkSQL DataFrames allow you to configure how data is written to the data source by specifying a SaveMode, of which there are four: Append, Overwrite, ErrorIfExists, and Ignore.
Couchbase has similar names for different write semantics: Insert, Upsert, and Replace.
The following tables describe the mappings for both couchbase.query and couchbase.kv:
Note that SaveMode.Append is not supported, since the operations always write the full document body (rather than appending to one).
SparkSQL | Couchbase KV
---|---
SaveMode.Overwrite | Upsert
SaveMode.ErrorIfExists | Insert
SaveMode.Ignore | Insert (ignores existing documents)
SaveMode.Append | not supported
SparkSQL | Couchbase SQL++
---|---
SaveMode.Overwrite | UPSERT INTO
SaveMode.ErrorIfExists | INSERT INTO
SaveMode.Ignore | INSERT INTO (ignores existing documents)
SaveMode.Append | not supported
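As an illustration of the mapping, upsert semantics can be requested explicitly through the Spark SaveMode (a sketch reusing the test collection from the persistence example above):
import org.apache.spark.sql.SaveMode

df.write.format("couchbase.kv")
.mode(SaveMode.Overwrite) // maps to a KV Upsert
.option(KeyValueOptions.Bucket, "test-bucket")
.option(KeyValueOptions.Scope, "test-scope")
.option(KeyValueOptions.Collection, "test-collection")
.save()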
Datasets
You can call .as[Target] on your DataFrame to turn it into its typesafe counterpart (most of the time a case class).
(Note that Apache Spark only supports Datasets in Scala - there is no PySpark equivalent.)
Consider having the following case class:
case class Airline(name: String, iata: String, icao: String, country: String)
Make sure to import the implicits for the SparkSession:
import spark.implicits._
You can now create a DataFrame as usual which can be turned into a Dataset:
val airlines = spark.read.format("couchbase.query")
.option(QueryOptions.Bucket, "travel-sample")
.option(QueryOptions.Scope, "inventory")
.option(QueryOptions.Collection, "airline")
.load()
.as[Airline]
If you want to print the names of all airlines that start with "A", you can access the properties on the case class:
airlines
.map(_.name)
.filter(_.toLowerCase.startsWith("a"))
.foreach(println(_))
For more information on Datasets, please refer to the Spark Dataset Docs.
Options
Query options
The available options for query DataFrame and Dataset operations:
PySpark | Scala & Java | Option | Default |
---|---|---|---|
"bucket" |
QueryOptions.Bucket |
Name of a Couchbase bucket |
Mandatory unless implicitBucket is set |
"scope" |
QueryOptions.Scope |
Name of a Couchbase scope |
Defaults to the default scope on the specified bucket. Note implicitScope can be set in Spark config. |
"collection" |
QueryOptions.Collection |
Name of a Couchbase collection |
Defaults to the default collection on the specified scope. Note implicitCollection can be set in Spark config. |
"timeout" |
QueryOptions.Timeout |
Overrides the default timeout. The string must be parsable from a Scala Duration. |
75 seconds |
"idFieldName" |
QueryOptions.IdFieldName |
Uses a different column when persisting a DataFrame. |
"META_ID" column |
"filter" |
QueryOptions.Filter |
A SQL expression that will be interjected directly into the generated SQL, e.g. |
None |
"partitionColumn" "partitionLowerBound" "partitionUpperBound" "partitionCount" |
QueryOptions.PartitionColumn QueryOptions.PartitionLowerBound QueryOptions.PartitionUpperBound QueryOptions.PartitionCount |
Control how manual DataFrame partitioning is performed. |
None |
"scanConsistency" |
QueryOptions.ScanConsistency |
Valid options are "requestPlus" / QueryOptions.RequestPlusScanConsistency and "notBounded" / QueryOptions.NotBoundedScanConsistency. |
"notBounded" |
"inferLimit" |
QueryOptions.InferLimit |
Controls how many documents are initially sampled to perform schema inference. |
1000 |
"connectionIdentifier" |
QueryOptions.ConnectionIdentifier |
Allows a different configuration to be used than the default one. See Connecting to multiple clusters. |
Default connection |
KeyValue options
The available options for Key-Value DataFrame and Dataset operations:
PySpark | Scala & Java | Option | Default |
---|---|---|---|
"bucket" |
KeyValueOptions.Bucket |
Name of a Couchbase bucket |
Mandatory unless implicitBucket is set |
"scope" |
KeyValueOptions.Scope |
Name of a Couchbase scope |
Defaults to the default scope on the specified bucket. Note implicitScope can be set in Spark config. |
"collection" |
KeyValueOptions.Collection |
Name of a Couchbase collection |
Defaults to the default collection on the specified scope. Note implicitCollection can be set in Spark config. |
"timeout" |
KeyValueOptions.Timeout |
Overrides the default timeout. The string must be parsable from a Scala Duration. |
2.5 seconds for non-durable operations, 10 seconds for durable operations |
"idFieldName" |
KeyValueOptions.IdFieldName |
Uses a different column when persisting a DataFrame. |
"META_ID" column |
"durability" |
KeyValueOptions.Durability |
Valid options are "none", "majority" / KeyValueOptions.MajorityDurability, "majorityAndPersistToActive" / KeyValueOptions.MajorityAndPersistToActiveDurability, and "PersistToMajority" / KeyValueOptions.PersistToMajorityDurability. |
"none" |
"connectionIdentifier" |
KeyValueOptions.ConnectionIdentifier |
Allows a different configuration to be used than the default one. See Connecting to multiple clusters. |
Default connection |
Capella Columnar options
The available options for Capella Columnar DataFrame and Dataset operations:
PySpark | Scala & Java | Option | Default |
---|---|---|---|
"database" |
ColumnarOptions.Database |
Name of a Couchbase Columnar database |
Mandatory |
"scope" |
ColumnarOptions.Scope |
Name of a Couchbase Columnar scope |
Mandatory |
"collection" |
ColumnarOptions.Collection |
Name of a Couchbase Columnar collection |
Mandatory |
"timeout" |
ColumnarOptions.Timeout |
Overrides the default timeout. The string must be parsable from a Scala Duration. |
10 minutes |
"filter" |
ColumnarOptions.Filter |
A SQL expression that will be interjected directly into the generated SQL, e.g. |
None |
"scanConsistency" |
ColumnarOptions.ScanConsistency |
Valid options are "requestPlus" / ColumnarOptions.RequestPlusScanConsistency and "notBounded" / ColumnarOptions.NotBoundedScanConsistency. |
"notBounded" |
"inferLimit" |
ColumnarOptions.InferLimit |
Controls how many documents are initially sampled to perform schema inference. |
1000 |
"connectionIdentifier" |
ColumnarOptions.ConnectionIdentifier |
Allows a different configuration to be used than the default one. See Connecting to multiple clusters. |
Default connection |
Aggregate Push Down
The following aggregate functions are pushed down to the query, analytics, and Capella Columnar engines if possible:
-
MIN(field)
-
MAX(field)
-
COUNT(field)
-
SUM(field)
-
COUNT(*)
They are supported both with and without grouping (GROUP BY).
For performance reasons, this feature is enabled by default, unless using manual partitioning.
If for some reason it should be disabled, the PushDownAggregate option can be used, in which case Spark will handle the aggregations after receiving the results.
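For example, an aggregation such as the following can be pushed down to the engine rather than computed inside Spark (a sketch over the airlines DataFrame from earlier; the grouping column comes from the inferred schema):
import org.apache.spark.sql.functions.{count, max}

val perCountry = airlines
.groupBy("country")
.agg(count("*"), max("id"))
perCountry.show()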