Source code for couchbase.cluster

import asyncio
from typing import NamedTuple

from couchbase_core.asynchronous.client import AsyncClientMixin
from couchbase.mutation_state import MutationState
from couchbase.management.queries import QueryIndexManager
from couchbase.management.search import SearchIndexManager
from couchbase.management.analytics import AnalyticsIndexManager
from couchbase.tracing import CouchbaseTracer, CouchbaseSpan
from couchbase.analytics import AnalyticsOptions
from couchbase_core.mapper import identity
from .auth import NoBucketException, Authenticator
from .management.users import UserManager
from .management.buckets import BucketManager
from couchbase.management.admin import Admin
from couchbase.diagnostics import DiagnosticsResult
from couchbase.search import SearchResult, SearchOptions, SearchQuery
from .analytics import AnalyticsResult
from .n1ql import QueryResult
from couchbase_core.n1ql import _N1QLQuery
from .options import (
    OptionBlock,
    OptionBlockDeriv,
    QueryBaseOptions,
    LockMode,
    enum_value,
)
from .bucket import Bucket, CoreClient, PingOptions
from couchbase_core.cluster import _Cluster as CoreCluster
from .exceptions import (
    AlreadyShutdownException,
    InvalidArgumentException,
    SearchException,
    QueryException,
    AnalyticsException,
    CouchbaseException,
    NetworkException,
)
import couchbase_core._libcouchbase as _LCB
from couchbase_core._pyport import raise_from
from couchbase_core.cluster import *
from .result import *
from random import choice
from enum import Enum
from copy import deepcopy
from datetime import timedelta


T = TypeVar("T")
ServerVersion = NamedTuple(
    "ServerVersion", [("full_version", str),
                      ("short_version", float),
                      ("is_dp", bool),
                      ("is_enterprise", bool)])

CallableOnOptionBlock = Callable[[OptionBlockDeriv, Any], Any]


class DiagnosticsOptions(OptionBlock):
    @overload
    def __init__(
        self, report_id=None  # type: str
    ):
        pass

    def __init__(self, **kwargs):
        """

        :param str report_id: An id that is appended on to the :class:`~.DiagnosticsResult`.  Helps with
            disambiguating reports when you have several running.
        """
        super(DiagnosticsOptions, self).__init__(**kwargs)

    @classmethod
    def from_eventing_server(cls, value):
        if value == "request":
            return cls.REQUEST_PLUS
        elif value == "none":
            return cls.NOT_BOUNDED
        else:
            raise InvalidArgumentException(
                "Invalid value for scane consistency: {}".format(value)
            )


[docs]class QueryScanConsistency(Enum): """ QueryScanConsistency This can be: NOT_BOUNDED Which means we just return what is currently in the indexes, or REQUEST_PLUS which means we 'read our own writes'. Slower, since the query has to wait for the indexes to catch up. """ REQUEST_PLUS = "request_plus" NOT_BOUNDED = "not_bounded" @classmethod def to_eventing_server(cls, value): if value == cls.REQUEST_PLUS: return "request" elif value == cls.NOT_BOUNDED: return "none" else: raise InvalidArgumentException( "Invalid value for eventing scan consistency: {}".format(value) ) @classmethod def from_eventing_server(cls, value): if value == "request": return cls.REQUEST_PLUS elif value == "none": return cls.NOT_BOUNDED else: raise InvalidArgumentException( "Invalid value for eventing scan consistency: {}".format(value) )
class QueryProfile(Enum): """ QueryProfile You can chose to set this to: OFF No query profiling data will be collected. PHASES Profile will have details on phases. TIMINGS Profile will have phases, and details on the query plan execution as well. """ OFF = "off" PHASES = "phases" TIMINGS = "timings" class NamedClass(type): def __new__(cls, name, bases=tuple(), namespace=dict()): super(NamedClass, cls).__new__( cls, name, bases=bases, namespace=namespace)
[docs]class QueryOptions(QueryBaseOptions): VALID_OPTS = { "timeout": {"timeout": timedelta.total_seconds}, "read_only": {"readonly": identity}, "scan_consistency": {"consistency": enum_value}, "adhoc": {"adhoc": identity}, "client_context_id": {"client_context_id": identity}, "consistent_with": {"consistent_with": identity}, "max_parallelism": {}, "positional_parameters": {}, "named_parameters": {}, "pipeline_batch": {"pipeline_batch": identity}, "pipeline_cap": {"pipeline_cap": identity}, "profile": {"profile": enum_value}, "query_context": {"query_context": identity}, "raw": {"raw": identity}, "scan_wait": {}, "scan_cap": {"scan_cap": identity}, "metrics": {"metrics": identity}, "flex_index": {"flex_index": int}, "span": {"span": identity}, "preserve_expiry": {"preserve_expiry": identity} } TARGET_CLASS = _N1QLQuery @overload def __init__( self, timeout=None, # type: timedelta read_only=None, # type: bool scan_consistency=None, # type: QueryScanConsistency adhoc=None, # type: bool client_context_id=None, # type: str consistent_with=None, # type: MutationState max_parallelism=None, # type: int positional_parameters=None, # type: Iterable[JSON] named_parameters=None, # type: dict[str, JSON] pipeline_batch=None, # type: int pipeline_cap=None, # type: int profile=None, # type: QueryProfile query_context=None, # type: str raw=None, # type: dict[str, JSON] scan_wait=None, # type: timedelta scan_cap=None, # type: int metrics=False, # type: bool flex_index=False, # type: bool span=None, # type: Span preserve_expiry=None # type: bool ): pass
[docs] def __init__(self, **kwargs): """ QueryOptions Various options for queries :param timedelta timeout: Uses this timeout value, rather than the default for the cluster. See :meth:`~Cluster.query_timeout`. :param bool read_only: Hint to the server that this is a read-only query. :param QueryScanConsistency scan_consistency: Specify the level of consistency for the query. Overrides any setting in consistent_with. Can be either :meth:`~.QueryScanConsistency.NOT_BOUNDED`, which means return what is in the index now, or :meth:`~.QueryScanConsistency.REQUEST_PLUS`, which means you can read your own writes. Slower, but when you need it you have it. :param bool adhoc: Specifies if the prepared statement logic should be executed internally. :param str client_context_id: Specifies a context ID string which is mirrored back from the query engine on response. :param MutationState consistent_with: Specifies custom scan consistency through “at_plus” with mutation state token vectors. :param int max_parallelism: The maximum number of logical cores to use in parallel for this query. :param Iterable[JSON] positional_parameters: Specifies the parameters used in the query, when positional notation ($1, $2, etc...) is used. :param dict[str,JSON] named_parameters: Specifies the parameters used in the query, when named parameter notation ($foo, $bar, etc...) is used. :param int pipeline_batch: Specifies pipeline batching characteristics. :param int pipeline_cap: Specifies pipeline cap characteristics. :param QueryProfile profile: Specifies the profiling level to use. :param str query_context: Specifies the context for the query. :param dict[str,JSON] raw: This is a way to to specify the query payload to support unknown commands and be future-compatible. :param timedelta scan_wait: Specifies maximum amount of time to wait for a scan. :param int scan_cap: Specifies the scan cap characteristics. :param bool metrics: Specifies whether or not to include metrics with the :class:`~.QueryResult`. :param bool flex_index Specifies whether this query may make use of Search indexes :param CouchbaseSpan span Specifies the parent span for this query :param bool preserve_expiry **UNCOMMITTED** preserve_expiry is an uncommitted API that is unlikely to change, but may still change as final consensus on its behavior has not yet been reached. Specifies whether this query will preserve expiry on mutation """ super(QueryOptions, self).__init__(**kwargs)
class ClusterTimeoutOptions(dict): KEY_MAP = { "kv_timeout": "operation_timeout", "query_timeout": "query_timeout", "views_timeout": "views_timeout", "config_total_timeout": "config_total_timeout", } @overload def __init__( self, query_timeout=None, # type: timedelta kv_timeout=None, # type: timedelta views_timeout=None, # type: timedelta config_total_timeout=None, # type: timedelta ): pass def __init__(self, **kwargs): """ ClusterTimeoutOptions These will be the default timeouts for queries, kv operations or views for the entire cluster :param timedelta query_timeout: Timeout for query operations. :param timedelta kv_timeout: Timeout for KV operations. :param timedelta views_timeout: Timeout for View operations. :param timedelta config_total_timeout: Timeout for complete bootstrap configuration """ super().__init__(**kwargs) def as_dict(self): opts = {} for k, v in self.items(): if v is None or k not in self.KEY_MAP.keys(): continue elif k in self.KEY_MAP: opts[self.KEY_MAP[k]] = v.total_seconds() else: opts[k] = v return opts class Compression(Enum): """ Can be one of: NONE: The client will not compress or decompress the data. IN: The data coming back from the server will be decompressed, if it was compressed. OUT: The data coming into server will be compressed. INOUT: The data will be compressed on way in, decompressed on way out of server. FORCE: By default the library will send a HELLO command to the server to determine whether compression is supported or not. Because commands may be pipelined prior to the scheduing of the HELLO command it is possible that the first few commands may not be compressed when schedule due to the library not yet having negotiated settings with the server. Setting this flag will force the client to assume that all servers support compression despite a HELLO not having been intially negotiated. """ @classmethod def from_int(cls, val): if val == 0: return cls.NONE elif val == 1: return cls.IN elif val == 2: return cls.OUT elif val == 3: return cls.INOUT elif val == 7: # note that the lcb flag is a 4, but when you set "force" in the connection # string, it sets it as INOUT|FORCE. return cls.FORCE else: raise InvalidArgumentException( "cannot convert {} to a Compression".format(val) ) NONE = "off" IN = "inflate_only" OUT = "deflate_only" INOUT = "on" FORCE = "force" class ClusterTracingOptions(dict): INT_KEYS = ["tracing_threshold_queue_size", "tracing_orphaned_queue_size"] KEYS = [ "tracing_threshold_kv", "tracing_threshold_view", "tracing_threshold_query", "tracing_threshold_search", "tracing_threshold_analytics", "tracing_threshold_queue_size", "tracing_threshold_queue_flush_interval", "tracing_orphaned_queue_size", "tracing_orphaned_queue_flush_interval", ] @overload def __init__( self, tracing_threshold_kv=None, # type: timedelta tracing_threshold_view=None, # type: timedelta tracing_threshold_query=None, # type: timedelta tracing_threshold_search=None, # type: timedelta tracing_threshold_analytics=None, # type: timedelta tracing_threshold_queue_size=None, # type: int tracing_threshold_queue_flush_interval=None, # type: timedelta tracing_orphaned_queue_size=None, # type: int tracing_orphaned_queue_flush_interval=None, # type: timedelta ): pass def __init__(self, **kwargs): """ ClusterTracingOptions These parameters control when/how our request tracing will log slow requests or orphaned responses. :param timedelta tracing_threshold_kv: Any KV request taking longer than this will be traced. :param timedelta tracing_threshold_view: Any View request taking longer than this will be traced. :param timedelta tracing_threshold_query: Any Query request taking longer than this will be traced. :param timedelta tracing_threshold_search: Any Search request taking longer than this will be traced. :param timedelta tracing_threshold_analytics: Any Analytics request taking longer than this will be traced :param int tracing_threshold_queue_size: Limits the number of requests traced. :param timedelta tracing_threshold_queue_flush_interval: Interval between flushes of the threshold queues. :param int tracing_orphaned_queue_size: Limits the number of orphaned requests traced. :param timedelta tracing_orphaned_queue_flush_interval: Interval between flushes of the orphaned queue. """ super().__init__(**kwargs) def as_dict(self): opts = {} for k, v in self.items(): if v is None or k not in self.KEYS: continue elif isinstance(v, timedelta): opts[k] = v.total_seconds() else: opts[k] = v return opts
[docs]class ClusterOptions(dict): KEYS = [ "timeout_options", "tracing_options", "log_redaction", "compression", "compression_min_size", "compression_min_ratio", "certpath", "enable_mutation_tokens", "tracer", ] @overload def __init__( self, authenticator, # type: Authenticator timeout_options=None, # type: ClusterTimeoutOptions tracing_options=None, # type: ClusterTracingOptions log_redaction=None, # type: bool compression=None, # type: Compression compression_min_size=None, # type: int compression_min_ratio=None, # type: float lockmode=None, # type: LockMode enable_mutation_tokens=None, # type: bool tracer=None, # type: CouchbaseTracer transcoder=None # type: Transcoder ): pass
[docs] def __init__( self, authenticator, # type: Authenticator **kwargs ): """ Options to set when creating a cluster. Note the authenticator is mandatory, all the others are optional. :param Authenticator authenticator: :class:`~.Authenticator` to use - see :class:`~.PasswordAuthenticator` and :class:`~.CertAuthenticator`. :param ClusterTimeoutOptions timeout_options: A :class:`~.ClusterTimeoutOptions` object, with various optional timeouts. :param ClusterTracingOptions tracing_options: A :class:`~.ClusterTracingOptions` object, with various options for tracing. :param bool log_redaction: Turn log redaction on/off. :param Compression compression: A :class:`~.Compression` value for this cluster. :param int compression_min_size: Min size of the data before compression kicks in. :param float compression_min_ratio: A `float` representing the minimum compression ratio to use when compressing. :param bool enable_mutation_tokens: Turn mutation tokens on/off. On by default. :param CouchbaseTracer tracer: Tracer to use. None by default, which uses internal tracing only. :param Transcoder transcoder: Global transcoder to use. None by default, which uses default transcoder. """ super(ClusterOptions, self).__init__(**kwargs) self["authenticator"] = authenticator
def update_connection_string(self, connstr, **kwargs): opts = self.as_dict(**kwargs) if len(opts) == 0: return connstr conn = ConnectionString.parse(connstr) for k, v in opts.items(): conn.set_option(k, v) return conn.encode() def split_args(self, **kwargs): # return a tuple with args we recognize and those we don't, which # should be kwargs in connect ours = {} theirs = {} for k, v in kwargs.items(): if k in self.KEYS: ours[k] = v else: theirs[k] = v return ( ours, theirs, ) def as_dict(self, **kwargs): # the kwargs override or add to existing args. So you could do something like: # opts.as_dict(tracing_options=TracingOptions(tracing_threshold_kv=timedelta(seconds=1)), # compression=Compression.NONE) # and expect those values to override the corresponding ones in the output. # # first, get the nested dicts and update if necessary for k in ["timeout_options", "tracing_options"]: obj = self.get(k, {}).update(kwargs.pop(k, {})) if obj: self.update({k: obj}) # now, update the top-level ones self.update(kwargs) # now convert final product opts = {} for k, v in self.items(): if v is None or k not in self.KEYS: continue elif k in ["timeout_options", "tracing_options"]: opts.update(v.as_dict()) elif k in [ "compression_min_size", "log_redaction", "enable_mutation_tokens", ]: opts[k] = str(int(v)) elif k == "compression": opts[k] = v.value else: opts[k] = v return opts
[docs]class Cluster(CoreClient): @internal def __init__( self, connection_string, # type: str options=None, # type: ClusterOptions bucket_factory=Bucket, # type: Any admin_factory=Admin, # type: Admin **kwargs # type: Any ): self._authenticator = kwargs.pop("authenticator", None) # get tracer from kwargs, if present self._external_tracer = kwargs.pop("tracer", None) self.__is_6_5 = None # copy options if they exist, as we mutate it cluster_opts = deepcopy(options) or ClusterOptions(self._authenticator) if not self._authenticator: self._authenticator = cluster_opts.pop("authenticator", None) if not self._authenticator: raise InvalidArgumentException("Authenticator is mandatory") # get tracer from options, if not in kwargs if not self._external_tracer and options: self._external_tracer = options.get("tracer", None) async_items = { k: kwargs.pop(k) for k in list(kwargs.keys()) if k in {"_iops", "_flags"} } non_connstr_opts = { k: cluster_opts.pop(k) for k in list(cluster_opts.keys()) if k in ["lockmode", "transcoder"] } # fixup any overrides to the ClusterOptions here as well args, kwargs = cluster_opts.split_args(**kwargs) self.connstr = cluster_opts.update_connection_string( connection_string, **args) self.__admin = None self._cluster = CoreCluster( self.connstr, bucket_factory=bucket_factory ) # type: CoreCluster self._cluster.authenticate(self._authenticator) credentials = self._authenticator.get_credentials() self._clusteropts = dict(**credentials.get("options", {})) # TODO: eliminate the 'mock hack' and ClassicAuthenticator, then you # can remove this as well. self._clusteropts.update(non_connstr_opts) self._clusteropts.update(kwargs) self._adminopts = dict(**self._clusteropts) if "admin_factory" not in self._adminopts: self._adminopts["admin_factory"] = admin_factory self._clusteropts.update(async_items) self.connstr = cluster_opts.update_connection_string( self.connstr, **self._clusteropts ) # PYCBC-949 remove certpath, it is not accepted by super(Cluster) # (it has been copied into self.connstr) self._clusteropts.pop("certpath", None) self._adminopts.pop("certpath", None) # pop tracer into _clusteropts, since you can't copy a tracer self._clusteropts["tracer"] = self._external_tracer super(Cluster, self).__init__( connection_string=str(self.connstr), _conntype=_LCB.LCB_TYPE_CLUSTER, **self._clusteropts ) if(type(self).__name__ == "Cluster"): self._set_server_version()
[docs] @classmethod def connect( cls, connection_string, # type: str options=None, # type: ClusterOptions **kwargs ): # type: (...) -> Cluster """ Create a Cluster object. An Authenticator must be provided, either as the authenticator named parameter, or within the options argument. :param connection_string: the connection string for the cluster. :param options: options for the cluster. :param Any kwargs: Override corresponding value in options. """ return cls(connection_string, options, **kwargs)
def _do_ctor_connect(self, *args, **kwargs): super(Cluster, self)._do_ctor_connect(*args, **kwargs) def _check_for_shutdown(self): if not self._cluster: raise AlreadyShutdownException( "This cluster has already been shutdown") @internal def _set_server_version(self, # type: "Cluster" override=None # type: Union[str, dict] ) -> None: """ Internal method to set the server version. :param override: HTTP response from /pools (optional). """ if override is not None: self.server_version = override if self.server_version: try: version_json = self.server_version if isinstance(self.server_version, str): version_json = json.loads(self.server_version) version_raw = version_json.get("implementationVersion") is_dp = version_json.get("isDeveloperPreview", False) is_enterprise = version_json.get( "isEnterprise", False) # version string should be X.Y.Z-XXXX-YYYY self.server_version = ServerVersion(version_raw[:10], float(version_raw[:3]), is_dp, is_enterprise) except ValueError: # this comes from the conversion to float -- the mock says # "CouchbaseMock..." self.server_version = ServerVersion( "CouchbaseMock", float(6.5), False, True) def get_server_version(self # type: "Cluster" ) -> ServerVersion: return self.server_version @property @internal def _admin(self): self._check_for_shutdown() if not self.__admin: c = ConnectionString.parse(self.connstr) if not c.bucket: c.bucket = self._adminopts.pop("bucket", None) factory = self._adminopts.pop("admin_factory", Admin) self.__admin = factory(connection_string=str(c), **self._adminopts) return self.__admin
[docs] def bucket( self, name # type: str ): # type: (...) -> Bucket """ Open a bucket on this cluster. This doesn't create a bucket, merely opens an existing bucket. :param name: Name of bucket to open. :return: The :class:~.bucket.Bucket` you requested. :raise: :exc:`~.exceptions.BucketDoesNotExistException` if the bucket has not been created on this cluster. """ self._check_for_shutdown() if not self.__admin: self._adminopts["bucket"] = name kwargs = {"admin": self._admin} lockmode = self._clusteropts.get("lockmode", None) if lockmode is not None: kwargs["lockmode"] = lockmode transcoder = self._clusteropts.get("transcoder", None) if transcoder is not None: kwargs["transcoder"] = transcoder return self._cluster.open_bucket( name, tracer=self._external_tracer, **kwargs)
# Temporary, helpful with working around CCBC-1204. We should be able to get rid of this # logic when this issue is fixed. def _is_6_5_plus(self): self._check_for_shutdown() # lets just check once. Below, we will only set this if we are sure # about the value. if self.__is_6_5 is not None: return self.__is_6_5 if self.server_version: if isinstance(self.server_version, str): self._set_server_version() self.__is_6_5 = self.server_version.short_version >= 6.5 return self.__is_6_5 # If an async Admin cluster is used (needed for access to management API), # The HTTP request will return w/o a result (it will be pending the async # future/callback). As _is_6_5_plus() is synchronous, there will be no way # to get the result. This scenario should not happen, but in the event it # does, lets give the user an easier error to understand why there is a problem. if type(self._admin).__name__ in ["AAdmin", "TxAdmin"]: raise NotImplementedError( "Cannot execute synchronous HTTP request with asynchronous Admin cluster.") try: response = self._admin.http_request(path="/pools").value self._set_server_version(override=response) self.__is_6_5 = self.server_version.short_version >= 6.5 except NetworkException as e: # the cloud doesn't let us query this endpoint, and so lets assume this is a cloud instance. However # lets not actually set the __is_6_5 flag as this also could be a transient error. That means cloud # instances check every time, but this is only temporary. return True return self.__is_6_5
[docs] def query( self, statement, # type: str *options, # type: Union[QueryOptions,Any] **kwargs # type: Any ): # type: (...) -> QueryResult """ Perform a N1QL query. :param statement: the N1QL query statement to execute :param options: A QueryOptions object or the positional parameters in the query. :param kwargs: Override the corresponding value in the Options. If they don't match any value in the options, assumed to be named parameters for the query. :return: The results of the query or error message if the query failed on the server. :raise: :exc:`~.exceptions.QueryException` - for errors involving the query itself. Also any exceptions raised by underlying system - :class:`~.exceptions.TimeoutException` for instance. """ # we could have multiple positional parameters passed in, one of which may or may not be # a QueryOptions. Note if multiple QueryOptions are passed in for some strange reason, # all but the last are ignored. self._check_for_shutdown() itercls = kwargs.pop("itercls", QueryResult) opt = QueryOptions() opts = list(options) for o in opts: if isinstance(o, QueryOptions): opt = o opts.remove(o) # if not a 6.5 cluster, we need to query against a bucket. We think once # CCBC-1204 is addressed, we can just use the cluster's instance return self._maybe_operate_on_an_open_bucket( CoreClient.query, QueryException, opt.to_query_object(statement, *opts, **kwargs), itercls=itercls, err_msg="Query requires an open bucket", )
# gets a random bucket from those the cluster has opened def _get_an_open_bucket(self, err_msg): clients = [v() for k, v in self._cluster._buckets.items()] clients = [v for v in clients if v] if clients: return choice(clients) raise NoBucketException(err_msg) def _maybe_operate_on_an_open_bucket( self, verb, failtype, *args, **kwargs): if self._is_6_5_plus(): kwargs.pop("err_msg", None) return self._operate_on_cluster(verb, failtype, *args, **kwargs) return self._operate_on_an_open_bucket(verb, failtype, *args, **kwargs) def _operate_on_an_open_bucket(self, verb, failtype, *args, **kwargs): try: return verb( self._get_an_open_bucket( kwargs.pop("err_msg", "Cluster has no open buckets") ), *args, **kwargs ) except Exception as e: raise_from( failtype( params=CouchbaseException.ParamType( message="Cluster operation on bucket failed", inner_cause=e ) ), e, ) def _operate_on_cluster( self, verb, failtype, # type: Type[CouchbaseException] *args, **kwargs ): try: return verb(self, *args, **kwargs) except Exception as e: raise_from( failtype( params=CouchbaseException.ParamType( message="Cluster operation failed", inner_cause=e ) ), e, ) # for now this just calls functions. We can return stuff if we need it, # later. def _sync_operate_on_entire_cluster(self, verb, *args, **kwargs): clients = [v() for k, v in self._cluster._buckets.items()] clients = [v for v in clients if v] clients.append(self) results = [] for c in clients: results.append(verb(c, *args, **kwargs)) return results async def _operate_on_entire_cluster(self, verb, failtype, *args, **kwargs): # if you don't have a cluster client yet, then you don't have any other buckets open either, so # this is the same as operate_on_cluster if not self._cluster._buckets: return self._operate_on_cluster(verb, failtype, *args, **kwargs) async def coroutine(client, verb, *args, **kwargs): return verb(client, *args, **kwargs) # ok, lets loop over all the buckets, and the clusterclient. And lets do it async so it isn't miserably # slow. So we will create a list of tasks and execute them together... tasks = [asyncio.ensure_future(coroutine(self, verb, *args, **kwargs))] for name, c in self._cluster._buckets.items(): client = c() if client: tasks.append(coroutine(client, verb, *args, **kwargs)) done, pending = await asyncio.wait(tasks) results = [] for d in done: results.append(d.result()) return results
[docs] def analytics_query( self, # type: Cluster statement, # type: str *options, # type: AnalyticsOptions **kwargs ): # type: (...) -> AnalyticsResult """ Executes an Analytics query against the remote cluster and returns a AnalyticsResult with the results of the query. :param statement: the analytics statement to execute :param options: the optional parameters that the Analytics service takes based on the Analytics RFC. :return: An AnalyticsResult object with the results of the query or error message if the query failed on the server. :raise: :exc:`~.exceptions.AnalyticsException` errors associated with the analytics query itself. Also, any exceptions raised by the underlying platform - :class:`~.exceptions.TimeoutException` for example. """ # following the query implementation, but this seems worth revisiting # soon self._check_for_shutdown() itercls = kwargs.pop("itercls", AnalyticsResult) opt = AnalyticsOptions() opts = list(options) for o in opts: if isinstance(o, AnalyticsOptions): opt = o opts.remove(o) return self._maybe_operate_on_an_open_bucket( CoreClient.analytics_query, AnalyticsException, opt.to_query_object(statement, *opts, **kwargs), itercls=itercls, err_msg="Analytics queries require an open bucket", )
[docs] def search_query( self, index, # type: str query, # type: SearchQuery *options, # type: SearchOptions **kwargs ): # type: (...) -> SearchResult """ Executes a Search or FTS query against the remote cluster and returns a SearchResult implementation with the results of the query. .. code-block:: python from couchbase.search import MatchQuery, SearchOptions it = cb.search('name', MatchQuery('nosql'), SearchOptions(limit=10)) for hit in it: print(hit) :param str index: Name of the index to use for this query. :param query: the fluent search API to construct a query for FTS. :param options: the options to pass to the cluster with the query. :param kwargs: Overrides corresponding value in options. :return: A :class:`~.search.SearchResult` object with the results of the query or error message if the query failed on the server. :raise: :exc:`~.exceptions.SearchException` Errors related to the query itself. Also, any exceptions raised by the underlying platform - :class:`~.exceptions.TimeoutException` for example. """ self._check_for_shutdown() def do_search(dest): search_params = SearchOptions.gen_search_params_cls( index, query, *options, **kwargs ) return search_params.itercls( search_params.body, dest, **search_params.iterargs ) return self._maybe_operate_on_an_open_bucket( do_search, SearchException, err_msg="No buckets opened on cluster" )
_root_diag_data = {"id", "version", "sdk"} def diagnostics( self, *options, # type: DiagnosticsOptions **kwargs ): # type: (...) -> DiagnosticsResult """ Creates a diagnostics report that can be used to determine the healthfulness of the Cluster. :param options: Options for the diagnostics :return: A :class:`~.diagnostics.DiagnosticsResult` object with the results of the query or error message if the query failed on the server. """ self._check_for_shutdown() result = self._sync_operate_on_entire_cluster( CoreClient.diagnostics, **forward_args(kwargs, *options) ) return DiagnosticsResult(result) def _ping_result_handler(func): result = ping_result_wrapper(func) operation_mode.operate_on_doc(result, lambda x: func.__doc__) result.__name__ = func.__name__ return result @_ping_result_handler def ping( self, *options, # type: PingOptions **kwargs ): # type: (...) -> PingResult """ Actively contacts each of the services and returns their pinged status. :param options: Options for sending the ping request. :param kwargs: Overrides corresponding value in options. :return: A :class:`~.result.PingResult` representing the state of all the pinged services. :raise: :class:`~.exceptions.CouchbaseException` for various communication issues. """ return CoreClient.ping(self, **forward_args(kwargs, *options)) def users(self): # type: (...) -> UserManager """ Get the UserManager. :return: A :class:`~.management.UserManager` with which you can create or update cluster users and roles. """ self._check_for_shutdown() return UserManager(self._admin) def query_indexes(self): # type: (...) -> QueryIndexManager """ Get the QueryIndexManager. :return: A :class:`~.management.QueryIndexManager` with which you can create or modify query indexes on the cluster. """ self._check_for_shutdown() return QueryIndexManager(self._admin) def search_indexes(self): # type: (...) -> SearchIndexManager """ Get the SearchIndexManager. :return: A :class:`~.management.SearchIndexManager` with which you can create or modify search (FTS) indexes on the cluster. """ self._check_for_shutdown() return SearchIndexManager(self._admin) def analytics_indexes(self): # type: (...) -> AnalyticsIndexManager """ Get the AnalyticsIndexManager. :return: A :class:`~.management.AnalyticsIndexManager` with which you can create or modify analytics datasets, dataverses, etc.. on the cluster. """ self._check_for_shutdown() return AnalyticsIndexManager(self, self._admin) def buckets(self): # type: (...) -> BucketManager """ Get the BucketManager. :return: A :class:`~.management.BucketManager` with which you can create or modify buckets on the cluster. """ self._check_for_shutdown() return BucketManager(self._admin) def eventing_functions(self): # type: (...) -> EventingFunctionManager """ Get the EventingFunctionManager. :return: A :class:`~.management.EventingFunctionManager` with which you can create or modify eventing functions on the cluster. """ from couchbase.management.eventing import EventingFunctionManager self._check_for_shutdown() return EventingFunctionManager(self._admin) def disconnect(self): # type: (...) -> None """ Closes and cleans up any resources used by the Cluster and any objects it owns. :return: None :raise: Any exceptions raised by the underlying platform. """ # in this context, if we invoke the _cluster's destructor, that will do same for # all the buckets we've opened, unless they are stored elswhere and are actively # being used. self._cluster = None self.__admin = None # Only useful for 6.5 DP testing def _is_dev_preview(self): self._check_for_shutdown() if self.server_version: if isinstance(self.server_version, str): self._set_server_version() return self.server_version.is_dp # If an async Admin cluster is used (needed for access to management API), # The HTTP request will return w/o a result (it will be pending the async # future/callback). As _is_dev_preview() is synchronous, there will be no way # to get the result. This scenario should not happen, but in the event it # does, lets give the user an easier error to understand why there is a problem. if type(self._admin).__name__ in ["AAdmin", "TxAdmin"]: raise NotImplementedError( "Cannot execute synchronous HTTP request with asynchronous Admin cluster.") response = self._admin.http_request(path="/pools").value self._set_server_version(override=response) return self.server_version.is_dp @property def query_timeout(self): # type: (...) -> timedelta """ The timeout for N1QL query operations, as a `timedelta`. This affects the :meth:`query` method. This can be set in :meth:`connect` by passing in a :class:`ClusterOptions` with the query_timeout set to the desired time. Timeouts may also be adjusted on a per-query basis by setting the :attr:`timeout` property in the options to the n1ql_query method. The effective timeout is either the per-query timeout or the global timeout, whichever is lower. """ self._check_for_shutdown() return timedelta(seconds=self._get_timeout_common( _LCB.LCB_CNTL_QUERY_TIMEOUT)) @property def tracing_threshold_query(self): # type: (...) -> timedelta """ The tracing threshold for query response times, as `timedelta`. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired tracing_threshold_query set in it. """ return timedelta( seconds=self._cntl( op=_LCB.TRACING_THRESHOLD_QUERY, value_type="timeout") ) @property def tracing_threshold_search(self): # type: (...) -> timedelta """ The tracing threshold for search response times, as `timedelta`. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired tracing_threshold_search set in it. """ return timedelta( seconds=self._cntl( op=_LCB.TRACING_THRESHOLD_SEARCH, value_type="timeout") ) @property def tracing_threshold_analytics(self): # type: (...) -> timedelta """ The tracing threshold for analytics, as `timedelta`. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired tracing_threshold_analytics set in it. """ return timedelta( seconds=self._cntl( op=_LCB.TRACING_THRESHOLD_ANALYTICS, value_type="timeout" ) ) @property def tracing_orphaned_queue_flush_interval(self): # type: (...) -> timedelta """ Returns the interval that the orphaned responses are logged, as a `timedelta`. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired interval set in it. """ return timedelta( seconds=self._cntl( op=_LCB.TRACING_ORPHANED_QUEUE_FLUSH_INTERVAL, value_type="timeout" ) ) @property def tracing_orphaned_queue_size(self): # type: (...) -> int """ Returns the tracing orphaned queue size. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the size set in it. """ return self._cntl(op=_LCB.TRACING_ORPHANED_QUEUE_SIZE, value_type="uint32_t") @property def tracing_threshold_queue_flush_interval(self): # type: (...) -> timedelta """ The tracing threshold queue flush interval, as a `timedelta`. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired interval set in it. """ return timedelta( seconds=self._cntl( op=_LCB.TRACING_THRESHOLD_QUEUE_FLUSH_INTERVAL, value_type="timeout" ) ) @property def tracing_threshold_queue_size(self): # type: (...) -> int """ The tracing threshold queue size. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired size set in it. """ return self._cntl(op=_LCB.TRACING_THRESHOLD_QUEUE_SIZE, value_type="uint32_t") @property def redaction(self): # type: (...) -> bool """ Returns whether or not the logs will redact sensitive information. """ return bool(self._cntl(_LCB.LCB_CNTL_LOG_REDACTION, value_type="int")) @property def compression(self): # type: (...) -> Compression """ Returns the compression mode to be used when talking to the server. See :class:`Compression` for details. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired compression set in it. """ return Compression.from_int( self._cntl(_LCB.LCB_CNTL_COMPRESSION_OPTS, value_type="int") ) @property def compression_min_size(self): # type: (...) -> int """ Minimum size (in bytes) of the document payload to be compressed when compression enabled. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired compression set in it. """ return self._cntl(_LCB.LCB_CNTL_COMPRESSION_MIN_SIZE, value_type="uint32_t") @property def compression_min_ratio(self): # type: (...) -> float """ Minimum compression ratio (compressed / original) of the compressed payload to allow sending it to cluster. This can be set in the :meth:`connect` by passing in a :class:`~.ClusterOptions` with the desired ratio set in it. """ return self._cntl(_LCB.LCB_CNTL_COMPRESSION_MIN_RATIO, value_type="float") @property def is_ssl(self): # type: (...) -> bool """ Read-only boolean property indicating whether SSL is used for this connection. If this property is true, then all communication between this object and the Couchbase cluster is encrypted using SSL. See :meth:`__init__` for more information on connection options. """ mode = self._cntl(op=_LCB.LCB_CNTL_SSL_MODE, value_type="int") return mode & _LCB.LCB_SSL_ENABLED != 0
class AsyncCluster(AsyncClientMixin, Cluster): @classmethod def connect(cls, connection_string=None, *args, **kwargs): return cls(connection_string=connection_string, *args, **kwargs)