Capella Columnar Support

    • Developer Preview
      Connecting to Capella Columnar is very similar to connecting to any Couchbase cluster. This section explains how.

      You can connect to Capella Columnar clusters using either Scala or PySpark. To get set up, see the Scala getting started or PySpark getting started guides.

      Developer Preview mode ("Preview Mode") provides early access to features that may become generally available ("GA") in future releases, and enables you to experiment with these features to get a sense of how they work. Preview Mode features and their use are subject to Couchbase’s "Non-GA Offering Supplemental Terms", set forth in the License Agreement. Preview Mode features may not be functionally complete and are not intended for production use. They are intended for development and testing purposes only.

      Spark Configuration

      As usual, the first step is to create a SparkSession, here configured to connect to your Capella Columnar cluster. This works just like connecting to any other type of Couchbase cluster.

      Scala

      Change the cluster configuration options to match your own.

      val spark = SparkSession
        .builder()
        .appName("Couchbase Spark Connector Columnar Example") // your app name
        .master("local[*]") // your local or remote Spark master node
        .config("spark.couchbase.connectionString", "couchbases://your-columnar-endpoint.cloud.couchbase.com")
        .config("spark.couchbase.username", "username")
        .config("spark.couchbase.password", "password")
        .getOrCreate()
      PySpark

      Change the cluster configuration options to match your own, and provide the location of the Spark Connector library if needed (see the PySpark getting started guide for more details).

      spark = (SparkSession.builder
          .appName("Couchbase Spark Connector Columnar Example")
          # Note whether you need the .master(...) and .config("spark.jars"...) lines depends on how you are using Spark.
          # See our PySpark documentation for more details.
          .master("local[*]")
          .config("spark.jars", "/path/to/spark-connector-assembly-<version>.jar")
          .config("spark.couchbase.connectionString", "couchbases://cb.your.columnar.connection.string.com")
          .config("spark.couchbase.username", "YourColumnarUsername")
          .config("spark.couchbase.password", "YourColumnarPassword")
          .getOrCreate())

      The following examples use the travel-sample example dataset, which can be loaded through the UI.

      Reading a DataFrame

      Let’s start by reading a Spark DataFrame from the airline collection, which is in the inventory scope of the travel-sample database:

      Scala

      val airlines = spark.read
        .format("couchbase.columnar")
        .option(ColumnarOptions.Database, "travel-sample")
        .option(ColumnarOptions.Scope, "inventory")
        .option(ColumnarOptions.Collection, "airline")
        .load

      PySpark

      airlines = (spark.read
          .format("couchbase.columnar")
          .option("database", "travel-sample")
          .option("scope", "inventory")
          .option("collection", "airline")
          .load())

      This is a normal Spark DataFrame that we can count, iterate over, and so on.

      Scala

      println(airlines.count)

      airlines.foreach(row => {
        val id = row.getAs[String]("id")
        val name = row.getAs[String]("name")
        println(s"Row: id=${id} name=${name}")
      })

      PySpark

      airlines.show()
      print(airlines.count())

      collected = airlines.collect()

      for airline in collected:
          print(airline)
          id = airline["id"]
          name = airline["name"]
          print(f"Airline: id={id} name={name}")

      Reading a Dataset

      (Supported in Scala only, as Apache Spark does not support Datasets via PySpark.)

      It can be preferable to read into a Spark Dataset rather than a DataFrame, as this lets us use Scala case classes directly.

      To do this, we:

      1. Create an Airline case class that matches our expected results.

      2. Import the SparkSession implicits allowing Spark to convert directly to our Airline class.

      3. Call .as[Airline] to turn our DataFrame into a Dataset.

      case class Airline(id: String, name: String, country: String) // (1)
      
      val sparkSession = spark
      import sparkSession.implicits._ // (2)
      
      val airlinesDataset = spark.read
        .format("couchbase.columnar")
        .option(ColumnarOptions.Database, "travel-sample")
        .option(ColumnarOptions.Scope, "inventory")
        .option(ColumnarOptions.Collection, "airline")
        .load
        .as[Airline] // (3)
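
      Because the Dataset is typed, fields are accessed through the Airline case class and checked at compile time. A small sketch, using the same assumed fields as above:

      // Typed Dataset operations: filter and map work on Airline instances directly.
      val usAirlineNames = airlinesDataset
        .filter(airline => airline.country == "United States")
        .map(_.name)

      usAirlineNames.show(10)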

      Spark SQL

      We can use Spark’s createOrReplaceTempView to create a temporary view from a DataFrame, which we can then run Spark SQL on (which creates another DataFrame):

      Scala

      airlines.createOrReplaceTempView("airlinesView")
      val airlinesFromView = spark.sql("SELECT * FROM airlinesView")

      PySpark

      airlines.createOrReplaceTempView("airlinesView")
      airlinesFromView = spark.sql("SELECT * FROM airlinesView")
      print(airlinesFromView.count())

      Note that this SQL is executed purely within Spark, and is not sent to the Capella Columnar cluster.
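
      More involved queries can also be run against the same temporary view. Below is a brief sketch of an aggregation in Scala, again assuming the country field from travel-sample; like the query above, it executes entirely within Spark:

      // Count airlines per country with standard Spark SQL on the temporary view.
      val airlinesPerCountry = spark.sql(
        """SELECT country, COUNT(*) AS airlineCount
          |FROM airlinesView
          |GROUP BY country
          |ORDER BY airlineCount DESC""".stripMargin)

      airlinesPerCountry.show(10)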