#
# Copyright 2017, Couchbase, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import weakref
from couchbase.admin import Admin
from couchbase.bucket import Bucket
from couchbase.connstr import ConnectionString
from couchbase.exceptions import CouchbaseError
import itertools
from collections import defaultdict
import warnings
class MixedAuthError(CouchbaseError):
"""
Cannot use old and new style auth together in the same cluster
"""
pass
class NoBucketError(CouchbaseError):
"""
Operation requires at least a single bucket to be open
"""
class OverrideSet(set):
def __init__(self, *args, **kwargs):
super(OverrideSet, self).__init__(*args, **kwargs)
class OverrideDict(dict):
def __init__(self, *args, **kwargs):
super(OverrideDict, self).__init__(*args, **kwargs)
[docs]class Cluster(object):
# list of all authentication types, keep up to date, used to identify connstr/kwargs auth styles
def __init__(self, connection_string='couchbase://localhost',
bucket_class=Bucket):
"""
Creates a new Cluster object
:param connection_string: Base connection string. It is an error to
specify a bucket in the string.
:param bucket_class: :class:`couchbase.bucket.Bucket` implementation to
use.
"""
self.connstr = ConnectionString.parse(str(connection_string))
self.bucket_class = bucket_class
self.authenticator = None
self._buckets = {}
if self.connstr.bucket:
raise ValueError('Cannot pass bucket to connection string: ' + self.connstr.bucket)
if 'username' in self.connstr.options:
raise ValueError('username must be specified in the authenticator, '
'not the connection string')
[docs] def authenticate(self, authenticator=None, username=None, password=None):
"""
Set the type of authenticator to use when opening buckets or performing
cluster management operations
:param authenticator: The new authenticator to use
:param username: The username to authenticate with
:param password: The password to authenticate with
"""
if authenticator is None:
if not username:
raise ValueError('username must not be empty.')
if not password:
raise ValueError('password must not be empty.')
authenticator = PasswordAuthenticator(username, password)
self.authenticator = authenticator
[docs] def open_bucket(self, bucket_name, **kwargs):
# type: (str, str) -> Bucket
"""
Open a new connection to a Couchbase bucket
:param bucket_name: The name of the bucket to open
:param kwargs: Additional arguments to provide to the constructor
:return: An instance of the `bucket_class` object provided to
:meth:`__init__`
"""
if self.authenticator:
auth_credentials_full = self.authenticator.get_auto_credentials(bucket_name)
else:
auth_credentials_full = {'options': {}}
auth_credentials = auth_credentials_full['options']
connstr = ConnectionString.parse(str(self.connstr))
connstr.bucket = bucket_name
for attrib in set(auth_credentials) - {'password'}:
connstr.set_option(attrib, auth_credentials[attrib])
# Check if there are conflicting authentication types in any of the parameters
# Also sets its own 'auth_type' field to the type of authentication it
# thinks is being specified
normalizer = Cluster.ParamNormaliser(self.authenticator, connstr, **kwargs)
# we don't do anything with this information unless the Normaliser thinks
# Cert Auth is involved as this is outside the remit of PYCBC-487/488/489
if issubclass(normalizer.auth_type, CertAuthenticator) or issubclass(type(self.authenticator),
CertAuthenticator):
# TODO: check_clash_free_params/check_no_unwanted_keys including connstr options
# should probably apply to PasswordAuthenticator as well,
# but outside remit of PYCBC-487
# TODO: do we accept clashing/unwanted options in connstr/kwargs e.g. password?
# Here, we throw an exception
# in the case of any clash/unwanted options, but we could scrub
# clashing/unwanted options from connstr/kwargs
normalizer.check_no_unwanted_keys()
normalizer.check_clash_free_params()
normalizer.assert_no_critical_complaints()
if 'password' in kwargs:
if isinstance(self.authenticator, PasswordAuthenticator):
raise MixedAuthError("Cannot override "
"PasswordAuthenticators password")
else:
kwargs['password'] = auth_credentials.get('password', None)
connstr.scheme = auth_credentials_full.get('scheme', connstr.scheme)
rv = self.bucket_class(str(connstr), **kwargs)
self._buckets[bucket_name] = weakref.ref(rv)
if isinstance(self.authenticator, ClassicAuthenticator):
for bucket, passwd in self.authenticator.buckets.items():
if passwd:
rv.add_bucket_creds(bucket, passwd)
return rv
class ParamNormaliser:
_authentication_types = None
_auth_unique_params = None
@staticmethod
def auth_types():
# cache this calculation
if not Cluster.ParamNormaliser._authentication_types:
Cluster.ParamNormaliser._authentication_types = {CertAuthenticator, ClassicAuthenticator,
PasswordAuthenticator}
Cluster.ParamNormaliser._auth_unique_params = {k.__name__: k.unique_keys() for k in
Cluster.ParamNormaliser._authentication_types}
@property
def authentication_types(self):
Cluster.ParamNormaliser.auth_types()
return Cluster.ParamNormaliser._authentication_types
@property
def auth_unique_params(self):
Cluster.ParamNormaliser.auth_types()
return Cluster.ParamNormaliser._auth_unique_params
def __init__(self, authenticator, connstr, **kwargs):
# build a dictionary of all potentially overlapping/conflicting parameter names
self.param_keys = dict(kwargs=OverrideSet(kwargs), connstr=set(connstr.options))
self.param_keys.update({'auth_credential': authenticator.unique_keys()} if authenticator else {})
self.param_keys.update(self.auth_unique_params)
self.authenticator = authenticator
# compare each parameter set with one another to look for overlaps
self.critical_complaints = []
self._build_clash_dict()
self.auth_type = type(self.authenticator)
self._check_for_auth_type_clashes()
def assert_no_critical_complaints(self):
if self.critical_complaints:
raise MixedAuthError(str(self.critical_complaints))
def check_no_unwanted_keys(self):
"""
Check for definitely unwanted keys in any of the options
for the active authentication type in use and
throw a MixedAuthError if found.
"""
unwanted_keys = self.auth_type.unwanted_keys() if self.auth_type else set()
clash_list = ((k, self._entry(unwanted_keys, k)) for k in set(self.param_keys) - {'auth-credential'})
self._handle_clashes({k: v for k, v in clash_list if v})
def check_clash_free_params(self):
"""
Check for clashes with the authenticator in use, and thrown a MixedAuthError if found.
"""
auth_clashes = self.clash_dict.get('auth_credential')
if auth_clashes:
actual_clashes = {k: v for k, v in auth_clashes.items() if self.auth_type.__name__ != k}
if actual_clashes:
complaint = ', and'.join(
"param set {}: [{}] ".format(second_set, clashes)
for second_set, clashes in auth_clashes.items())
self.critical_complaints.append(
"{} param set: {} clashes with {}".format(self.auth_type.__name__,
self.param_keys['auth_credential'], complaint))
def _build_clash_dict(self):
self.clash_dict = defaultdict(defaultdict)
# build a dictionary {'first_set_name':{'second_set_name':{'key1','key2'}}} listing all overlaps
for item in itertools.combinations(self.param_keys.items(), 2):
clashes = item[0][1] & item[1][1]
if clashes:
warnings.warn("{} and {} options overlap on keys {}".format(item[0][0], item[1][0], clashes))
# make dictionary bidirectional, so we can list all clashes for a given param set directly
self.clash_dict[item[0][0]][item[1][0]] = clashes
self.clash_dict[item[1][0]][item[0][0]] = clashes
def _check_for_auth_type_clashes(self):
connstr_auth_type_must_be = self._get_types_with_unique_parameters()
# are there multiple types this should definitely be, or have we
# is an Authenticator already set which clashes with the detected
# Authenticator types from the parameters?
if len(connstr_auth_type_must_be) > 1 or (
self.authenticator and connstr_auth_type_must_be - {self.auth_type}):
self._build_auth_type_complaints(connstr_auth_type_must_be)
if connstr_auth_type_must_be:
self.auth_type, = connstr_auth_type_must_be
def _get_types_with_unique_parameters(self):
"""
:return: Set of Authenticator types which the params potentially could
represent
"""
connstr_auth_type_must_be = set()
for auth_type in self.authentication_types - {self.auth_type}:
auth_clashes = self.clash_dict.get(auth_type.__name__)
if auth_clashes:
connstr_auth_type_must_be.add(auth_type)
return connstr_auth_type_must_be
def _build_auth_type_complaints(self, connstr_auth_type_must_be):
complaints = []
for auth_type in connstr_auth_type_must_be:
complaints.append("parameters {params} overlap on {auth_type}".format(auth_type=auth_type.__name__,
params=self.clash_dict.get(
auth_type.__name__)))
self.critical_complaints.append("clashing params: {}{}".format(
"got authenticator type {} but ".format(
type(self.authenticator).__name__) if self.authenticator else "",
", and".join(complaints)))
def _entry(self, unwanted_keys, key):
return unwanted_keys & self.param_keys[key]
def _handle_clashes(self, clashes):
if len(clashes):
clash_dict = defaultdict(lambda: defaultdict(list))
for clash, intersection in clashes.items():
clash_dict[isinstance(self.param_keys[clash], OverrideSet)][clash].append(intersection)
action = {False: self._warning, True: self._exception}
for is_override, param_dict in clash_dict.items():
action[is_override](param_dict, self.auth_type)
@staticmethod
def _gen_complaint(param_dict):
complaint = ", and".join(("{} contains {}".format(k, list(*entry)) for k, entry in param_dict.items()))
return complaint
def _get_generic_complaint(self, clash_param_dict, auth_type):
return "Invalid parameters used with {}-style authentication - {}".format(auth_type.__name__,
Cluster.ParamNormaliser._gen_complaint(
clash_param_dict))
def _exception(self, clash_param_dict, auth_type):
self.critical_complaints.append(self._get_generic_complaint(clash_param_dict, auth_type))
def _warning(self, clash_param_dict, auth_type):
warnings.warn(self._get_generic_complaint(clash_param_dict, auth_type))
[docs] def cluster_manager(self):
"""
Returns an object which may be
used to create and manage buckets in the cluster.
:returns: the cluster manager
:rtype: couchbase.admin.Admin
"""
credentials = self.authenticator.get_credentials()['options']
connection_string = str(self.connstr)
return Admin(credentials.get('username'), credentials.get('password'), connection_string=connection_string)
[docs] def n1ql_query(self, query, *args, **kwargs):
"""
Issue a "cluster-level" query. This requires that at least one
connection to a bucket is active.
:param query: The query string or object
:param args: Additional arguments to :cb_bmeth:`n1ql_query`
.. seealso:: :cb_bmeth:`n1ql_query`
"""
from couchbase.n1ql import N1QLQuery
if not isinstance(query, N1QLQuery):
query = N1QLQuery(query)
query.cross_bucket = True
to_purge = []
for k, v in self._buckets.items():
bucket = v()
if bucket:
return bucket.n1ql_query(query, *args, **kwargs)
else:
to_purge.append(k)
for k in to_purge:
del self._buckets[k]
raise NoBucketError('Must have at least one active bucket for query')
def _recursive_creds_merge(base, overlay):
for k, v in overlay.items():
base_k = base.get(k, None)
if not base_k:
base[k] = v
continue
if isinstance(v, dict):
if isinstance(base_k, dict):
base[k] = _recursive_creds_merge(base_k, v)
else:
raise Exception("Cannot merge dict and {}".format(v))
else:
raise Exception("Cannot merge non dicts")
return base
class Authenticator(object):
def __init__(self, cert_path = None):
"""
:param cert_path: Path for SSL certificate (last in chain if multiple)
"""
self._cert_path = cert_path
def get_credentials(self, bucket=None):
"""
Gets the credentials for a specified bucket. If bucket is
`None`, gets the username and password for the entire cluster, if
different.
:param bucket: The bucket to act as context
:return: A dictionary of (optional) scheme and credentials e.g. `{'scheme':'couchbases',options:{'username':'fred', 'password':'opensesame'}}`
"""
return self.get_auto_credentials(bucket)
@classmethod
def unwanted_keys(cls):
"""
The set of option keys that are definitely incompatible with this authentication style.
"""
return set()
@classmethod
def unique_keys(cls):
"""
The set of option keys, if any, that this authenticator uniquely possesses.
"""
return set(cls.get_unique_creds_dict().keys())
@classmethod
def get_unique_creds_dict(cls):
"""
User overridable
A dictionary of authenticator-unique options and functions/lambdas of the form:
function(self):
return self.password
e.g.
{'certpath': lambda self: self.certpath}
"""
return {}
def _base_options(self, bucket, overlay):
base_dict = {'options': {'cert_path': self._cert_path} if self._cert_path else {}}
return _recursive_creds_merge(base_dict, overlay)
def get_cred_bucket(self, bucket, **overlay):
"""
:param bucket:
:return: returns the non-unique parts of the credentials for bucket authentication,
as a dictionary of functions, e.g.:
'options': {'username': self.username}, 'scheme': 'couchbases'}
"""
return self._base_options(bucket, overlay)
def get_cred_not_bucket(self, **overlay):
"""
:param bucket:
:return: returns the non-unique parts of the credentials for admin access
as a dictionary of functions, e.g.:
{'options':{'password': self.password}}
"""
return self._base_options(None, overlay)
def get_auto_credentials(self, bucket):
"""
:param bucket:
:return: returns a dictionary of credentials for bucket/admin
authentication
"""
result = {k: v(self) for k, v in self.get_unique_creds_dict().items()}
if bucket:
result.update(self.get_cred_bucket(bucket))
else:
result.update(self.get_cred_not_bucket())
return result
class PasswordAuthenticator(Authenticator):
def __init__(self, username, password, cert_path=None):
"""
This class uses a single credential pair of username and password, and
is designed to be used either with cluster management operations or
with Couchbase 5.0 style usernames with role based access control.
For older cluster versions, or if you are only using a bucket's "SASL"
password, use :class:`~.ClassicAuthenticator`
:param username:
:param password:
:param cert_path:
Path of the CA key
.. warning:: This functionality is experimental both in API and
implementation.
"""
super(PasswordAuthenticator,self).__init__(cert_path=cert_path)
self.username = username
self.password = password
def get_cred_bucket(self, bucket, **overlay):
return self.get_cred_not_bucket(**overlay)
def get_cred_not_bucket(self, **overlay):
merged = _recursive_creds_merge({'options': {'username': self.username, 'password': self.password}}, overlay)
return super(PasswordAuthenticator, self).get_cred_not_bucket(**merged)
@classmethod
def unwanted_keys(cls):
return {'password'}
class ClassicAuthenticator(Authenticator):
def __init__(self, cluster_username=None,
cluster_password=None,
buckets=None,
cert_path=None):
"""
Classic authentication mechanism.
:param cluster_username:
Global cluster username. Only required for management operations
:param cluster_password:
Global cluster password. Only required for management operations
:param buckets:
A dictionary of `{bucket_name: bucket_password}`.
:param cert_path:
Path of the CA key
"""
super(ClassicAuthenticator, self).__init__(cert_path=cert_path)
self.username = cluster_username
self.password = cluster_password
self.buckets = buckets if buckets else {}
def get_cred_not_bucket(self):
return super(ClassicAuthenticator, self).get_cred_not_bucket(**{'options': {'username': self.username, 'password': self.password}})
def get_cred_bucket(self, bucket, **overlay):
merged=_recursive_creds_merge({'options': {'password': self.buckets.get(bucket)}}, overlay)
return super(ClassicAuthenticator, self).get_cred_bucket(bucket, **merged)
class CertAuthenticator(Authenticator):
def __init__(self, cert_path=None, key_path=None, trust_store_path=None, cluster_username=None,
cluster_password=None):
"""
Certificate authentication mechanism.
:param cluster_username:
Global cluster username. Only required for management operations
:param cluster_password:
Global cluster password. Only required for management operations
:param cert_path:
Path of the CA key
:param key_path:
Path of the key
:param trust_store_path:
Path of the certificate trust store.
"""
super(CertAuthenticator, self).__init__(cert_path=cert_path)
self.username = cluster_username
self.password = cluster_password
self.keypath = key_path
self.trust_store_path = trust_store_path
@classmethod
def get_unique_creds_dict(clazz):
return { 'keypath': lambda self: self.keypath,
'truststorepath': lambda self: self.trust_store_path}
def get_cred_bucket(self, bucket, **overlay):
merged = _recursive_creds_merge(
{'options': {'username': self.username}, 'scheme': 'couchbases'},
overlay)
return super(CertAuthenticator, self).get_cred_bucket(bucket, **merged)
def get_cred_not_bucket(self):
return super(CertAuthenticator, self).get_cred_not_bucket(**{'options': {'password': self.password}})
@classmethod
def unwanted_keys(cls):
return {'password'}
def get_credentials(self, bucket=None):
return self.get_auto_credentials(bucket)