Capella Columnar Support
- Developer Preview
Developer Preview mode ("Preview Mode") provides early access to features that may become generally available ("GA") in future releases, and enables you to try these features to get a sense of how they work. Preview Mode features and their use are subject to Couchbase’s "Non-GA Offering Supplemental Terms", set forth in the License Agreement. Preview Mode features may not be functionally complete and are not intended for production use. They are intended for development and testing purposes only.
Connecting to Capella Columnar is very similar to connecting to any other Couchbase cluster. This section explains how.
Capella Columnar clusters can be connected to using either Scala or PySpark. To get started, see the Scala getting started or PySpark getting started guides.
Spark Configuration
The first step, as usual, is to create a SparkSession, here connecting to your Capella Columnar cluster. This works just like connecting to any other type of Couchbase cluster.
- Scala
Change the cluster configuration options to match your own.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Couchbase Spark Connector Columnar Example") // your app name
  .master("local[*]") // your local or remote Spark master node
  .config("spark.couchbase.connectionString", "couchbases://your-columnar-endpoint.cloud.couchbase.com")
  .config("spark.couchbase.username", "username")
  .config("spark.couchbase.password", "password")
  .getOrCreate()
- PySpark
Change the cluster configuration options to match your own, and provide the location of the Spark Connector library if needed (see the PySpark getting started guide for more details).
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("Couchbase Spark Connector Columnar Example")
    # Whether you need the .master(...) and .config("spark.jars", ...) lines
    # depends on how you are running Spark. See our PySpark documentation for more details.
    .master("local[*]")
    .config("spark.jars", "/path/to/spark-connector-assembly-<version>.jar")
    .config("spark.couchbase.connectionString", "couchbases://cb.your.columnar.connection.string.com")
    .config("spark.couchbase.username", "YourColumnarUsername")
    .config("spark.couchbase.password", "YourColumnarPassword")
    .getOrCreate())
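Rather than hard-coding credentials, the same configuration options can be populated from the environment. A minimal Scala sketch, using illustrative environment variable names that are not part of the connector:

import org.apache.spark.sql.SparkSession

// Hypothetical environment variable names; adjust to match your deployment
val connectionString = sys.env("COUCHBASE_CONNECTION_STRING")
val username = sys.env("COUCHBASE_USERNAME")
val password = sys.env("COUCHBASE_PASSWORD")

val spark = SparkSession.builder()
  .appName("Couchbase Spark Connector Columnar Example")
  .master("local[*]")
  .config("spark.couchbase.connectionString", connectionString)
  .config("spark.couchbase.username", username)
  .config("spark.couchbase.password", password)
  .getOrCreate()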
The following examples use the travel-sample example set of data, which can be loaded through the UI.
Reading a DataFrame
Let’s start by reading a Spark DataFrame from the airline collection, which is in the inventory scope of the travel-sample database:
- Scala
// ColumnarOptions is provided by the Couchbase Spark Connector
val airlines = spark.read
  .format("couchbase.columnar")
  .option(ColumnarOptions.Database, "travel-sample")
  .option(ColumnarOptions.Scope, "inventory")
  .option(ColumnarOptions.Collection, "airline")
  .load
- PySpark
airlines = (spark.read
    .format("couchbase.columnar")
    .option("database", "travel-sample")
    .option("scope", "inventory")
    .option("collection", "airline")
    .load())
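To see what schema Spark has inferred for the collection, a quick check (shown in Scala; the same printSchema call exists in PySpark):

// Print the schema Spark inferred from the airline collection
airlines.printSchema()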
This is a normal Spark DataFrame that we can count, iterate over, and so on.
- Scala
println(airlines.count)

airlines.foreach(row => {
  val id = row.getAs[String]("id")
  val name = row.getAs[String]("name")
  println(s"Row: id=${id} name=${name}")
})

airlines.show()
- PySpark
print(airlines.count())

collected = airlines.collect()
for airline in collected:
    print(airline)
    id = airline["id"]
    name = airline["name"]
    print(f"Airline: id={id} name={name}")
Reading a Dataset
(Supported in Scala only, as Apache Spark does not support the Dataset API in PySpark.)
It can be preferable to read into a Spark Dataset rather than a DataFrame, as this lets us use Scala case classes directly.
To do this, we:
- Create an Airline case class that matches our expected results (1).
- Import the SparkSession implicits, allowing Spark to convert directly to our Airline class (2).
- Use .as[Airline] to turn our DataFrame into a Dataset (3).
case class Airline(id: String, name: String, country: String) // (1)

val sparkSession = spark
import sparkSession.implicits._ // (2)

val airlinesDataset = spark.read
  .format("couchbase.columnar")
  .option(ColumnarOptions.Database, "travel-sample")
  .option(ColumnarOptions.Scope, "inventory")
  .option(ColumnarOptions.Collection, "airline")
  .load
  .as[Airline] // (3)
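With a Dataset, transformations are typed, so fields are accessed as plain Scala members and checked at compile time. A short sketch building on the airlinesDataset above:

// Typed filter and map over Airline case class instances
val ukAirlineNames = airlinesDataset
  .filter(airline => airline.country == "United Kingdom")
  .map(_.name)
ukAirlineNames.show()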
Spark SQL
We can use Spark’s createOrReplaceTempView to create a temporary view from a DataFrame, which we can then run Spark SQL on (which creates another DataFrame):
- Scala
airlines.createOrReplaceTempView("airlinesView")
val airlinesFromView = spark.sql("SELECT * FROM airlinesView")
- PySpark
airlines.createOrReplaceTempView("airlinesView")
airlinesFromView = spark.sql("SELECT * FROM airlinesView")
print(airlinesFromView.count())
Note that this SQL is executed purely within Spark, and is not sent to the Capella Columnar cluster.
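The view supports arbitrary Spark SQL; for example, filtering and projecting (Scala shown; the same spark.sql call works in PySpark):

// Query the temporary view registered above
val ukAirlines = spark.sql(
  "SELECT name, country FROM airlinesView WHERE country = 'United Kingdom'")
ukAirlines.show()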