Getting Started
To get started with the Couchbase Spark connector quickly, learn how to add the connector to your Spark project and run simple queries.
Quickstart
The following examples use Scala and its sbt build tool, but you can also use the connector from Java and with Maven or Gradle.
Create a new sbt project and add the following content to the build.sbt file.
This code includes all the Spark dependencies, as well as the Couchbase Spark connector.
Just enough to get you started!
Here is a reference to the Scala docs.
name := "my-first-couchbase-spark-project"
organization := "my.organization"
version := "1.0.0-SNAPSHOT"
scalaVersion := "2.13.14"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.5.1",
"org.apache.spark" %% "spark-sql" % "3.5.1",
"com.couchbase.client" %% "spark-connector" % "3.5.1"
)
Now, under src/main/scala/, create a Quickstart.scala file with the following skeleton:
object Quickstart {
def main(args: Array[String]): Unit = {
}
}
If you are not familiar with Scala, this is the equivalent of Java’s public static void main(String[] args) method. The following code goes inside the main body.
When it comes to Spark, you always need to set up a configuration and initialize the SparkSession.
The following snippet does that and also provides the necessary properties to connect to the Couchbase cluster. Please adjust the values to fit your environment.
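// Import needed for the session builder (can also go at the top of the file)
import org.apache.spark.sql.SparkSession
// Configure the Spark Session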
val spark = SparkSession
.builder()
.appName("Couchbase Quickstart")
.master("local") // use the JVM as the master, great for testing
.config("spark.couchbase.connectionString", "couchbases://cb.your-capella-hostname.couchbase.com")
.config("spark.couchbase.username", "username")
.config("spark.couchbase.password", "password")
.getOrCreate()
In this configuration, we use the JVM-local Spark master to run our jobs. We connect to a Couchbase Capella cluster (the Couchbase Spark Connector works against all varieties of Couchbase cluster) as the user username with the password password.
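Hardcoded credentials are fine for a first run, but they are usually better kept out of the source. The following is a minimal sketch of one way to do that, assuming the settings arrive through environment variables. The helper CouchbaseSettings and the variable names CB_CONNECTION_STRING, CB_USERNAME, and CB_PASSWORD are hypothetical and not part of the connector; only the three spark.couchbase.* property keys come from the snippet above.

```scala
// Hypothetical helper for this guide; it is not part of the connector.
object CouchbaseSettings {

  // Builds the three connector properties from an environment map,
  // falling back to the placeholder values used in this guide.
  // The CB_* variable names are assumptions made for this example.
  def fromEnv(env: Map[String, String] = sys.env): Map[String, String] = Map(
    "spark.couchbase.connectionString" ->
      env.getOrElse("CB_CONNECTION_STRING", "couchbases://cb.your-capella-hostname.couchbase.com"),
    "spark.couchbase.username" -> env.getOrElse("CB_USERNAME", "username"),
    "spark.couchbase.password" -> env.getOrElse("CB_PASSWORD", "password")
  )

  def main(args: Array[String]): Unit =
    // Print only the property keys so the password never reaches the console
    fromEnv().keys.toSeq.sorted.foreach(println)
}
```

Each entry can then be applied to the builder with .config(key, value), for example CouchbaseSettings.fromEnv().foldLeft(SparkSession.builder())((b, kv) => b.config(kv._1, kv._2)).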
Creating and saving RDDs
The fundamental data type in Spark is the RDD (Resilient Distributed Dataset). The Couchbase Spark connector allows you to read and write RDDs against the cluster.
Add the following line to your code so that all Couchbase-specific methods are available on the SparkContext:
import com.couchbase.spark._
There are different APIs available for each service (KeyValue, Query, etc.), and one of the most used is fetching documents. This can be achieved by using couchbaseGet:
// Imports needed, but let your IDE handle that!
import com.couchbase.client.scala.json.JsonObject
import com.couchbase.spark.kv.Get
// IDs of all documents we want to fetch
val ids = Seq(Get("airline_10"), Get("airline_10642"), Get("airline_10748"))
// The bucket from which we want to fetch the keys
val keyspace = Keyspace(bucket = Some("travel-sample"))
spark
// access the spark context
.sparkContext
// perform the get with ids and keyspace
.couchbaseGet(ids, keyspace)
// collect all spark results back to the master
.collect()
// print the content from each GetResult
.foreach(getResult => println(getResult.contentAs[JsonObject].get))
First, the IDs we want to fetch are specified (and wrapped in Get case classes). Then, the Keyspace tells the command where to look for the data. In this case, we tell it to use the travel-sample bucket. Note that if you do not provide a scope or collection, the default ones are assumed. This way, the code is compatible with Couchbase Server versions before 7.0, but also with newer ones.
For each ID that is found, a GetResult is returned, which is the same type that the Scala SDK returns directly. In the example above, the content is converted into a generic JsonObject and printed.
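To read from a named collection instead of the default one, the scope and collection can be set on the Keyspace as well. The following is a sketch under two assumptions: that Keyspace accepts optional scope and collection fields next to bucket, and that your travel-sample bucket contains the inventory scope with an airline collection. The object name ScopedGet is only illustrative, and the connection values are the placeholders used earlier.

```scala
import com.couchbase.client.scala.json.JsonObject
import com.couchbase.spark._
import com.couchbase.spark.kv.Get
import org.apache.spark.sql.SparkSession

object ScopedGet {
  // Assumption: `scope` and `collection` are optional fields of Keyspace,
  // and travel-sample has an inventory.airline collection.
  val keyspace: Keyspace = Keyspace(
    bucket = Some("travel-sample"),
    scope = Some("inventory"),
    collection = Some("airline")
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Scoped Get")
      .master("local")
      .config("spark.couchbase.connectionString", "couchbases://cb.your-capella-hostname.couchbase.com")
      .config("spark.couchbase.username", "username")
      .config("spark.couchbase.password", "password")
      .getOrCreate()

    spark.sparkContext
      // fetch a single document from the named collection
      .couchbaseGet(Seq(Get("airline_10")), keyspace)
      .collect()
      .foreach(result => println(result.contentAs[JsonObject].get))

    spark.stop()
  }
}
```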
Spark by default is very verbose in its logging, but looking towards the end of the execution you should see logs similar to this:
{"country":"United States","iata":"Q5","name":"40-Mile Air","callsign":"MILE-AIR","icao":"MLA","id":10,"type":"airline"}
{"country":"United Kingdom","iata":null,"name":"Jc royal.britannica","callsign":null,"icao":"JRB","id":10642,"type":"airline"}
{"country":"United States","iata":"ZQ","name":"Locair","callsign":"LOCAIR","icao":"LOC","id":10748,"type":"airline"}
The connector also allows you to save RDDs as documents.
The following code loads documents from the travel-sample bucket and then writes them into a different bucket. The same approach works for all kinds of RDDs, including ones that come from other data sources.
// Additional import needed for the Upsert case class
import com.couchbase.spark.kv.Upsert
// IDs of all documents we want to fetch
val ids = Seq(Get("airline_10"), Get("airline_10642"), Get("airline_10748"))
// The bucket from where and into where we want to read/write
val fromKeyspace = Keyspace(bucket = Some("travel-sample"))
val toKeyspace = Keyspace(bucket = Some("foo"))
spark
.sparkContext
// Retrieve the documents by ID
.couchbaseGet(ids, fromKeyspace)
// Turn the results into an "Upsert" case class...
.map(getResult => Upsert(getResult.id, getResult.contentAs[JsonObject].get))
// Which is accepted by the couchbaseUpsert method to store the documents
.couchbaseUpsert(toKeyspace)
.collect()
.foreach(println)
This time, println shows the results of the mutations: the Scala SDK’s MutationResult for each successful mutation.
Next up is an introduction to Query/Analytics and DataFrames.
Working with SparkSQL and DataFrames
DataFrames are, in essence, RDDs with a schema. Each record is represented by the SparkSQL Row type.
You need at least a primary index on the travel-sample bucket to make the following examples work.
If you haven’t done so already, run a CREATE PRIMARY INDEX ON `travel-sample` query.
Because a DataFrame is like an RDD but with a schema, and Couchbase is a schemaless database at its heart, you need a way to either define or infer a schema.
The connector has built-in schema inference, but if you have a large or diverse data set, you need to give it some clues on filtering (or use scopes and collections with Server 7.0 and later).
Suppose you want a DataFrame for all airlines, and you know that the JSON content has a type field with the value airline.
You can pass this information to the connector for automatic schema inference:
// Import needed for the option keys
import com.couchbase.spark.query.QueryOptions
// Create a DataFrame with Schema Inference
val airlines = spark.read.format("couchbase.query")
.option(QueryOptions.Filter, "type = 'airline'")
.option(QueryOptions.Bucket, "travel-sample")
.load()
// Print The Schema
airlines.printSchema()
The code automatically infers the schema and prints it in this format:
root
|-- __META_ID: string (nullable = true)
|-- callsign: string (nullable = true)
|-- country: string (nullable = true)
|-- iata: string (nullable = true)
|-- icao: string (nullable = true)
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- type: string (nullable = true)
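If inference is not a good fit, for example because the documents are very diverse, the schema can be defined by hand instead. The following sketch assumes that the couchbase.query format accepts a user-supplied schema through the standard DataFrameReader.schema method. The object name ExplicitSchema and the choice of the three fields are only illustrative.

```scala
import com.couchbase.spark.query.QueryOptions
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object ExplicitSchema {
  // Hand-written schema: only these three fields become columns
  val airlineSchema: StructType = StructType(Seq(
    StructField("name", StringType),
    StructField("callsign", StringType),
    StructField("country", StringType)
  ))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Explicit Schema")
      .master("local")
      .config("spark.couchbase.connectionString", "couchbases://cb.your-capella-hostname.couchbase.com")
      .config("spark.couchbase.username", "username")
      .config("spark.couchbase.password", "password")
      .getOrCreate()

    val airlines = spark.read
      .format("couchbase.query")
      // Assumption: the connector honors a schema passed in this way
      .schema(airlineSchema)
      .option(QueryOptions.Filter, "type = 'airline'")
      .option(QueryOptions.Bucket, "travel-sample")
      .load()

    airlines.printSchema()
    spark.stop()
  }
}
```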
If you are using collections, something similar can be achieved without having to add the filter (since the airline collection only contains airlines in the first place):
val airlines = spark.read.format("couchbase.query")
.option(QueryOptions.Bucket, "travel-sample")
.option(QueryOptions.Scope, "inventory")
.option(QueryOptions.Collection, "airline")
.load()
Next you can perform an actual query where you are interested only in the name and callsign.
This example sorts the results by name and shows only the first 10 rows.
// Used for the $-sign to name a column
import spark.sqlContext.implicits._
airlines
.select("name", "callsign")
.sort($"name".asc)
.show(10)
The code prints the results on the console like this:
+--------------------+-----------+
| name| callsign|
+--------------------+-----------+
| 40-Mile Air| MILE-AIR|
| AD Aviation| FLIGHTVUE|
| ATA Airlines| AMTRAN|
| Access Air| CYCLONE|
| Aigle Azur| AIGLE AZUR|
| Air Austral| REUNION|
|Air Caledonie Int...| AIRCALIN|
| Air Caraïbes|FRENCH WEST|
| Air Cargo Carriers|NIGHT CARGO|
| Air Cudlua| Cudlua|
+--------------------+-----------+
only showing top 10 rows
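Once a DataFrame exists, it can also be queried with plain SQL by registering it as a temporary view. The sketch below uses only standard Spark APIs and runs without a cluster: a few rows copied from the sample output in this guide stand in for the DataFrame loaded from Couchbase. The object AirlinesSql and the helper usAirlines are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object AirlinesSql {
  // Registers the DataFrame as a temporary view and queries it with SQL
  def usAirlines(spark: SparkSession, airlines: DataFrame): DataFrame = {
    airlines.createOrReplaceTempView("airlines")
    spark.sql(
      "SELECT name, callsign FROM airlines WHERE country = 'United States' ORDER BY name"
    )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Airlines SQL")
      .master("local")
      .getOrCreate()
    // Needed for toDF on a local Seq
    import spark.implicits._

    // Stand-in data; in the quickstart this would be the DataFrame
    // loaded through the couchbase.query format
    val airlines = Seq(
      ("Locair", "LOCAIR", "United States"),
      ("Air Austral", "REUNION", "France"),
      ("40-Mile Air", "MILE-AIR", "United States")
    ).toDF("name", "callsign", "country")

    usAirlines(spark, airlines).show()
    spark.stop()
  }
}
```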
Working with Datasets
A Dataset is a distributed collection of data. Dataset is an interface added in Spark 1.6 that combines the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.
The following example creates a Dataset out of a DataFrame and maps it to a case class. It then uses the case class to extract fields out of the result set in a type-safe way:
// Create a DataFrame with Schema Inference
val airlines = spark.read.format("couchbase.query")
//.option(QueryOptions.Filter, "type = 'airline'")
.option(QueryOptions.Bucket, "travel-sample")
.option(QueryOptions.Scope, "inventory")
.option(QueryOptions.Collection, "airline")
.load()
// Contains the Encoder
import spark.implicits._
// Create a Dataset from the DataFrame
val airlinesDS = airlines.as[Airline]
airlinesDS
.limit(10)
.collect()
.foreach(println)
The Airline case class itself is defined outside of the main object:
case class Airline(name: String, country: String)
When run, this should print:
Airline(40-Mile Air,United States)
Airline(Texas Wings,United States)
Airline(Atifly,United States)
Airline(Jc royal.britannica,United Kingdom)
Airline(Locair,United States)
Airline(SeaPort Airlines,United States)
Airline(Alaska Central Express,United States)
Airline(Astraeus,United Kingdom)
Airline(Air Austral,France)
Airline(Airlinair,France)
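The benefit of the typed API shows up once you transform the Dataset: fields are accessed as regular case class members and checked by the compiler. The following sketch uses only standard Spark APIs and a small in-memory Dataset built from rows shown above, so it runs without a cluster. The object TypedAirlines and the helper namesIn are hypothetical names, and the Airline case class is repeated only to keep the example self-contained.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Same shape as the Airline case class used above
case class Airline(name: String, country: String)

object TypedAirlines {
  // Typed filter and map: a misspelled field name fails at compile time
  def namesIn(spark: SparkSession, airlines: Dataset[Airline], country: String): Dataset[String] = {
    // Provides the Encoder[String] required by map
    import spark.implicits._
    airlines.filter(_.country == country).map(_.name)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Typed Airlines")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // Stand-in data; in the quickstart this would be airlines.as[Airline]
    val airlinesDS = Seq(
      Airline("40-Mile Air", "United States"),
      Airline("Astraeus", "United Kingdom"),
      Airline("Air Austral", "France"),
      Airline("Airlinair", "France")
    ).toDS()

    namesIn(spark, airlinesDS, "France").collect().foreach(println)
    spark.stop()
  }
}
```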
Accessing the SDK directly
Most of the time you do not need to access the SDK directly, but it is possible when you need lower-level operations that the connector does not expose.
To do this, obtain a CouchbaseConnection and pass it a CouchbaseConfig. From there, the Cluster, Bucket, etc. are available.
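// Imports needed for the connection classes
import com.couchbase.spark.config.{CouchbaseConfig, CouchbaseConnection}
val config = CouchbaseConfig(spark.sparkContext.getConf)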
val sdk = CouchbaseConnection()
val cluster = sdk.cluster(config)
val result = cluster.query("select 1=1")
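The blocking Scala SDK reports errors through Try rather than exceptions, so the query result should be unwrapped before use. The following sketch shows one way to handle both outcomes. It assumes that CouchbaseConfig and CouchbaseConnection live in the com.couchbase.spark.config package and that rowsAs is used to decode the rows; the object DirectSdkAccess and the helper describe are hypothetical names.

```scala
import com.couchbase.client.scala.json.JsonObject
import com.couchbase.spark.config.{CouchbaseConfig, CouchbaseConnection}
import org.apache.spark.sql.SparkSession
import scala.util.{Failure, Success, Try}

object DirectSdkAccess {
  // Turns the outcome of a query into a printable summary
  def describe[T](rows: Try[Iterable[T]]): String = rows match {
    case Success(found) => s"${found.size} row(s): ${found.mkString(", ")}"
    case Failure(error) => s"Query failed: ${error.getMessage}"
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Direct SDK Access")
      .master("local")
      .config("spark.couchbase.connectionString", "couchbases://cb.your-capella-hostname.couchbase.com")
      .config("spark.couchbase.username", "username")
      .config("spark.couchbase.password", "password")
      .getOrCreate()

    val config = CouchbaseConfig(spark.sparkContext.getConf)
    val cluster = CouchbaseConnection().cluster(config)

    // query returns a Try[QueryResult]; rowsAs decodes the rows, again as a Try
    val rows = cluster.query("select 1=1").flatMap(_.rowsAs[JsonObject])
    println(describe(rows))

    spark.stop()
  }
}
```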