Configuration
Information about the different configuration properties the Spark connector either expects or allows you to set.
All Couchbase-specific properties start with the spark.couchbase prefix. They can all be configured on the SparkSession like the following:
val spark = SparkSession
.builder()
.config("PROPERTY-KEY", "PROPERTY-VALUE")
.getOrCreate()
Required Properties
Three properties are required to get the connector up and running:
Property | Description |
---|---|
spark.couchbase.connectionString | The connection string / hostnames of the cluster |
spark.couchbase.username | The name of your RBAC user |
spark.couchbase.password | The password of your RBAC user |
For example:
val spark = SparkSession
.builder()
.appName("Migration")
.config("spark.couchbase.connectionString", "127.0.0.1")
.config("spark.couchbase.username", "user")
.config("spark.couchbase.password", "pass")
.getOrCreate()
Optional Properties
There are other properties which can be provided that alter the workflow, for example by providing implicit bucket, scope or collection names.
Property | Description |
---|---|
spark.couchbase.implicitBucket | Used as the bucket if no explicit name is provided on the operation. |
spark.couchbase.implicitScope | Used as the scope if no explicit name is provided on the operation. |
spark.couchbase.implicitCollection | Used as the collection if no explicit name is provided on the operation. |
spark.couchbase.waitUntilReadyTimeout | How long the SDK waits to make sure all connections are properly established (1 minute by default). |
The implicit values are always used if no explicit Keyspace
or option override is provided on an operation.
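As a sketch, configuring an implicit bucket lets subsequent operations omit the bucket name entirely (the bucket name travel-sample below is an assumption for illustration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .config("spark.couchbase.connectionString", "127.0.0.1")
  .config("spark.couchbase.username", "user")
  .config("spark.couchbase.password", "pass")
  // Assumed bucket name: every operation without an explicit
  // bucket option or Keyspace will target this bucket.
  .config("spark.couchbase.implicitBucket", "travel-sample")
  .getOrCreate()
```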
Dynamic Configuration Properties
In addition to configuring the connector, it is also possible to configure the underlying SDK properties. Usually the SDK provides a rich builder to do so, but since Spark only allows string properties, the approach is a bit less flexible.
The strategy is similar to configuring the SDK through system properties or connection string values. Properties are taken as a string value and then decoded into the target format.
For example, to configure the KeyValue timeout, the following property can be used:

- Key: "spark.couchbase.timeout.kvTimeout"
- Value: "10s"

Or to set the certificate path:

- Key: "spark.couchbase.security.trustCertificate"
- Value: "mycert.pem"
Please refer to the SDK documentation for the possible keys or values.
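Applied to the SparkSession, the two dynamic properties above could be set like this (the certificate file mycert.pem is taken from the example and assumed to exist):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .config("spark.couchbase.connectionString", "127.0.0.1")
  .config("spark.couchbase.username", "user")
  .config("spark.couchbase.password", "pass")
  // Dynamic SDK properties are passed as strings and
  // decoded by the connector into the target format.
  .config("spark.couchbase.timeout.kvTimeout", "10s")
  .config("spark.couchbase.security.trustCertificate", "mycert.pem")
  .getOrCreate()
```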
Connecting to multiple Clusters
This feature is only available with Spark Connector 3.3.0 and later.
It is possible to connect to more than one Couchbase Cluster when using the Spark Connector.
To create more than one connection, a custom identifier needs to be appended to each property key at configuration time, separated by a colon (:). The following example configures a default connection that is always used, and a westcoast connection that needs to be explicitly specified when performing an operation:
val spark = SparkSession
.builder()
.appName("MultipleClusterConnections")
.config("spark.couchbase.connectionString", "hostname1")
.config("spark.couchbase.username", "username")
.config("spark.couchbase.password", "password")
.config("spark.couchbase.connectionString:westcoast", "westcoasthost")
.config("spark.couchbase.username:westcoast", "westcoastuser")
.config("spark.couchbase.password:westcoast", "westcoastpassword")
.getOrCreate()
When performing an operation (like querying a DataFrame), an option can be provided to use the custom connection instead of the default one.
This will use the default connection:
val airports = spark.read
.format("couchbase.query")
.load()
Setting the QueryOptions.ConnectionIdentifier
option will override the default connection based on the identifier provided.
val airports = spark.read
.format("couchbase.query")
.option(QueryOptions.ConnectionIdentifier, "westcoast")
.load()
On RDD operations additional arguments are provided to achieve the same:
spark.sparkContext
.couchbaseGet(Seq(Get("airline_10"), Get("airline_10642")), connectionIdentifier = "westcoast")
.collect()
The same connection identifier mechanism works for dynamic properties (described earlier).
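As a sketch, a dynamic SDK property can be scoped to a single connection by appending the same identifier suffix to the property key (the timeout value here is illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .config("spark.couchbase.connectionString:westcoast", "westcoasthost")
  .config("spark.couchbase.username:westcoast", "westcoastuser")
  .config("spark.couchbase.password:westcoast", "westcoastpassword")
  // The :westcoast suffix scopes this dynamic SDK property to
  // the westcoast connection only; other connections are unaffected.
  .config("spark.couchbase.timeout.kvTimeout:westcoast", "10s")
  .getOrCreate()
```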
TLS Connections
TLS connections can be configured through the SDK properties shown above, but there is an alternative way that aligns with how TLS is configured in Spark itself. The following properties are recognized and, if enabled, used to establish an encrypted connection to the Couchbase cluster:
Property | Description |
---|---|
spark.ssl.enabled | If TLS/SSL should be enabled. |
spark.ssl.keyStore | The path to the JVM keystore. |
spark.ssl.keyStorePassword | The password of the JVM keystore. |
spark.ssl.insecure | Uses the InsecureTrustManager and accepts all certificates. This should only be used during development! |
Note that the prefix for these properties is not spark.couchbase but spark.ssl, since they are Spark-generic properties.
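As a minimal sketch, an encrypted connection using these Spark-generic properties might look like the following (the keystore path and password are assumptions for illustration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .config("spark.couchbase.connectionString", "127.0.0.1")
  .config("spark.couchbase.username", "user")
  .config("spark.couchbase.password", "pass")
  // Spark-generic TLS properties, recognized by the connector.
  .config("spark.ssl.enabled", "true")
  .config("spark.ssl.keyStore", "/path/to/keystore.jks") // assumed path
  .config("spark.ssl.keyStorePassword", "storepass")     // assumed password
  .getOrCreate()
```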
Client Certificates
Authentication can also be done with client certificates rather than username and password. This needs to be configured on the cluster first, following these instructions.
Then it can be configured in the connector like this (assuming the certificates have been written into a keystore named keystore with the password storepass):
val spark = SparkSession
.builder()
.config("spark.couchbase.connectionString", connectionString)
.config("spark.couchbase.keyStorePath", "keystore")
.config("spark.couchbase.keyStorePassword", "storepass")
.config("spark.couchbase.keyStoreType", "jks")
.config("spark.couchbase.security.trustCertificate", "ca.pem")
.config("spark.couchbase.security.enableTls", "true")
.getOrCreate()