Configuration

  • concept
    +
    Information about the different configuration properties the Spark connector either expects or allows to set.

    All couchbase-specific properties start with the spark.couchbase prefix. They can all be configured on the SparkSession like the following:

    val spark = SparkSession
      .builder()
      .config("PROPERTY-KEY", "PROPERTY-VALUE")
      .getOrCreate()

    Required Properties

    Three properties are required to get the connector up and running:

    Table 1. Required Config Properties
    Property Description

    spark.couchbase.connectionString

    The connection string / hostnames of the cluster

    spark.couchbase.username

    The name of your RBAC user

    spark.couchbase.password

    The password of your RBAC user

    For example:

    val spark = SparkSession
      .builder()
      .appName("Migration")
      .config("spark.couchbase.connectionString", "127.0.0.1")
      .config("spark.couchbase.username", "user")
      .config("spark.couchbase.password", "pass")
      .getOrCreate()

    Optional Properties

    There are other properties which can be provided that alter the workflow, for example by providing implicit bucket, scope or collection names.

    Table 2. Optional Config Properties
    Property Description

    spark.couchbase.implicitBucket

    Used as a bucket, if no explicit name provided on the operation.

    spark.couchbase.implicitScope

    Used as a scope, if no explicit name provided on the operation.

    spark.couchbase.implicitCollection

    Used as a collection, if no explicit name provided on the operation.

    spark.couchbase.waitUntilReadyTimeout

    The time until the SDK waits to make sure all connections are properly established (1 minute default).

    The implicit values are always used if no explicit Keyspace or option override is provided on an operation.

    Dynamic Configuration Properties

    In addition to configuring the connector, it is also possible to configure the underlying SDK properties. Usually the SDK provides a rich builder to do so, but since spark only allows to provide string properties the approach is a bit less flexible.

    The strategy is similar to configuring the SDK through system properties or connection string values. Properties are taken as a string value and then decoded into the target format.

    For example to configure the KeyValue timeout, the following property can be used:

    • Key: "spark.couchbase.timeout.kvTimeout"

    • Value: "10s"

    Or to set the certificate path:

    • Key: security.trustCertificate

    • Value: mycert.pem

    Please refer to the SDK documentation for the possible keys or values.

    Connecting to multiple Clusters

    This feature is only available with Spark Connector 3.3.0 and later.

    It is possible to connect to more than one Couchbase Cluster when using the Spark Connector.

    To create more than one connection, a custom identifier prefixed by : needs to be provided at configuration time.The following examples configures a default environment that is always used and a westcoast environment that needs to be explicitly specified when performing an operation:

    val spark = SparkSession
      .builder()
      .appName("MultipleClsuterConnections")
      .config("spark.couchbase.connectionString", "hostname1")
      .config("spark.couchbase.username", "username")
      .config("spark.couchbase.password", "password")
      .config("spark.couchbase.connectionString:westcoast", "westcoasthost")
      .config("spark.couchbase.username:westcoast", "westcoastuser")
      .config("spark.couchbase.password:westcoast", "westcoastpassword")
      .getOrCreate()

    When performing an operation (like querying a DataFrame), an option can be provided to use the custom connection instead of the default one.

    This will use the default connection:

    val airports = spark.read
      .format("couchbase.query")
      .load()

    Setting the QueryOptions.ConnectionIdentifier option will override the default connection based on the identifier provided.

    val airports = spark.read
      .format("couchbase.query")
      .option(QueryOptions.ConnectionIdentifier, "westcoast")
      .load()

    On RDD operations additional arguments are provided to achieve the same:

    spark.sparkContext
      .couchbaseGet(Seq(Get("airline_10"), Get("airline_10642")), connectionIdentifier = "westcoast")
      .collect()

    The same connection identifier mechanism works for dynamic properties (described earlier).

    TLS Connections

    TLS connections can be configured through SDK properties shown above, but there is an alternative way that aligns with configuring TLS in spark itself. The following properties are recognized and if enabled used to connect to a Couchbase cluster encrypted:

    Table 3. TLS Config Properties
    Property Description

    spark.ssl.enabled

    if TLS/SSL should be enabled

    spark.ssl.keyStore

    the path to the jvm keystore

    spark.ssl.keyStorePassword

    the password of the jvm keystore

    spark.ssl.insecure

    Uses the InsecureTrustManager and accepts all certificates. This should only be used during development!

    Note that the prefix for these properties is not spark.couchbase but spark.ssl, since they are spark-generic properties.

    Client Certificates

    Authentication can also be done with client certificates rather than username and password. This needs to be configured on the cluster first, following these instructions.

    Then it can be configured in the connector like this (assuming the certificates have been written into a Keystore named keystore with password storepass):

    SparkSession
        .config("spark.couchbase.connectionString", connectionString)
        .config("spark.couchbase.keyStorePath", "keystore")
        .config("spark.couchbase.keyStorePassword", "storepass")
        .config("spark.couchbase.keyStoreType", "jks")
        .config("spark.couchbase.security.trustCertificate", "ca.pem")
        .config("spark.couchbase.security.enableTls", "true")