Working With RDDs

    Spark operates on resilient distributed datasets (RDDs). Higher-level abstractions like DataFrames and Datasets have increasingly become the primary means of access, but RDDs are still useful to understand.

    When you need to extract data out of Couchbase, the Couchbase Spark connector creates RDDs for you. You can also persist data to Couchbase using RDDs.

    The following SparkSession is configured to work against the travel-sample bucket and can be used to follow the examples. Adjust the connectionString, username, and password to match your environment.

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("WorkingWithRDDs")
      .config("spark.couchbase.connectionString", "127.0.0.1")
      .config("spark.couchbase.username", "Administrator")
      .config("spark.couchbase.password", "password")
      .config("spark.couchbase.implicitBucket", "travel-sample")
      .getOrCreate()

    All RDD operations are exposed on the SparkContext, so the following import needs to be present before the APIs can be used:

    import com.couchbase.spark._

    Many arguments and return types come directly from the Couchbase Scala SDK (for example, GetResult and GetOptions). This is intentional, since it allows the most flexibility when interacting with the SDK. These types are not discussed in detail here; please refer to the official SDK documentation for more information.

    Creating RDDs

    The following read operations are available:

    API                       Description
    couchbaseGet              Fetches full documents.
    couchbaseLookupIn         Fetches parts of documents ("subdocument API").
    couchbaseQuery            Performs a N1QL query.
    couchbaseAnalyticsQuery   Performs an analytics query.
    couchbaseSearchQuery      Performs a search query.
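
    The query-style read operations all follow the same pattern: a statement string plus a row type parameter. As a hedged sketch (assuming couchbaseAnalyticsQuery accepts a statement string and a row type, mirroring couchbaseQuery), an analytics query could be issued like this:

    // Sketch only: the statement is illustrative and evaluates without requiring
    // an analytics dataset to be set up.
    // JsonObject comes from the Couchbase Scala SDK.
    import com.couchbase.client.scala.json.JsonObject

    spark
      .sparkContext
      .couchbaseAnalyticsQuery[JsonObject]("select \"hello\" as greeting")
      .collect()
      .foreach(println)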

    The following write operations are also available on the context:

    API                       Description
    couchbaseUpsert           Stores documents with upsert semantics.
    couchbaseReplace          Stores documents with replace semantics.
    couchbaseInsert           Stores documents with insert semantics.
    couchbaseRemove           Removes documents.
    couchbaseMutateIn         Mutates parts of documents ("subdocument API").
    couchbaseQuery            Performs a N1QL query.

    Note that couchbaseQuery appears in both tables, since it can execute DML statements as well as regular SELECT queries.
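
    The write operations on the context follow the same pattern as the reads and take a Seq of the corresponding case class. Here is a minimal sketch of a context-level upsert; the document ID and content are purely illustrative, and it is assumed that couchbaseUpsert on the context accepts a Seq[Upsert[T]] as listed in the table above:

    // Sketch only: stores a single illustrative document and prints the
    // mutation results returned as an RDD.
    spark
      .sparkContext
      .couchbaseUpsert(Seq(
        Upsert("my-doc", JsonObject.create.put("hello", "world"))
      ))
      .collect()
      .foreach(println)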

    The following example shows how to fetch two documents and print their contents:

    spark
      .sparkContext
      .couchbaseGet(Seq(Get("airline_10"), Get("airline_10642")))
      .collect()
      .foreach(result => println(result.contentAs[JsonObject]))

    Each API takes a required Seq[T], where T depends on the operation being used. The case classes are named after the operation type and allow specifying more parameters than just the document ID where needed.

    As an example, for a couchbaseReplace the case class signature looks like this:

    case class Replace[T](id: String, content: T, cas: Long = 0)

    So for each entry in the Seq, you can specify not only the id and content of the document, but also (optionally) the CAS value to perform an optimistic locking operation.
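
    For illustration, an optimistic replace could look like the following sketch. It assumes that the connector's GetResult exposes the document's CAS via a cas field (as the SDK's GetResult does), alongside the id and contentAs already used in the other examples:

    // Read a document, modify it, and write it back with the CAS observed at
    // read time; the replace fails if the document changed in the meantime.
    // Sketch only.
    val fetched = spark.sparkContext.couchbaseGet(Seq(Get("airline_10"))).collect().head
    val updated = fetched.contentAs[JsonObject].get.put("updated", true)

    spark
      .sparkContext
      .couchbaseReplace(Seq(Replace(fetched.id, updated, fetched.cas)))
      .collect()
      .foreach(println)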

    A N1QL query can be performed like this:

    spark
      .sparkContext
      .couchbaseQuery[JsonObject]("select count(*) as count from `travel-sample`")
      .collect()
      .foreach(println)
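
    Since couchbaseQuery also accepts DML (as noted above), a mutation can be issued the same way. The statement below is purely illustrative:

    // Sketch of a DML statement through couchbaseQuery; an UPDATE without a
    // RETURNING clause simply yields no rows.
    spark
      .sparkContext
      .couchbaseQuery[JsonObject]("update `travel-sample` set updated = true where type = 'airline' limit 1")
      .collect()
      .foreach(println)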

    In addition to the required parameter(s), optional information can also be passed along. Each operation allows specifying its equivalent option block (so for a couchbaseGet, the GetOptions can be supplied). Also, a generic Keyspace can be provided, which overrides the implicit defaults from the configuration.

    A Keyspace looks like this:

    case class Keyspace(
      bucket: Option[String] = None,
      scope: Option[String] = None,
      collection: Option[String] = None
    )

    You can use it to provide a custom bucket, scope, or collection on a per-operation basis.
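
    For example, a single read can be pointed at a specific scope and collection. In this sketch the scope and collection names, as well as the argument position of the Keyspace, are assumptions (the persisting example further below passes the Keyspace in the same way):

    // Sketch only: overrides the implicit bucket/scope/collection for one read.
    spark
      .sparkContext
      .couchbaseGet(
        Seq(Get("airline_10")),
        Keyspace(scope = Some("inventory"), collection = Some("airline"))
      )
      .collect()
      .foreach(result => println(result.contentAs[JsonObject]))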

    Persisting RDDs

    While read operations are typically issued on the SparkContext, writing documents to Couchbase at the RDD level usually operates on an already existing RDD.

    The following functions are available on an RDD:

    API                 Type                Description
    couchbaseUpsert     RDD[Upsert[_]]      Stores documents with upsert semantics.
    couchbaseReplace    RDD[Replace[_]]     Stores documents with replace semantics.
    couchbaseInsert     RDD[Insert[_]]      Stores documents with insert semantics.
    couchbaseRemove     RDD[Remove]         Removes documents.
    couchbaseMutateIn   RDD[MutateIn]       Mutates parts of documents ("subdocument API").

    It is important to understand that these APIs are only available if the RDD has the matching element type. The following example illustrates this.

    spark
      .sparkContext
      .couchbaseGet(Seq(Get("airline_10"), Get("airline_10642"), Get("airline_10748")))
      .map(getResult => Upsert(getResult.id, getResult.contentAs[JsonObject].get))
      .couchbaseUpsert(Keyspace(bucket = Some("targetBucket")))
      .collect()
      .foreach(println)

    It first loads three documents from the travel-sample bucket, which returns an RDD[GetResult]. The objective is to store those three documents in the targetBucket bucket.

    Next, inside the map function, an Upsert case class is constructed from the document ID and content. The resulting RDD is then passed to the couchbaseUpsert function, which executes the operation. Note that a custom Keyspace is also supplied; it overrides the implicit default and allows the data to be written to a different bucket.
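
    The other RDD write functions follow the same shape. For instance, the documents just copied into targetBucket could be removed again by mapping to Remove, assuming the Remove case class takes the document ID as listed in the table above:

    // Sketch only: maps each fetched document ID to a Remove and deletes it
    // from the target bucket.
    spark
      .sparkContext
      .couchbaseGet(Seq(Get("airline_10"), Get("airline_10642"), Get("airline_10748")))
      .map(getResult => Remove(getResult.id))
      .couchbaseRemove(Keyspace(bucket = Some("targetBucket")))
      .collect()
      .foreach(println)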

    Note: If you are working with DataFrames and need to adjust the store semantics, see the section on SaveMode Mapping in the Spark SQL section.