Working With RDDs
Spark operates on resilient distributed datasets (RDDs). Higher-level abstractions such as DataFrames and Datasets are increasingly the primary means of access, but RDDs are still useful to understand.
When you need to extract data from Couchbase, the Couchbase Spark connector creates RDDs for you. You can also persist data to Couchbase using RDDs.
The following Spark session is configured to work on the travel-sample bucket and can be used to follow the examples. Configure your connectionString, username, and password accordingly.
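    import org.apache.spark.sql.SparkSession

    // Local Spark session that connects to a Couchbase cluster on localhost.
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("WorkingWithRDDs")
      .config("spark.couchbase.connectionString", "127.0.0.1")
      .config("spark.couchbase.username", "Administrator")
      .config("spark.couchbase.password", "password")
      // Bucket used when an operation does not name one explicitly.
      .config("spark.couchbase.implicitBucket", "travel-sample")
      .getOrCreate()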
All RDD operations are invoked on the SparkContext, so the following import must be present before the APIs can be used:
    import com.couchbase.spark._
Many arguments and return types come directly from the Couchbase Scala SDK (for example, GetResult and GetOptions). This is intentional, since it allows the most flexibility when interacting with the SDK. These types are not discussed in detail here; refer to the official SDK documentation for more information.
Creating RDDs
The following read operations are available:
API | Description |
---|---|
couchbaseGet | Fetches full documents. |
couchbaseLookupIn | Fetches parts of documents ("subdocument" API). |
couchbaseQuery | Performs a N1QL query. |
couchbaseAnalyticsQuery | Performs an analytics query. |
couchbaseSearchQuery | Performs a search query. |
Write APIs are also available on the context:
API | Description |
---|---|
couchbaseUpsert | Stores documents with upsert semantics. |
couchbaseReplace | Stores documents with replace semantics. |
couchbaseInsert | Stores documents with insert semantics. |
couchbaseRemove | Removes documents. |
couchbaseMutateIn | Mutates parts of documents ("subdocument" API). |
couchbaseQuery | Performs a N1QL query. |
Note that couchbaseQuery is listed twice, since you can execute DML statements through it as well as regular SELECTs.
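The following example fetches two documents and prints their content:
    import com.couchbase.client.scala.json.JsonObject
    import com.couchbase.spark.kv.Get

    spark
      .sparkContext
      .couchbaseGet(Seq(Get("airline_10"), Get("airline_10642")))
      .collect()
      // contentAs returns a Try, so each line prints Success(...) or Failure(...).
      .foreach(result => println(result.contentAs[JsonObject]))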
Each API takes a required Seq[T], where T depends on the operation being used. The case classes are named after the operation type and allow specifying more parameters than just the document ID where needed.
As an example, for couchbaseReplace the case class signature looks like this:
    case class Replace[T](id: String, content: T, cas: Long = 0)
So for each entry in the Seq, you can specify not only the ID and content of the document, but also (optionally) the CAS value to perform an optimistic locking operation.
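To make the CAS semantics concrete, the following sketch models optimistic locking against a small in-memory store. It does not use the connector or a cluster: InMemoryBucket and CasDemo are hypothetical names introduced only for illustration, and Replace is redeclared locally with the signature shown above. The behavior modeled here is an assumption stated for the sketch: a replace carrying a non-zero CAS succeeds only while the stored CAS is unchanged, and the default of 0 skips the check.

```scala
import scala.collection.mutable

// Mirrors the case class signature shown above; redeclared locally so the
// sketch runs without the connector on the classpath.
final case class Replace[T](id: String, content: T, cas: Long = 0)

// Hypothetical in-memory stand-in for a bucket. Not part of the connector.
final class InMemoryBucket[T] {
  private val docs = mutable.Map.empty[String, (T, Long)]
  private var nextCas = 1L

  // Unconditional write; returns the CAS assigned to the new version.
  def upsert(id: String, content: T): Long = {
    val cas = nextCas
    nextCas += 1
    docs(id) = (content, cas)
    cas
  }

  // Returns the stored content together with its current CAS.
  def get(id: String): Option[(T, Long)] = docs.get(id)

  // Succeeds only if the document exists and, when a non-zero CAS is
  // supplied, that CAS still matches the stored one.
  def replace(op: Replace[T]): Either[String, Long] =
    docs.get(op.id) match {
      case None => Left("document not found")
      case Some((_, current)) if op.cas != 0 && op.cas != current =>
        Left("CAS mismatch")
      case Some(_) => Right(upsert(op.id, op.content))
    }
}

object CasDemo {
  val bucket = new InMemoryBucket[String]

  // We read version v1 and remember its CAS.
  val casAtRead: Long = bucket.upsert("airline_10", "v1")

  // Another writer changes the document after our read.
  bucket.upsert("airline_10", "v2")

  // The stale CAS is rejected, so the concurrent change is not overwritten.
  val stale: Either[String, Long] =
    bucket.replace(Replace("airline_10", "v3", casAtRead))

  // Re-reading yields the current CAS, and the retry succeeds.
  val currentCas: Long = bucket.get("airline_10").map(_._2).getOrElse(0L)
  val fresh: Either[String, Long] =
    bucket.replace(Replace("airline_10", "v3", currentCas))

  // With the default cas = 0, no check is performed.
  val unchecked: Either[String, Long] =
    bucket.replace(Replace("airline_10", "v4"))
}
```

Against a real cluster the same pattern applies: read the document, keep the CAS from the result, and pass it along in the Replace entry.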
A N1QL query can be performed like this:
    spark
      .sparkContext
      .couchbaseQuery[JsonObject]("select count(*) as count from `travel-sample`")
      .collect()
      .foreach(println)
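Because couchbaseQuery accepts DML as well as SELECT statements, a write through a query follows the same shape. The sketch below is illustrative only: the object name QueryDmlExample, the UPDATE statement, and the reviewed field are hypothetical, and it assumes the cluster and credentials from the session shown at the top of this page. It relies on standard Spark behavior: an RDD is only evaluated once an action such as collect() runs, so the statement is not sent until then.

```scala
import com.couchbase.client.scala.json.JsonObject
import com.couchbase.spark._
import org.apache.spark.sql.SparkSession

object QueryDmlExample {
  // Hypothetical statement: marks one sample document and returns its ID.
  // Note that running it modifies the travel-sample data.
  val statement: String =
    "UPDATE `travel-sample` USE KEYS 'airline_10' " +
      "SET reviewed = true RETURNING META().id AS id"

  def main(args: Array[String]): Unit = {
    // Same settings as the session at the top of this page.
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("QueryDmlExample")
      .config("spark.couchbase.connectionString", "127.0.0.1")
      .config("spark.couchbase.username", "Administrator")
      .config("spark.couchbase.password", "password")
      .config("spark.couchbase.implicitBucket", "travel-sample")
      .getOrCreate()

    spark.sparkContext
      .couchbaseQuery[JsonObject](statement)
      // collect() is the action that triggers execution of the statement.
      .collect()
      .foreach(println)

    spark.stop()
  }
}
```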
In addition to the required parameters, optional information can also be passed along. Each operation lets you specify its equivalent option block (so for couchbaseGet, GetOptions can be supplied). A generic Keyspace can also be provided, which overrides the implicit defaults from the configuration.
A Keyspace looks like this:
    case class Keyspace(
      bucket: Option[String] = None,
      scope: Option[String] = None,
      collection: Option[String] = None
    )
You can use it to provide a custom bucket, scope, or collection on a per-operation basis.
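As a rough sketch of a per-operation override, the following program reads one document from a named collection instead of the default one. It rests on several assumptions that you should verify against the connector version you use: that the travel-sample bucket contains the inventory scope and its airline collection (as on Couchbase Server 7.x), that Keyspace is available through the com.couchbase.spark._ import, and that couchbaseGet accepts the Keyspace as its second argument. The object name KeyspaceOverrideExample is hypothetical.

```scala
import com.couchbase.client.scala.json.JsonObject
import com.couchbase.spark._
import com.couchbase.spark.kv.Get
import org.apache.spark.sql.SparkSession

object KeyspaceOverrideExample {
  // Only scope and collection are overridden. bucket stays None, so the
  // implicit bucket from the configuration (travel-sample) still applies.
  val airlines: Keyspace =
    Keyspace(scope = Some("inventory"), collection = Some("airline"))

  def main(args: Array[String]): Unit = {
    // Same settings as the session at the top of this page.
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("KeyspaceOverrideExample")
      .config("spark.couchbase.connectionString", "127.0.0.1")
      .config("spark.couchbase.username", "Administrator")
      .config("spark.couchbase.password", "password")
      .config("spark.couchbase.implicitBucket", "travel-sample")
      .getOrCreate()

    // Assumption: the Keyspace is the second parameter of couchbaseGet.
    spark.sparkContext
      .couchbaseGet(Seq(Get("airline_10")), airlines)
      .collect()
      .foreach(result => println(result.contentAs[JsonObject]))

    spark.stop()
  }
}
```

Leaving a field as None keeps the configured default for that level, so a single Keyspace value can override as little or as much as an operation needs.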
Persisting RDDs
While read operations usually start from the SparkContext, writing documents to Couchbase at the RDD level usually operates on RDDs that already exist.
The following functions are available on an RDD:
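API | Type | Description |
---|---|---|
couchbaseUpsert | RDD[Upsert[_]] | Stores documents with upsert semantics. |
couchbaseReplace | RDD[Replace[_]] | Stores documents with replace semantics. |
couchbaseInsert | RDD[Insert[_]] | Stores documents with insert semantics. |
couchbaseRemove | RDD[Remove] | Removes documents. |
couchbaseMutateIn | RDD[MutateIn] | Mutates parts of documents ("subdocument" API). |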
These APIs are only available if the RDD has the matching element type, as the following example illustrates.
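    import com.couchbase.spark.kv.{Get, Upsert}

    spark
      .sparkContext
      .couchbaseGet(Seq(Get("airline_10"), Get("airline_10642"), Get("airline_10748")))
      // Turn each GetResult into an Upsert so that couchbaseUpsert becomes available.
      .map(getResult => Upsert(getResult.id, getResult.contentAs[JsonObject].get))
      // Write to targetBucket instead of the implicit bucket.
      .couchbaseUpsert(Keyspace(bucket = Some("targetBucket")))
      .collect()
      .foreach(println)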
It first loads three documents from the travel-sample bucket and returns an RDD[GetResult]. The objective is to store those three documents in the targetBucket.
As a next step, inside the map function, an Upsert case class is constructed from the document ID and content. The resulting RDD is then passed to the couchbaseUpsert function, which executes the operation. Note that a custom Keyspace is also passed, which overrides the default implicit one and therefore writes the data to a different bucket.
Note: If you are working with DataFrames and need to adjust the store semantics, see the section on SaveMode Mapping in the Spark SQL section.