DataFrames, Datasets, and SQL
Spark allows accessing the query service, operational analytics, Enterprise Analytics, and Capella Columnar in powerful and convenient ways.
While Apache Spark's low-level RDDs are powerful, Spark now encourages users towards the higher-level DataFrames and Datasets. Let's see how to use those with Couchbase.
Before you can create DataFrames or Datasets with Couchbase, you need to create a SparkSession.
See Getting Started (Scala) or Getting Started (PySpark).
Running examples
All query examples presented on this page require at least the travel-sample dataset to be installed on your Couchbase cluster, with a primary index.
If you haven't done so already, you can create a primary index by executing this SQL++ statement: CREATE PRIMARY INDEX ON `travel-sample`.
To use the operational analytics examples, corresponding datasets or collection mappings should be created.
Enterprise Analytics is also supported, and provides enhanced analytics capabilities for enterprise environments.
DataFrames
A read DataFrame can be created through spark.read.format(…); which format to choose depends on the type of service you want to use.
For spark.read, the formats couchbase.query, couchbase.analytics, couchbase.enterprise-analytics, and couchbase.columnar are available.
(For reading from the KV service, see RDDs.)
-
Scala
-
PySpark
val queryDf = spark.read.format("couchbase.query").load()
val analyticsDf = spark.read.format("couchbase.analytics").load()
val enterpriseAnalyticsDf = spark.read.format("couchbase.enterprise-analytics").load()
val columnarDf = spark.read.format("couchbase.columnar").load()
queryDf = spark.read.format("couchbase.query").load()
analyticsDf = spark.read.format("couchbase.analytics").load()
enterpriseAnalyticsDf = spark.read.format("couchbase.enterprise-analytics").load()
columnarDf = spark.read.format("couchbase.columnar").load()
Collections
You’ll generally want the DataFrame to work with a specific Couchbase collection, which can be done with the following:
-
Scala
-
PySpark
-
Scala (Columnar)
-
Scala (Enterprise Analytics)
-
PySpark (Enterprise Analytics)
val airlines = spark.read.format("couchbase.query")
.option(QueryOptions.Bucket, "travel-sample")
.option(QueryOptions.Scope, "inventory")
.option(QueryOptions.Collection, "airline")
.load()
airlines = (spark.read.format("couchbase.query")
.option("bucket", "travel-sample")
.option("scope", "inventory")
.option("collection", "airline")
.load())
val airlines = spark.read.format("couchbase.columnar")
.option(ColumnarOptions.Database, "travel-sample")
.option(ColumnarOptions.Scope, "inventory")
.option(ColumnarOptions.Collection, "airline")
.load()
val airlines = spark.read.format("couchbase.enterprise-analytics")
.option(EnterpriseAnalyticsOptions.Database, "travel-sample")
.option(EnterpriseAnalyticsOptions.Scope, "inventory")
.option(EnterpriseAnalyticsOptions.Collection, "airline")
.load()
airlines = (spark.read.format("couchbase.enterprise-analytics")
.option("database", "travel-sample")
.option("scope", "inventory")
.option("collection", "airline")
.load())
Note that all query options are documented under Query options, all Capella Columnar options under Capella Columnar options, and all Enterprise Analytics options under Enterprise Analytics options.
If you will usually be using the same collection, it can be more convenient to provide it in the SparkSession config instead:
val spark = SparkSession
.builder()
.master("local[*]")
.config("spark.couchbase.connectionString", "couchbase://127.0.0.1")
.config("spark.couchbase.username", "username")
.config("spark.couchbase.password", "password")
.config("spark.couchbase.implicitBucket", "travel-sample")
.config("spark.couchbase.implicitScope", "inventory")
.config("spark.couchbase.implicitCollection", "airline")
.getOrCreate()
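With implicit settings like these in place, reads no longer need the bucket, scope, and collection options. A minimal sketch, assuming the configuration above (the DataFrame name is illustrative):
// The implicitBucket/implicitScope/implicitCollection settings resolve
// travel-sample.inventory.airline, so no per-read options are needed.
val implicitAirlines = spark.read.format("couchbase.query").load()
implicitAirlines.show(5)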
Schemas
Spark requires a schema for all DataFrame and Dataset operations.
The Couchbase Spark Connector will perform automatic schema inference based on a random sampling of all documents in the chosen collection — or bucket if no collection was chosen. This automatic inference can work well in cases where the data are very similar.
If the data are not similar, there are two options: you can either provide a manual schema, or narrow down what documents the automatic schema inference acts on by providing explicit predicates.
For example, say the schema of the airlines
collection evolves and we add a "version": 2
field to identify documents that have the new schema.
We can limit both the inference and the SQL++ executed from the DataFrame to only those documents, by providing a filter:
-
Scala
-
PySpark
-
Scala (Enterprise Analytics)
-
PySpark (Enterprise Analytics)
val airlines = spark.read
.format("couchbase.query")
.option(QueryOptions.Bucket, "travel-sample")
.option(QueryOptions.Scope, "inventory")
.option(QueryOptions.Collection, "airline")
.option(QueryOptions.Filter, "version = 2")
.load()
airlines = (spark.read
.format("couchbase.query")
.option("database", "travel-sample")
.option("scope", "inventory")
.option("collection", "airline")
.option("filter", "version = 2")
.load())
val airlines = spark.read
.format("couchbase.enterprise-analytics")
.option(EnterpriseAnalyticsOptions.Database, "travel-sample")
.option(EnterpriseAnalyticsOptions.Scope, "inventory")
.option(EnterpriseAnalyticsOptions.Collection, "airline")
.option(EnterpriseAnalyticsOptions.Filter, "country = 'United States'")
.load()
airlines = (spark.read
.format("couchbase.enterprise-analytics")
.option("database", "travel-sample")
.option("scope", "inventory")
.option("collection", "airline")
.option("filter", "country = 'United States'")
.load())
You can call airlines.printSchema()
to view the schema (either inferred or provided manually):
root
 |-- __META_ID: string (nullable = true)
 |-- callsign: string (nullable = true)
 |-- country: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- icao: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)
Not only did it automatically infer the schema, it also added a __META_ID field, which corresponds to the document ID where applicable.
A manual schema can also be provided, and this is the recommended practice for production applications, to protect against future data changes:
-
Scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}
val airlines = spark.read
.format("couchbase.query")
.schema(StructType(
StructField("name", StringType) ::
StructField("type", StringType) :: Nil
))
.load()
DataFrame Operations
Now that you have a DataFrame, you can apply all the operations that Spark SQL provides. A simple example would be to load specific fields from the DataFrame and print some of those records:
-
Scala
-
PySpark
airlines
.select("name", "callsign")
.sort(airlines("callsign").desc)
.show(10)
(airlines
.select("name", "callsign")
.sort(airlines["callsign"].desc())
.show(10))
Results in:
+-------+--------------------+
|   name|            callsign|
+-------+--------------------+
|   EASY|             easyJet|
|   BABY|             bmibaby|
|MIDLAND|                 bmi|
|   null|          Yellowtail|
|   null|               XOJET|
|STARWAY|   XL Airways France|
|   XAIR|            XAIR USA|
|  WORLD|       World Airways|
|WESTERN|    Western Airlines|
|   RUBY|Vision Airlines (V2)|
+-------+--------------------+
Spark SQL
We can use Spark’s createOrReplaceTempView
to create a temporary view from a DataFrame, which we can then run Spark SQL on (which creates another DataFrame):
-
Scala
-
PySpark
airlines.createOrReplaceTempView("airlinesView")
val airlinesFromView = spark.sql("SELECT * FROM airlinesView")
airlines.createOrReplaceTempView("airlinesView")
airlinesFromView = spark.sql("SELECT * FROM airlinesView")
Note this SQL is executed purely within Spark, and is not sent to the Couchbase cluster.
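For instance, an aggregation over the temporary view is also evaluated entirely by Spark. A brief sketch using the view created above:
// Runs in Spark SQL over the temporary view; nothing here is sent to Couchbase.
val airlinesPerCountry = spark.sql(
  "SELECT country, COUNT(*) AS airlines FROM airlinesView GROUP BY country")
airlinesPerCountry.show(5)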
DataFrame partitioning
By default, one DataFrame read or write with query will result in one SQL++ statement being executed.
With very large DataFrames this can present issues, as all rows are streamed back to one Spark worker; in addition, there is no parallelism.
So in some specific situations the user may wish to provide partitioning hints so that multiple SQL++ statements are executed.
In this example, say we are reading from an orders collection, where we know we have around 100,000 orders and want to partition into 100 SQL++ statements. Here we will partition on a numerical "id" column:
-
Scala
-
PySpark
spark.read
.format("couchbase.query")
.option(QueryOptions.PartitionColumn, "id")
.option(QueryOptions.PartitionLowerBound, "1")
.option(QueryOptions.PartitionUpperBound, "100000")
.option(QueryOptions.PartitionCount, "100")
.load()
(spark.read
.format("couchbase.query")
.option("partitionColumn", "id")
.option("partitionLowerBound", "1")
.option("partitionUpperBound", "100000")
.option("partitionCount", "100")
.load())
This will result in 100 partitions with SQL++ statements along the lines of
SELECT [...] WHERE [...] AND id < 1000
SELECT [...] WHERE [...] AND (id >= 1000 AND id < 2000)
SELECT [...] WHERE [...] AND (id >= 2000 AND id < 3000)
...
SELECT [...] WHERE [...] AND id >= 99000
If any of the four partitioning options is provided, then all must be.
The chosen partitionColumn must support the SQL comparison operators "<" and ">=". Any results where partitionColumn is null or otherwise not matched by those operators will not be included.
PartitionLowerBound and PartitionUpperBound do not bound or limit the results; they simply determine how many results fall into each partition.
Note that the first and last queries in the example above use <
and >=
to include all results.
Partitioning performance tips
When using query partitioning, users are often interested in obtaining the highest throughput possible. Internal testing has found these tips will provide the best possible performance:
-
Duplicating or replicating the GSI indexes used by the query, so that multiple query nodes are not bottlenecked by going to a single GSI node.
-
Increasing the data service reader and writer threads.
-
Ensuring the disks used can provide sufficient throughput/IOPS.
-
Increasing the number of Couchbase nodes, and the resources (cores, memory, disk) allocated to each.
-
Increasing the count of Spark worker nodes, and the number of cores on each (each core can execute one partition at a time).
-
Ensuring sufficient network bandwidth is available.
DataFrame persistence
It is also possible to write DataFrames into Couchbase.
Couchbase documents require unique document ids, so this needs to be available in the DataFrame.
By default the connector will look for a __META_ID
column, but this can be overridden with an option (see below).
All the other fields in the DataFrame will be converted into JSON and stored as the document content.
You can store DataFrames
using both couchbase.query
and couchbase.kv
.
We recommend using the KeyValue data source since it provides better performance out of the box if the usage pattern allows for it.
The following example reads data from a collection and writes the first 5 results into a different collection. Also, to showcase properties, they are used on both the read and the write side:
-
Scala
-
PySpark
val airlines = spark.read.format("couchbase.query")
.option(QueryOptions.Bucket, "travel-sample")
.option(QueryOptions.Scope, "inventory")
.option(QueryOptions.Collection, "airline")
.load()
.limit(5)
airlines.write.format("couchbase.kv")
.option(KeyValueOptions.Bucket, "test-bucket")
.option(KeyValueOptions.Scope, "test-scope")
.option(KeyValueOptions.Collection, "test-collection")
.save()
airlines = (spark.read.format("couchbase.query")
.option("bucket", "travel-sample")
.option("scope", "inventory")
.option("scope", "airline")
.load()
.limit(5))
(airlines.write.format("couchbase.kv")
.option("bucket", "test-bucket")
.option("scope", "test-scope")
.option("collection", "test-collection")
.save())
And to specify your own document id column for a DataFrame df
:
-
Scala
-
PySpark
df.write.format("couchbase.kv")
.option(KeyValueOptions.Bucket, "test-bucket")
.option(KeyValueOptions.Scope, "test-scope")
.option(KeyValueOptions.Collection, "test-collection")
.option(KeyValueOptions.IdFieldName, "YourIdColumn")
.save()
(df.write.format("couchbase.kv")
.option("bucket", "test-bucket")
.option("scope", "test-scope")
.option("collection", "test-collection")
.option("idFieldName", "YourIdColumn")
.save())
Note that all key-value options are documented under KeyValue options and all query options under Query options.
Customising persistence
Spark has four built-in SaveModes that can be used to control how data is written into Couchbase.
As these Spark SaveModes do not cover all Couchbase functionality, the Couchbase Spark Connector additionally has a KeyValueOptions.WriteMode
option.
This example shows the two in use:
val airlines = spark.read.format("couchbase.query")
.option(QueryOptions.Bucket, "travel-sample")
.option(QueryOptions.Scope, "inventory")
.option(QueryOptions.Collection, "airline")
.load()
.limit(5)
// Writing using a built-in Spark SaveMode
airlines.write.format("couchbase.kv")
.option(KeyValueOptions.Bucket, "test-bucket")
.option(KeyValueOptions.Scope, "test-scope")
.option(KeyValueOptions.Collection, "test-collection")
.mode(SaveMode.Append)
.save()
// Writing using one of the Couchbase WriteModes
airlines.write.format("couchbase.kv")
.option(KeyValueOptions.Bucket, "test-bucket")
.option(KeyValueOptions.Scope, "test-scope")
.option(KeyValueOptions.Collection, "test-collection")
.option(KeyValueOptions.WriteMode, KeyValueOptions.WriteModeReplace)
.save()
For the key-value data source, the modes are mapped as follows:
| SaveMode | Underlying Couchbase KV Operation | Behaviour |
|---|---|---|
| SaveMode.Overwrite | Upsert | Document will be created if it does not exist |
| SaveMode.ErrorIfExists | Insert | Operations will fail with DocumentExistsException if the document already exists |
| SaveMode.Ignore | Insert | If the document already exists, that operation will be skipped |
| SaveMode.Append | not supported | |
| WriteModeReplace | Replace | Operations will fail with DocumentNotFoundException if the document does not exist |
| WriteModeSubdocUpsert | mutateIn (sub-document) with StoreSemantics.Upsert | Document will be created if it does not exist |
| WriteModeSubdocInsert | mutateIn (sub-document) with StoreSemantics.Insert | Operations will fail with DocumentExistsException if the document already exists |
| WriteModeSubdocReplace | mutateIn (sub-document) with StoreSemantics.Replace | Operations will fail with DocumentNotFoundException if the document does not exist |
For the query data source, the SaveModes map to SQL++ as follows:

| SaveMode | Couchbase SQL++ |
|---|---|
| SaveMode.Overwrite | UPSERT INTO |
| SaveMode.ErrorIfExists | INSERT INTO |
| SaveMode.Ignore | INSERT INTO (failures for documents that already exist are ignored) |
| SaveMode.Append | not supported |
Failure handling during persistence
The default is that the failure of any individual operation will fail the overall job.
For key-value DataFrame writes, this can be overridden with two error handling options.
KeyValueOptions.ErrorBucket
will cause any operation failure to write a document back into a Couchbase collection, containing information identifying the failing document and the failure reason.
This is the recommended approach.
dataFrame.write
.format("couchbase.kv")
.option(KeyValueOptions.Bucket, "aBucket")
.option(KeyValueOptions.Scope, "aScope")
.option(KeyValueOptions.Collection, "aCollection")
// Specify a Couchbase collection to write errors to
.option(KeyValueOptions.ErrorBucket, "errorBucket")
.option(KeyValueOptions.ErrorScope, "errorScope")
.option(KeyValueOptions.ErrorCollection, "errorCollection")
.save()
KeyValueOptions.ErrorHandler
will call an error handling callback.
An example implementation of the interface is provided with the connector, and users are free to write their own also.
Refer to the Spark KeyValueOptions
API documentation for further information.
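As a rough sketch only: the option is typically set to the fully qualified name of a class implementing the connector's error-handler interface (consult the KeyValueOptions API documentation for the exact contract). com.example.MyErrorHandler below is a hypothetical class name:
dataFrame.write
.format("couchbase.kv")
.option(KeyValueOptions.Bucket, "aBucket")
.option(KeyValueOptions.Scope, "aScope")
.option(KeyValueOptions.Collection, "aCollection")
// Hypothetical handler class; it must implement the connector's error-handler interface
.option(KeyValueOptions.ErrorHandler, "com.example.MyErrorHandler")
.save()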
Safe DataFrame persistence
Couchbase supports optimistic concurrency via CAS for query DataFrame reads and key-value DataFrame writes, which can be used with the Couchbase Spark Connector as shown:
val airlines = spark.read.format("couchbase.query")
.option(QueryOptions.Bucket, "travel-sample")
.option(QueryOptions.Scope, "inventory")
.option(QueryOptions.Collection, "airline")
// Adds a field named "__META_CAS" to the DataFrame, with each document's CAS
.option(QueryOptions.OutputCas, "true")
.load()
.limit(5)
airlines.write.format("couchbase.kv")
.option(KeyValueOptions.Bucket, "test-bucket")
.option(KeyValueOptions.Scope, "test-scope")
.option(KeyValueOptions.Collection, "test-collection")
// Both enables CAS and specifies the field name to use (the constant here is "__META_CAS")
.option(KeyValueOptions.CasFieldName, DefaultConstants.DefaultCasFieldName)
// CAS is only supported with replace operations
.option(KeyValueOptions.WriteMode, KeyValueOptions.WriteModeReplace)
.save()
This protects against documents being modified between the read and write points.
Such modifications will cause the operation to fail with a CasMismatchException
.
This will by default also cause the job to fail, which can be configured with the error handling options detailed above.
Sub-document DataFrame KV persistence
Couchbase supports sub-document writes that update only specific parts of a document. Users writing large documents may find that this provides a performance optimisation, or that it allows concurrent actors to modify different parts of the same document. The Spark Connector exposes sub-document KV writes via both RDDs and DataFrames.
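A minimal sketch of a sub-document DataFrame write, using the KeyValueOptions.WriteModeSubdocUpsert write mode from the table above (updates is a hypothetical DataFrame containing the document id column plus only the fields to change):
// Only the paths matching the DataFrame's columns are mutated in each document,
// rather than the whole document being replaced.
updates.write.format("couchbase.kv")
.option(KeyValueOptions.Bucket, "test-bucket")
.option(KeyValueOptions.Scope, "test-scope")
.option(KeyValueOptions.Collection, "test-collection")
.option(KeyValueOptions.WriteMode, KeyValueOptions.WriteModeSubdocUpsert)
.save()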
DataFrame persistence performance tips
We strongly recommend using the KeyValue data source for persisting DataFrames. This will automatically provide the best possible write performance.
But if the query data source is required, then we recommend:
-
Partition the data before writing it; each partition results in one INSERT or UPSERT SQL++ statement being executed by the connector (see the sketch after this list).
-
Observe the Spark worker logs to see how long each query is taking and if timeouts are occurring. Increase the partition count and/or the timeout value as needed.
-
The multiple large SQL++ statements that can be generated by the connector with large datasets may fill the query service's system:completed_requests table, leading to high query memory usage and potentially out-of-memory issues. Consider disabling this, or increasing its threshold, following these docs.
-
And standard recommendations for writing at scale: consider using separate nodes for query and KV; use disks that can support the throughput needed; use powerful enough nodes to support the throughput needed; ensure there is sufficient network bandwidth.
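As a sketch of the first tip (assuming a DataFrame df and the option constants used earlier), repartitioning before a query-based write spreads the work over several SQL++ statements:
// Each of the 16 partitions is written with its own INSERT/UPSERT SQL++ statement,
// allowing the statements to run in parallel across Spark workers.
df.repartition(16)
.write.format("couchbase.query")
.option(QueryOptions.Bucket, "test-bucket")
.option(QueryOptions.Scope, "test-scope")
.option(QueryOptions.Collection, "test-collection")
.save()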
Datasets
You can call .as[Target]
on your DataFrame
to turn it into its typesafe counterpart (where Target is most of the time a case class).
(Note that Apache Spark only supports Datasets in Scala - there is no PySpark equivalent.)
Consider having the following case class:
case class Airline(name: String, iata: String, icao: String, country: String)
Make sure to import the implicits for the SparkSession
:
import spark.implicits._
You can now create a DataFrame as usual which can be turned into a Dataset:
val airlines = spark.read.format("couchbase.query")
.option(QueryOptions.Bucket, "travel-sample")
.option(QueryOptions.Scope, "inventory")
.option(QueryOptions.Collection, "airline")
.load()
.as[Airline]
If you want to print all Airlines that start with "A" you can access the properties on the case class:
airlines
.map(_.name)
.filter(_.toLowerCase.startsWith("a"))
.foreach(println(_))
For more information on Datasets, please refer to the Spark Dataset Docs.
Options
Query options
The available options for query DataFrame and Dataset operations:
| PySpark | Scala & Java | Description | Default |
|---|---|---|---|
| "bucket" | QueryOptions.Bucket | Name of a Couchbase bucket | Mandatory unless implicitBucket is set |
| "scope" | QueryOptions.Scope | Name of a Couchbase scope | Defaults to the default scope on the specified bucket. Note implicitScope can be set in Spark config. |
| "collection" | QueryOptions.Collection | Name of a Couchbase collection | Defaults to the default collection on the specified scope. Note implicitCollection can be set in Spark config. |
| "timeout" | QueryOptions.Timeout | Overrides the default timeout. The string must be parsable as a Scala Duration. | 75 seconds |
| "idFieldName" | QueryOptions.IdFieldName | Uses a different column for the document id when persisting a DataFrame. | "__META_ID" column |
| "filter" | QueryOptions.Filter | A SQL expression that is inserted directly into the generated SQL, e.g. "version = 2". | None |
| "partitionColumn", "partitionLowerBound", "partitionUpperBound", "partitionCount" | QueryOptions.PartitionColumn, QueryOptions.PartitionLowerBound, QueryOptions.PartitionUpperBound, QueryOptions.PartitionCount | Control how manual DataFrame partitioning is performed. | None |
| "scanConsistency" | QueryOptions.ScanConsistency | Valid options are "requestPlus" / QueryOptions.RequestPlusScanConsistency and "notBounded" / QueryOptions.NotBoundedScanConsistency. | "notBounded" |
| "inferLimit" | QueryOptions.InferLimit | Controls how many documents are initially sampled to perform schema inference. | 1000 |
| "connectionIdentifier" | QueryOptions.ConnectionIdentifier | Allows a different configuration to be used than the default one. See Connecting to multiple clusters. | Default connection |
| "outputCas" | QueryOptions.OutputCas | Adds a field to the DataFrame containing each document's CAS value, for use in optimistic concurrency control. The field name is controlled by casFieldName. | <does not output CAS> |
| "casFieldName" | QueryOptions.CasFieldName | Controls the name of the field that is output by outputCas. | "__META_CAS" |
Enterprise Analytics options
The available options for Enterprise Analytics DataFrame and Dataset operations:
| PySpark | Scala & Java | Description | Default |
|---|---|---|---|
| "database" | EnterpriseAnalyticsOptions.Database | Name of an Enterprise Analytics database | Mandatory |
| "scope" | EnterpriseAnalyticsOptions.Scope | Name of an Enterprise Analytics scope | Mandatory |
| "collection" | EnterpriseAnalyticsOptions.Collection | Name of an Enterprise Analytics collection | Mandatory |
| "timeout" | EnterpriseAnalyticsOptions.Timeout | Overrides the default timeout. The string must be parsable as a Scala Duration. | 10 minutes |
| "filter" | EnterpriseAnalyticsOptions.Filter | A SQL expression that is inserted directly into the generated SQL, e.g. "country = 'United States'". | None |
| "scanConsistency" | EnterpriseAnalyticsOptions.ScanConsistency | Valid options are "requestPlus" / EnterpriseAnalyticsOptions.RequestPlusScanConsistency and "notBounded" / EnterpriseAnalyticsOptions.NotBoundedScanConsistency. | "notBounded" |
| "inferLimit" | EnterpriseAnalyticsOptions.InferLimit | Controls how many documents are initially sampled to perform schema inference. | 1000 |
KeyValue options
The available options for Key-Value DataFrame and Dataset operations:
| PySpark | Scala & Java | Description | Default |
|---|---|---|---|
| "bucket" | KeyValueOptions.Bucket | Name of a Couchbase bucket | Mandatory unless implicitBucket is set |
| "scope" | KeyValueOptions.Scope | Name of a Couchbase scope | Defaults to the default scope on the specified bucket. Note implicitScope can be set in Spark config. |
| "collection" | KeyValueOptions.Collection | Name of a Couchbase collection | Defaults to the default collection on the specified scope. Note implicitCollection can be set in Spark config. |
| "timeout" | KeyValueOptions.Timeout | Overrides the default timeout. The string must be parsable as a Scala Duration. | 2.5 seconds for non-durable operations, 10 seconds for durable operations |
| "idFieldName" | KeyValueOptions.IdFieldName | Uses a different column for the document id when persisting a DataFrame. | "__META_ID" column |
| "durability" | KeyValueOptions.Durability | Valid options are "none", "majority" / KeyValueOptions.MajorityDurability, "majorityAndPersistToActive" / KeyValueOptions.MajorityAndPersistToActiveDurability, and "persistToMajority" / KeyValueOptions.PersistToMajorityDurability. | "none" |
| "connectionIdentifier" | KeyValueOptions.ConnectionIdentifier | Allows a different configuration to be used than the default one. See Connecting to multiple clusters. | Default connection |
| "writeMode" | KeyValueOptions.WriteMode | Controls the write behaviour, overriding the Spark SaveMode. See Customising persistence for the available write modes, such as KeyValueOptions.WriteModeReplace ("replace" for PySpark users). | None (uses SaveMode) |
| "casFieldName" | KeyValueOptions.CasFieldName | Both enables optimistic locking detection (CAS) for operations that support it, and specifies the field name containing CAS values. | <CAS detection is disabled> |
| "errorBucket" | KeyValueOptions.ErrorBucket | Enables writing error documents to a given bucket if operations fail. | None (operations fail the job) |
| "errorScope" | KeyValueOptions.ErrorScope | Further customises KeyValueOptions.ErrorBucket to output to a particular scope. | Default scope |
| "errorCollection" | KeyValueOptions.ErrorCollection | Further customises KeyValueOptions.ErrorBucket to output to a particular collection. | Default collection |
| "errorHandler" | KeyValueOptions.ErrorHandler | Custom error handler class for handling operation failures. Must implement the appropriate interface. | None (operations fail the job) |
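As an illustration of the durability option, here is a sketch of a write requiring majority durability (bucket names follow the earlier examples):
// Each mutation must be replicated to a majority of replicas before it is acknowledged.
df.write.format("couchbase.kv")
.option(KeyValueOptions.Bucket, "test-bucket")
.option(KeyValueOptions.Scope, "test-scope")
.option(KeyValueOptions.Collection, "test-collection")
.option(KeyValueOptions.Durability, KeyValueOptions.MajorityDurability)
.save()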
Capella Columnar options
The available options for Capella Columnar DataFrame and Dataset operations:
| PySpark | Scala & Java | Description | Default |
|---|---|---|---|
| "database" | ColumnarOptions.Database | Name of a Couchbase Columnar database | Mandatory |
| "scope" | ColumnarOptions.Scope | Name of a Couchbase Columnar scope | Mandatory |
| "collection" | ColumnarOptions.Collection | Name of a Couchbase Columnar collection | Mandatory |
| "timeout" | ColumnarOptions.Timeout | Overrides the default timeout. The string must be parsable as a Scala Duration. | 10 minutes |
| "filter" | ColumnarOptions.Filter | A SQL expression that is inserted directly into the generated SQL, e.g. "country = 'United States'". | None |
| "scanConsistency" | ColumnarOptions.ScanConsistency | Valid options are "requestPlus" / ColumnarOptions.RequestPlusScanConsistency and "notBounded" / ColumnarOptions.NotBoundedScanConsistency. | "notBounded" |
| "inferLimit" | ColumnarOptions.InferLimit | Controls how many documents are initially sampled to perform schema inference. | 1000 |
| "connectionIdentifier" | ColumnarOptions.ConnectionIdentifier | Allows a different configuration to be used than the default one. See Connecting to multiple clusters. | Default connection |
Aggregate Push Down
The following aggregate functions are pushed down to the query, operational analytics, Enterprise Analytics, and Capella Columnar engines where possible:
-
MIN(field)
-
MAX(field)
-
COUNT(field)
-
SUM(field)
-
COUNT(*)
They are supported both with and without grouping (GROUP BY).
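For example, an aggregation such as the following can be pushed down (a sketch, assuming the airlines query DataFrame from earlier):
import org.apache.spark.sql.functions.{count, max}
// COUNT(*) and MAX(id), grouped by country, can be executed by the engine,
// so only the aggregated rows are returned to Spark.
airlines
.groupBy("country")
.agg(count("*").as("airlines"), max("id").as("maxId"))
.show()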
For performance reasons, this feature is enabled by default, unless using manual partitioning.
If for some reason it needs to be disabled, the PushDownAggregate option can be used, in which case Spark will handle the aggregations itself after receiving the results.