Source code for couchbase.cluster

import asyncio

from couchbase_core.asynchronous.client import AsyncClientMixin
from couchbase.mutation_state import MutationState
from couchbase.management.queries import QueryIndexManager
from couchbase.management.search import SearchIndexManager
from couchbase.management.analytics import AnalyticsIndexManager
from couchbase.analytics import AnalyticsOptions
from .auth import NoBucketException, Authenticator
from .management.users import UserManager
from .management.buckets import BucketManager
from couchbase.management.admin import Admin
from couchbase.diagnostics import DiagnosticsResult
from couchbase.search import SearchResult, SearchOptions, SearchQuery
from .analytics import AnalyticsResult
from .n1ql import QueryResult
from couchbase_core.n1ql import _N1QLQuery
from .options import OptionBlock, OptionBlockDeriv
from .bucket import Bucket, CoreClient, PingOptions
from couchbase_core.cluster import _Cluster as CoreCluster
from .exceptions import AlreadyShutdownException, InvalidArgumentException, \
    SearchException, QueryException, AnalyticsException, CouchbaseException, NetworkException
import couchbase_core._libcouchbase as _LCB
from couchbase_core._pyport import raise_from
from couchbase.options import OptionBlockTimeOut, LockMode
from couchbase_core.cluster import *
from .result import *
from random import choice
from enum import Enum
from copy import deepcopy
from datetime import timedelta


T = TypeVar('T')


CallableOnOptionBlock = Callable[[OptionBlockDeriv, Any], Any]


class DiagnosticsOptions(OptionBlock):

    @overload
    def __init__(self,
                 report_id=None # type: str
                 ):
        pass

    def __init__(self,
                 **kwargs
                 ):
        """

        :param str report_id: An id that is appended on to the :class:`~.DiagnosticsResult`.  Helps with
            disambiguating reports when you have several running.
        """
        super(DiagnosticsOptions, self).__init__(**kwargs)


[docs]class QueryScanConsistency(Enum): """ QueryScanConsistency This can be: NOT_BOUNDED Which means we just return what is currently in the indexes, or REQUEST_PLUS which means we 'read our own writes'. Slower, since the query has to wait for the indexes to catch up. """ REQUEST_PLUS = "request_plus" NOT_BOUNDED = "not_bounded"
class QueryProfile(Enum): """ QueryProfile You can chose to set this to: OFF No query profiling data will be collected. PHASES Profile will have details on phases. TIMINGS Profile will have phases, and details on the query plan execution as well. """ OFF = 'off' PHASES = 'phases' TIMINGS = 'timings'
[docs]class QueryOptions(OptionBlockTimeOut): VALID_OPTS = {'timeout', 'read_only', 'scan_consistency', 'adhoc', 'client_context_id', 'consistent_with', 'max_parallelism', 'positional_parameters', 'named_parameters', 'pipeline_batch', 'pipeline_cap', 'profile', 'raw', 'scan_wait', 'scan_cap', 'metrics'} @overload def __init__(self, timeout=None, # type: timedelta read_only=None, # type: bool scan_consistency=None, # type: QueryScanConsistency adhoc=None, # type: bool client_context_id=None, # type: str consistent_with=None, # type: MutationState max_parallelism=None, # type: int positional_parameters=None, # type: Iterable[JSON] named_parameters=None, # type: dict[str, JSON] pipeline_batch=None, # type: int pipeline_cap=None, # type: int profile=None, # type: QueryProfile raw=None, # type: dict[str, JSON] scan_wait=None, # type: timedelta scan_cap=None, # type: int metrics=False # type: bool ): pass
[docs] def __init__(self, **kwargs ): """ QueryOptions Various options for queries :param timedelta timeout: Uses this timeout value, rather than the default for the cluster. See :meth:`~Cluster.query_timeout`. :param bool read_only: Hint to the server that this is a read-only query. :param QueryScanConsistency scan_consistency: Specify the level of consistency for the query. Overrides any setting in consistent_with. Can be either :meth:`~.QueryScanConsistency.NOT_BOUNDED`, which means return what is in the index now, or :meth:`~.QueryScanConsistency.REQUEST_PLUS`, which means you can read your own writes. Slower, but when you need it you have it. :param bool adhoc: Specifies if the prepared statement logic should be executed internally. :param str client_context_id: Specifies a context ID string which is mirrored back from the query engine on response. :param MutationState consistent_with: Specifies custom scan consistency through “at_plus” with mutation state token vectors. :param int max_parallelism: The maximum number of logical cores to use in parallel for this query. :param Iterable[JSON] positional_parameters: Specifies the parameters used in the query, when positional notation ($1, $2, etc...) is used. :param dict[str,JSON] named_parameters: Specifies the parameters used in the query, when named parameter notation ($foo, $bar, etc...) is used. :param int pipeline_batch: Specifies pipeline batching characteristics. :param int pipeline_cap: Specifies pipeline cap characteristics. :param QueryProfile profile: Specifies the profiling level to use. :param dict[str,JSON] raw: This is a way to to specify the query payload to support unknown commands and be future-compatible. :param timedelta scan_wait: Specifies maximum amount of time to wait for a scan. :param int scan_cap: Specifies the scan cap characteristics. :param bool metrics: Specifies whether or not to include metrics with the :class:`~.QueryResult`. """ super(QueryOptions, self).__init__(**kwargs)
def to_n1ql_query(self, statement, *options, **kwargs): # lets make a copy of the options, and update with kwargs... args = self.copy() args.update(kwargs) # now lets get positional parameters. Actual positional # params OVERRIDE positional_parameters positional_parameters = args.pop('positional_parameters', []) if options and len(options) > 0: positional_parameters = options # now the named parameters. NOTE: all the kwargs that are # not VALID_OPTS must be named parameters, and the kwargs # OVERRIDE the list of named_parameters new_keys = list(filter(lambda x: x not in self.VALID_OPTS, args.keys())) named_parameters = args.pop('named_parameters', {}) for k in new_keys: named_parameters[k] = args[k] query = _N1QLQuery(statement, *positional_parameters, **named_parameters) # now lets try to setup the options. TODO: rework this after beta.3 # but for now we will use the existing _N1QLQuery. Could be we can # add to it, etc... # default to false on metrics query.metrics = args.get('metrics', False) # TODO: there is surely a cleaner way... for k, v in ((k, args[k]) for k in (args.keys() & self.VALID_OPTS)): if k == 'scan_consistency': query.consistency = v.value if k == 'consistent_with': query.consistent_with = v if k == 'adhoc': query.adhoc = v if k == 'timeout': query.timeout = v.total_seconds() if k == 'scan_cap': query.scan_cap = v if k == 'pipeline_batch': query.pipeline_batch = v if k == 'pipeline_cap': query.pipeline_cap = v if k == 'read_only': query.readonly = v if k == 'profile': query.profile = v.value return query # this will change the options for export. # NOT USED CURRENTLY def as_dict(self): for key, val in self.items(): if key == 'positional_parameters': self.pop(key, None) self['args'] = val if key == 'named_parameters': self.pop(key, None) for k, v in val.items(): self["${}".format(k)] = v if key == 'scan_consistency': self[key] = val.as_string() if key == 'consistent_with': self[key] = val.encode() if key == 'profile': self[key] = val.as_string() if key == 'scan_wait': # scan_wait should be in ms self[key] = val.total_seconds() * 1000 if self.get('consistent_with', None): self['scan_consistency'] = 'at_plus' return self
class ClusterTimeoutOptions(dict): KEY_MAP = {'kv_timeout': 'operation_timeout', 'query_timeout': 'query_timeout', 'views_timeout': 'views_timeout', 'config_total_timeout': 'config_total_timeout'} @overload def __init__(self, query_timeout=None, # type: timedelta kv_timeout=None, # type: timedelta views_timeout=None, # type: timedelta config_total_timeout=None # type: timedelta ): pass def __init__(self, **kwargs): """ ClusterTimeoutOptions These will be the default timeouts for queries, kv operations or views for the entire cluster :param timedelta query_timeout: Timeout for query operations. :param timedelta kv_timeout: Timeout for KV operations. :param timedelta views_timeout: Timeout for View operations. :param timedelta config_total_timeout: Timeout for complete bootstrap configuration """ super().__init__(**kwargs) def as_dict(self): opts = {} for k, v in self.items(): if v is None or k not in self.KEY_MAP.keys(): continue elif k in self.KEY_MAP: opts[self.KEY_MAP[k]] = v.total_seconds() else: opts[k] = v return opts class Compression(Enum): """ Can be one of: NONE: The client will not compress or decompress the data. IN: The data coming back from the server will be decompressed, if it was compressed. OUT: The data coming into server will be compressed. INOUT: The data will be compressed on way in, decompressed on way out of server. FORCE: By default the library will send a HELLO command to the server to determine whether compression is supported or not. Because commands may be pipelined prior to the scheduing of the HELLO command it is possible that the first few commands may not be compressed when schedule due to the library not yet having negotiated settings with the server. Setting this flag will force the client to assume that all servers support compression despite a HELLO not having been intially negotiated. """ @classmethod def from_int(cls, val): if val == 0: return cls.NONE elif val == 1: return cls.IN elif val == 2: return cls.OUT elif val == 3: return cls.INOUT elif val == 7: # note that the lcb flag is a 4, but when you set "force" in the connection # string, it sets it as INOUT|FORCE. return cls.FORCE else: raise InvalidArgumentException("cannot convert {} to a Compression".format(val)) NONE='off' IN='inflate_only' OUT='deflate_only' INOUT='on' FORCE='force' class ClusterTracingOptions(dict): INT_KEYS = ['tracing_threshold_queue_size', 'tracing_orphaned_queue_size'] KEYS = ['tracing_threshold_kv', 'tracing_threshold_view', 'tracing_threshold_query', 'tracing_threshold_search', 'tracing_threshold_analytics', 'tracing_threshold_queue_size', 'tracing_threshold_queue_flush_interval', 'tracing_orphaned_queue_size', 'tracing_orphaned_queue_flush_interval'] @overload def __init__(self, tracing_threshold_kv=None, # type: timedelta tracing_threshold_view=None, # type: timedelta tracing_threshold_query=None, # type: timedelta tracing_threshold_search=None, # type: timedelta tracing_threshold_analytics=None, # type: timedelta tracing_threshold_queue_size=None, # type: int tracing_threshold_queue_flush_interval=None, # type: timedelta tracing_orphaned_queue_size=None, # type: int tracing_orphaned_queue_flush_interval=None, # type: timedelta ): pass def __init__(self, **kwargs): """ ClusterTracingOptions These parameters control when/how our request tracing will log slow requests or orphaned responses. :param timedelta tracing_threshold_kv: Any KV request taking longer than this will be traced. :param timedelta tracing_threshold_view: Any View request taking longer than this will be traced. :param timedelta tracing_threshold_query: Any Query request taking longer than this will be traced. :param timedelta tracing_threshold_search: Any Search request taking longer than this will be traced. :param timedelta tracing_threshold_analytics: Any Analytics request taking longer than this will be traced :param int tracing_threshold_queue_size: Limits the number of requests traced. :param timedelta tracing_threshold_queue_flush_interval: Interval between flushes of the threshold queues. :param int tracing_orphaned_queue_size: Limits the number of orphaned requests traced. :param timedelta tracing_orphaned_queue_flush_interval: Interval between flushes of the orphaned queue. """ super().__init__(**kwargs) def as_dict(self): opts = {} for k, v in self.items(): if v is None or k not in self.KEYS: continue elif isinstance(v, timedelta): opts[k] = v.total_seconds() else: opts[k] = v return opts
[docs]class ClusterOptions(dict): KEYS = ['timeout_options', 'tracing_options', 'log_redaction', 'compression', 'compression_min_size', 'compression_min_ratio'] @overload def __init__(self, authenticator, # type: Authenticator timeout_options=None, # type: ClusterTimeoutOptions tracing_options=None, # type: ClusterTracingOptions log_redaction=None, # type: bool compression=None, # type: Compression compression_min_size=None, # type: int compression_min_ratio=None, # type: float lockmode=None # type: LockMode ): pass
[docs] def __init__(self, authenticator, # type: Authenticator **kwargs): """ Options to set when creating a cluster. Note the authenticator is mandatory, all the others are optional. :param Authenticator authenticator: :class:`~.Authenticator` to use - see :class:`~.PasswordAuthenticator` and :class:`~.CertAuthenticator`. :param ClusterTimeoutOptions timeout_options: A :class:`~.ClusterTimeoutOptions` object, with various optional timeouts. :param ClusterTracingOptions tracing_options: A :class:`~.ClusterTracingOptions` object, with various options for tracing. :param bool log_redaction: Turn log redaction on/off. :param Compression compression: A :class:`~.Compression` value for this cluster. :param int compression_min_size: Min size of the data before compression kicks in. :param float compression_min_ratio: A `float` representing the minimum compression ratio to use when compressing. """ super(ClusterOptions, self).__init__(**kwargs) self['authenticator'] = authenticator
def update_connection_string(self, connstr, **kwargs): opts = self.as_dict(**kwargs) if len(opts) == 0: return connstr conn = ConnectionString.parse(connstr) for k, v in opts.items(): conn.set_option(k, v) return conn.encode() def split_args(self, **kwargs): # return a tuple with args we recognize and those we don't, which # should be kwargs in connect ours = {} theirs = {} for k, v in kwargs.items(): if k in self.KEYS: ours[k] = v else: theirs[k] = v return (ours, theirs,) def as_dict(self, **kwargs): # the kwargs override or add to existing args. So you could do something like: # opts.as_dict(tracing_options=TracingOptions(tracing_threshold_kv=timedelta(seconds=1)), # compression=Compression.NONE) # and expect those values to override the corresponding ones in the output. # # first, get the nested dicts and update if necessary for k in ['timeout_options', 'tracing_options']: obj = self.get(k, {}).update(kwargs.pop(k, {})) if obj: self.update({k:obj}) # now, update the top-level ones self.update(kwargs) # now convert final product opts = {} for k, v in self.items(): if v is None or k not in self.KEYS: continue elif k in ['timeout_options', 'tracing_options']: opts.update(v.as_dict()) elif k in ['compression_min_size', 'log_redaction']: opts[k] = str(int(v)) elif k == 'compression': opts[k] = v.value else: opts[k] = v return opts
[docs]class Cluster(CoreClient): @internal def __init__(self, connection_string, # type: str options=None, # type: ClusterOptions bucket_factory=Bucket, # type: Any **kwargs # type: Any ): self._authenticator = kwargs.pop('authenticator', None) self.__is_6_5 = None # copy options if they exist, as we mutate it cluster_opts = deepcopy(options) or ClusterOptions(self._authenticator) if not self._authenticator: self._authenticator = cluster_opts.pop('authenticator', None) if not self._authenticator: raise InvalidArgumentException("Authenticator is mandatory") async_items = {k: kwargs.pop(k) for k in list(kwargs.keys()) if k in {'_iops', '_flags'}} # fixup any overrides to the ClusterOptions here as well args, kwargs = cluster_opts.split_args(**kwargs) self.connstr = cluster_opts.update_connection_string(connection_string, **args) self.__admin = None self._cluster = CoreCluster(self.connstr, bucket_factory=bucket_factory) # type: CoreCluster self._cluster.authenticate(self._authenticator) credentials = self._authenticator.get_credentials() self._clusteropts = dict(**credentials.get('options', {})) # TODO: eliminate the 'mock hack' and ClassicAuthenticator, then you can remove this as well. self._clusteropts.update(kwargs) self._adminopts = dict(**self._clusteropts) self._clusteropts.update(async_items) self._connstr_opts = cluster_opts self.connstr = cluster_opts.update_connection_string(self.connstr) super(Cluster, self).__init__(connection_string=str(self.connstr), _conntype=_LCB.LCB_TYPE_CLUSTER, **self._clusteropts)
[docs] @classmethod def connect(cls, connection_string, # type: str options=None, # type: ClusterOptions **kwargs ): # type: (...) -> Cluster """ Create a Cluster object. An Authenticator must be provided, either as the authenticator named parameter, or within the options argument. :param connection_string: the connection string for the cluster. :param options: options for the cluster. :param Any kwargs: Override corresponding value in options. """ return cls(connection_string, options, **kwargs)
def _do_ctor_connect(self, *args, **kwargs): super(Cluster,self)._do_ctor_connect(*args,**kwargs) def _check_for_shutdown(self): if not self._cluster: raise AlreadyShutdownException("This cluster has already been shutdown") @property @internal def _admin(self): self._check_for_shutdown() if not self.__admin: c = ConnectionString.parse(self.connstr) if not c.bucket: c.bucket = self._adminopts.pop('bucket', None) self.__admin = Admin(connection_string=str(c), **self._adminopts) return self.__admin
[docs] def bucket(self, name # type: str ): # type: (...) -> Bucket """ Open a bucket on this cluster. This doesn't create a bucket, merely opens an existing bucket. :param name: Name of bucket to open. :return: The :class:~.bucket.Bucket` you requested. :raise: :exc:`~.exceptions.BucketDoesNotExistException` if the bucket has not been created on this cluster. """ self._check_for_shutdown() if not self.__admin: self._adminopts['bucket'] = name return self._cluster.open_bucket(name, admin=self._admin)
# Temporary, helpful with working around CCBC-1204. We should be able to get rid of this # logic when this issue is fixed. def _is_6_5_plus(self): self._check_for_shutdown() # lets just check once. Below, we will only set this if we are sure about the value. if self.__is_6_5 is not None: return self.__is_6_5 try: response = self._admin.http_request(path="/pools").value v = response.get("implementationVersion") # lets just get first 3 characters -- the string should be X.Y.Z-XXXX-YYYY and we only care about # major and minor version self.__is_6_5 = (float(v[:3]) >= 6.5) except NetworkException as e: # the cloud doesn't let us query this endpoint, and so lets assume this is a cloud instance. However # lets not actually set the __is_6_5 flag as this also could be a transient error. That means cloud # instances check every time, but this is only temporary. return True except ValueError: # this comes from the conversion to float -- the mock says "CouchbaseMock..." self.__is_6_5 = True return self.__is_6_5
[docs] def query(self, statement, # type: str *options, # type: Union[QueryOptions,Any] **kwargs # type: Any ): # type: (...) -> QueryResult """ Perform a N1QL query. :param statement: the N1QL query statement to execute :param options: A QueryOptions object or the positional parameters in the query. :param kwargs: Override the corresponding value in the Options. If they don't match any value in the options, assumed to be named parameters for the query. :return: The results of the query or error message if the query failed on the server. :raise: :exc:`~.exceptions.QueryException` - for errors involving the query itself. Also any exceptions raised by underlying system - :class:`~.exceptions.TimeoutException` for instance. """ # we could have multiple positional parameters passed in, one of which may or may not be # a QueryOptions. Note if multiple QueryOptions are passed in for some strange reason, # all but the last are ignored. self._check_for_shutdown() itercls = kwargs.pop('itercls', QueryResult) opt = QueryOptions() opts = list(options) for o in opts: if isinstance(o, QueryOptions): opt = o opts.remove(o) # if not a 6.5 cluster, we need to query against a bucket. We think once # CCBC-1204 is addressed, we can just use the cluster's instance return self._maybe_operate_on_an_open_bucket(CoreClient.query, QueryException, opt.to_n1ql_query(statement, *opts, **kwargs), itercls=itercls, err_msg="Query requires an open bucket")
# gets a random bucket from those the cluster has opened def _get_an_open_bucket(self, err_msg): clients = [v() for k, v in self._cluster._buckets.items()] clients = [v for v in clients if v] if clients: return choice(clients) raise NoBucketException(err_msg) def _maybe_operate_on_an_open_bucket(self, verb, failtype, *args, **kwargs): if self._is_6_5_plus(): kwargs.pop('err_msg', None) return self._operate_on_cluster(verb, failtype, *args, **kwargs) return self._operate_on_an_open_bucket(verb, failtype, *args, **kwargs) def _operate_on_an_open_bucket(self, verb, failtype, *args, **kwargs): try: return verb(self._get_an_open_bucket(kwargs.pop('err_msg', 'Cluster has no open buckets')), *args, **kwargs) except Exception as e: raise_from(failtype(params=CouchbaseException.ParamType(message='Cluster operation on bucket failed', inner_cause=e)), e) def _operate_on_cluster(self, verb, failtype, # type: Type[CouchbaseException] *args, **kwargs): try: return verb(self, *args, **kwargs) except Exception as e: raise_from(failtype(params=CouchbaseException.ParamType(message="Cluster operation failed", inner_cause=e)), e) # for now this just calls functions. We can return stuff if we need it, later. def _sync_operate_on_entire_cluster(self, verb, *args, **kwargs): clients = [v() for k, v in self._cluster._buckets.items()] clients = [v for v in clients if v] clients.append(self) results = [] for c in clients: results.append(verb(c, *args, **kwargs)) return results async def _operate_on_entire_cluster(self, verb, failtype, *args, **kwargs): # if you don't have a cluster client yet, then you don't have any other buckets open either, so # this is the same as operate_on_cluster if not self._cluster._buckets: return self._operate_on_cluster(verb, failtype, *args, **kwargs) async def coroutine(client, verb, *args, **kwargs): return verb(client, *args, **kwargs) # ok, lets loop over all the buckets, and the clusterclient. And lets do it async so it isn't miserably # slow. So we will create a list of tasks and execute them together... tasks = [asyncio.ensure_future(coroutine(self, verb, *args, **kwargs))] for name, c in self._cluster._buckets.items(): client = c() if client: tasks.append(coroutine(client, verb, *args, **kwargs)) done, pending = await asyncio.wait(tasks) results = [] for d in done: results.append(d.result()) return results
[docs] def analytics_query(self, # type: Cluster statement, # type: str, *options, # type: AnalyticsOptions **kwargs ): # type: (...) -> AnalyticsResult """ Executes an Analytics query against the remote cluster and returns a AnalyticsResult with the results of the query. :param statement: the analytics statement to execute :param options: the optional parameters that the Analytics service takes based on the Analytics RFC. :return: An AnalyticsResult object with the results of the query or error message if the query failed on the server. :raise: :exc:`~.exceptions.AnalyticsException` errors associated with the analytics query itself. Also, any exceptions raised by the underlying platform - :class:`~.exceptions.TimeoutException` for example. """ # following the query implementation, but this seems worth revisiting soon self._check_for_shutdown() itercls = kwargs.pop('itercls', AnalyticsResult) opt = AnalyticsOptions() opts = list(options) for o in opts: if isinstance(o, AnalyticsOptions): opt = o opts.remove(o) return self._maybe_operate_on_an_open_bucket(CoreClient.analytics_query, AnalyticsException, opt.to_analytics_query(statement, *opts, **kwargs), itercls=itercls, err_msg='Analytics queries require an open bucket')
[docs] def search_query(self, index, # type: str query, # type: SearchQuery *options, # type: SearchOptions **kwargs ): # type: (...) -> SearchResult """ Executes a Search or FTS query against the remote cluster and returns a SearchResult implementation with the results of the query. .. code-block:: python from couchbase.search import MatchQuery, SearchOptions it = cb.search('name', MatchQuery('nosql'), SearchOptions(limit=10)) for hit in it: print(hit) :param str index: Name of the index to use for this query. :param query: the fluent search API to construct a query for FTS. :param options: the options to pass to the cluster with the query. :param kwargs: Overrides corresponding value in options. :return: A :class:`~.search.SearchResult` object with the results of the query or error message if the query failed on the server. :raise: :exc:`~.exceptions.SearchException` Errors related to the query itself. Also, any exceptions raised by the underlying platform - :class:`~.exceptions.TimeoutException` for example. """ self._check_for_shutdown() def do_search(dest): search_params = SearchOptions.gen_search_params_cls(index, query, *options, **kwargs) return search_params.itercls(search_params.body, dest, **search_params.iterargs) return self._maybe_operate_on_an_open_bucket(do_search, SearchException, err_msg="No buckets opened on cluster")
_root_diag_data = {'id', 'version', 'sdk'} def diagnostics(self, *options, # type: DiagnosticsOptions **kwargs ): # type: (...) -> DiagnosticsResult """ Creates a diagnostics report that can be used to determine the healthfulness of the Cluster. :param options: Options for the diagnostics :return: A :class:`~.diagnostics.DiagnosticsResult` object with the results of the query or error message if the query failed on the server. """ self._check_for_shutdown() result = self._sync_operate_on_entire_cluster(CoreClient.diagnostics, **forward_args(kwargs, *options)) return DiagnosticsResult(result) def ping(self, *options, # type: PingOptions **kwargs ): # type: (...) -> PingResult """ Actively contacts each of the services and returns their pinged status. :param options: Options for sending the ping request. :param kwargs: Overrides corresponding value in options. :return: A :class:`~.result.PingResult` representing the state of all the pinged services. :raise: :class:`~.exceptions.CouchbaseException` for various communication issues. """ bucket = self._get_an_open_bucket("Ping requires an open bucket") if bucket: return PingResult(bucket.ping(*options, **kwargs)) raise NoBucketException("ping requires a bucket be opened first") def users(self): # type: (...) -> UserManager """ Get the UserManager. :return: A :class:`~.management.UserManager` with which you can create or update cluster users and roles. """ self._check_for_shutdown() return UserManager(self._admin) def query_indexes(self): # type: (...) -> QueryIndexManager """ Get the QueryIndexManager. :return: A :class:`~.management.QueryIndexManager` with which you can create or modify query indexes on the cluster. """ self._check_for_shutdown() return QueryIndexManager(self._admin) def search_indexes(self): # type: (...) -> SearchIndexManager """ Get the SearchIndexManager. :return: A :class:`~.management.SearchIndexManager` with which you can create or modify search (FTS) indexes on the cluster. """ self._check_for_shutdown() return SearchIndexManager(self._admin) def analytics_indexes(self): # type: (...) -> AnalyticsIndexManager """ Get the AnalyticsIndexManager. :return: A :class:`~.management.AnalyticsIndexManager` with which you can create or modify analytics datasets, dataverses, etc.. on the cluster. """ self._check_for_shutdown() return AnalyticsIndexManager(self) def buckets(self): # type: (...) -> BucketManager """ Get the BucketManager. :return: A :class:`~.management.BucketManager` with which you can create or modify buckets on the cluster. """ self._check_for_shutdown() return BucketManager(self._admin) def disconnect(self): # type: (...) -> None """ Closes and cleans up any resources used by the Cluster and any objects it owns. :return: None :raise: Any exceptions raised by the underlying platform. """ # in this context, if we invoke the _cluster's destructor, that will do same for # all the buckets we've opened, unless they are stored elswhere and are actively # being used. self._cluster = None self.__admin = None # Only useful for 6.5 DP testing def _is_dev_preview(self): self._check_for_shutdown() return self._admin.http_request(path="/pools").value.get("isDeveloperPreview", False) @property def query_timeout(self): # type: (...) -> timedelta """ The timeout for N1QL query operations, as a `timedelta`. This affects the :meth:`query` method. This can be set in :meth:`connect` by passing in a :class:`ClusterOptions` with the query_timeout set to the desired time. Timeouts may also be adjusted on a per-query basis by setting the :attr:`timeout` property in the options to the n1ql_query method. The effective timeout is either the per-query timeout or the global timeout, whichever is lower. """ self._check_for_shutdown() return timedelta(seconds=self._get_timeout_common(_LCB.LCB_CNTL_QUERY_TIMEOUT)) @property def tracing_threshold_query(self): # type: (...) -> timedelta """ The tracing threshold for query response times, as `timedelta`. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired tracing_threshold_query set in it. """ return timedelta(seconds=self._cntl(op=_LCB.TRACING_THRESHOLD_QUERY, value_type="timeout")) @property def tracing_threshold_search(self): # type: (...) -> timedelta """ The tracing threshold for search response times, as `timedelta`. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired tracing_threshold_search set in it. """ return timedelta(seconds=self._cntl(op=_LCB.TRACING_THRESHOLD_SEARCH, value_type="timeout")) @property def tracing_threshold_analytics(self): # type: (...) -> timedelta """ The tracing threshold for analytics, as `timedelta`. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired tracing_threshold_analytics set in it. """ return timedelta(seconds=self._cntl(op=_LCB.TRACING_THRESHOLD_ANALYTICS, value_type="timeout")) @property def tracing_orphaned_queue_flush_interval(self): # type: (...) -> timedelta """ Returns the interval that the orphaned responses are logged, as a `timedelta`. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired interval set in it. """ return timedelta(seconds=self._cntl(op=_LCB.TRACING_ORPHANED_QUEUE_FLUSH_INTERVAL, value_type="timeout")) @property def tracing_orphaned_queue_size(self): # type: (...) -> int """ Returns the tracing orphaned queue size. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the size set in it. """ return self._cntl(op=_LCB.TRACING_ORPHANED_QUEUE_SIZE, value_type="uint32_t") @property def tracing_threshold_queue_flush_interval(self): # type: (...) -> timedelta """ The tracing threshold queue flush interval, as a `timedelta`. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired interval set in it. """ return timedelta(seconds=self._cntl(op=_LCB.TRACING_THRESHOLD_QUEUE_FLUSH_INTERVAL, value_type="timeout")) @property def tracing_threshold_queue_size(self): # type: (...) -> int """ The tracing threshold queue size. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired size set in it. """ return self._cntl(op=_LCB.TRACING_THRESHOLD_QUEUE_SIZE, value_type="uint32_t") @property def redaction(self): # type: (...) -> bool """ Returns whether or not the logs will redact sensitive information. """ return bool(self._cntl(_LCB.LCB_CNTL_LOG_REDACTION, value_type='int')) @property def compression(self): # type: (...) -> Compression """ Returns the compression mode to be used when talking to the server. See :class:`Compression` for details. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired compression set in it. """ return Compression.from_int( self._cntl(_LCB.LCB_CNTL_COMPRESSION_OPTS, value_type='int') ) @property def compression_min_size(self): # type: (...) -> int """ Minimum size (in bytes) of the document payload to be compressed when compression enabled. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired compression set in it. """ return self._cntl(_LCB.LCB_CNTL_COMPRESSION_MIN_SIZE, value_type='uint32_t') @property def compression_min_ratio(self): # type: (...) -> float """ Minimum compression ratio (compressed / original) of the compressed payload to allow sending it to cluster. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired ratio set in it. """ return self._cntl(_LCB.LCB_CNTL_COMPRESSION_MIN_RATIO, value_type='float') @property def is_ssl(self): # type: (...) -> bool """ Read-only boolean property indicating whether SSL is used for this connection. If this property is true, then all communication between this object and the Couchbase cluster is encrypted using SSL. See :meth:`__init__` for more information on connection options. """ mode = self._cntl(op=_LCB.LCB_CNTL_SSL_MODE, value_type='int') return mode & _LCB.LCB_SSL_ENABLED != 0
class AsyncCluster(AsyncClientMixin, Cluster): @classmethod def connect(cls, connection_string=None, *args, **kwargs): return cls(connection_string=connection_string, *args, **kwargs)