Source code for couchbase.management.queries

from couchbase_core._ixmgmt import N1QL_PRIMARY_INDEX, IxmgmtRequest, N1qlIndex
from couchbase_core.bucketmanager import BucketManager
from couchbase.options import OptionBlock, OptionBlockTimeOut, forward_args, timedelta
from typing import *
from couchbase.management.generic import GenericManager
import attr
from attr.validators import instance_of as io, deep_mapping as dm
from couchbase_core._pyport import Protocol
from couchbase.exceptions import HTTPException, ErrorMapper, AnyPattern, QueryIndexAlreadyExistsException, \
    QueryIndexNotFoundException, DocumentNotFoundException, DocumentExistsException


class QueryErrorMapper(ErrorMapper):
    @staticmethod
    def mapping():
        # type: (...) -> Dict[CBErrorType,Dict[Any, CBErrorType]]
        return {DocumentNotFoundException: {AnyPattern(): QueryIndexNotFoundException},
                DocumentExistsException: {AnyPattern(): QueryIndexAlreadyExistsException}}


[docs]@QueryErrorMapper.wrap class QueryIndexManager(GenericManager): def __init__(self, parent_cluster): """ Query Index Manager The Query Index Manager interface contains the means for managing indexes used for queries. :param parent_cluster: Parent cluster """ super(QueryIndexManager,self).__init__(parent_cluster)
[docs] def get_all_indexes(self, # type: QueryIndexManager bucket_name, # type: str *options, # type: GetAllQueryIndexOptions **kwargs # type: Any ): # type: (...) -> List[QueryIndex] """ Fetches all indexes from the server. :param str bucket_name: the name of the bucket. :param GetAllQueryIndexOptions options: Options to use for getting all indexes. :param Any kwargs: Override corresponding value in options. :return: A list of QueryIndex objects. :raises: InvalidArgumentsException """ # N1QL # SELECT idx.* FROM system:indexes AS idx # WHERE keyspace_id = "bucket_name" # ORDER BY is_primary DESC, name ASC info = N1qlIndex() info.keyspace = bucket_name response = IxmgmtRequest(self._admin_bucket, 'list', info, **forward_args(kwargs, *options)).execute() return list(map(QueryIndex.from_n1qlindex, response))
def _mk_index_def(self, bucket_name, ix, primary=False): if isinstance(ix, N1qlIndex): return N1qlIndex(ix) info = N1qlIndex() info.keyspace = bucket_name info.primary = primary if ix: info.name = ix elif not primary: raise ValueError('Missing name for non-primary index') return info def _n1ql_index_create(self, bucket_name, ix, defer=False, ignore_exists=False, primary=False, fields=None, cond = None, timeout=None, **kwargs): """ Create an index for use with N1QL. :param str ix: The name of the index to create :param bool defer: Whether the building of indexes should be deferred. If creating multiple indexes on an existing dataset, using the `defer` option in conjunction with :meth:`build_deferred_indexes` and :meth:`watch_indexes` may result in substantially reduced build times. :param bool ignore_exists: Do not throw an exception if the index already exists. :param Iterable[str] fields: A list of fields that should be supplied as keys for the index. For non-primary indexes, this must be specified and must contain at least one field name. :param bool primary: Whether this is a primary index. If creating a primary index, the name may be an empty string and `fields` must be empty. :param str condition: Specify a condition for indexing. Using a condition reduces an index size :raise: :exc:`~.DocumentExistsException` if the index already exists .. seealso:: :meth:`n1ql_index_create_primary` """ fields = fields or [] if kwargs: raise TypeError('Unknown keyword arguments', kwargs) info = self._mk_index_def(bucket_name, ix, primary) if primary and fields: raise TypeError('Cannot create primary index with explicit fields') elif not primary and not fields: raise ValueError('Fields required for non-primary index') if fields: info.fields = fields if primary and info.name is N1QL_PRIMARY_INDEX: del info.name if cond: if primary: raise ValueError('cannot specify condition for primary index') info.condition = cond options = { 'ignore_exists': ignore_exists, 'defer': defer } if timeout: options['timeout']=timeout # Now actually create the indexes return IxmgmtRequest(self._admin_bucket, 'create', info, **options).execute()
[docs] def create_index(self, # type: QueryIndexManager bucket_name, # type: str index_name, # type: str fields, # type: Iterable[str] *options, # type: CreateQueryIndexOptions **kwargs ): # type: (...) -> None """ Creates a new index. :param str bucket_name: name of the bucket. :param str index_name: the name of the index. :param Iterable[str] fields: Fields over which to create the index. :param CreateQueryIndexOptions options: Options to use when creating index. :param Any kwargs: Override corresponding value in options. :raises: QueryIndexAlreadyExistsException :raises: InvalidArgumentsException """ # CREATE INDEX index_name ON bucket_name WITH { "num_replica": 2 } # https://docs.couchbase.com/server/current/n1ql/n1ql-language-reference/createindex.html # self._create_index(bucket_name, fields, index_name, *options, **kwargs)
def _create_index(self, bucket_name, fields, index_name, *options, **kwargs): final_args = { k.replace('deferred', 'defer').replace('condition', 'cond').replace('ignore_if_exists', 'ignore_exists'): v for k, v in forward_args(kwargs, *options).items()} try: self._n1ql_index_create(bucket_name, index_name, fields=fields, **final_args) except QueryIndexAlreadyExistsException: if not final_args.get('ignore_exists', False): raise
[docs] def create_primary_index(self, # type: QueryIndexManager bucket_name, # type: str *options, # type: CreatePrimaryQueryIndexOptions **kwargs ): """ Creates a new primary index. :param str bucket_name: name of the bucket. :param str index_name: name of the index. :param CreatePrimaryQueryIndexOptions options: Options to use when creating primary index :param Any kwargs: Override corresponding values in options. :raises: QueryIndexAlreadyExistsException :raises: InvalidArgumentsException """ # CREATE INDEX index_name ON bucket_name WITH { "num_replica": 2 } # https://docs.couchbase.com/server/current/n1ql/n1ql-language-reference/createindex.html # kwargs['primary'] = True index_name = "" if options and options[0] : index_name = options[0].pop("index_name", "") fields = [] self._create_index(bucket_name, fields, index_name, *options, **kwargs)
def _drop_index(self, bucket_name, index_name, *options, **kwargs): info = BucketManager._mk_index_def(bucket_name, index_name, primary=kwargs.pop('primary',False)) final_args = {k.replace('ignore_if_not_exists','ignore_missing'):v for k,v in forward_args(kwargs, *options).items()} try: IxmgmtRequest(self._admin_bucket, 'drop', info, **final_args).execute() except QueryIndexNotFoundException: if not final_args.get("ignore_missing", False): raise
[docs] def drop_index(self, # type: QueryIndexManager bucket_name, # type: str index_name, # type: str *options, # type: DropQueryIndexOptions **kwargs): """ Drops an index. :param str bucket_name: name of the bucket. :param str index_name: name of the index. :param DropQueryIndexOptions options: Options for dropping index. :param Any kwargs: Override corresponding value in options. :raises: QueryIndexNotFoundException :raises: InvalidArgumentsException """ final_args = forward_args(kwargs, *options) self._drop_index(bucket_name, index_name, **final_args)
[docs] def drop_primary_index(self, # type: QueryIndexManager bucket_name, # type: str *options, # type: DropPrimaryQueryIndexOptions **kwargs): """ Drops a primary index. :param bucket_name: name of the bucket. :param index_name: name of the index. :param ignore_if_not_exists: Don't error/throw if the index does not exist. :param timeout: the time allowed for the operation to be terminated. This is controlled by the client. :raises: QueryIndexNotFoundException :raises: InvalidArgumentsException """ final_args=forward_args(kwargs, *options) final_args['primary'] = True index_name = final_args.pop("index_name", "") self._drop_index(bucket_name, index_name, **final_args)
[docs] def watch_indexes(self, # type: QueryIndexManager bucket_name, # type: str index_names, # type: Iterable[str] *options, # type: WatchQueryIndexOptions **kwargs): """ Watch polls indexes until they are online. :param str bucket_name: name of the bucket. :param Iterable[str] index_names: name(s) of the index(es). :param WatchQueryIndexOptions options: Options for request to watch indexes. :param Any kwargs: Override corresponding valud in options. :raises: QueryIndexNotFoundException :raises: InvalidArgumentsException """ final_args=forward_args(kwargs, *options) BucketManager(self._admin_bucket).n1ql_index_watch(index_names, **final_args)
[docs] def build_deferred_indexes(self, # type: QueryIndexManager bucket_name, # type: str *options, # type: BuildDeferredQueryIndexOptions **kwargs ): """ Build Deferred builds all indexes which are currently in deferred state. :param str bucket_name: name of the bucket. :param BuildDeferredQueryIndexOptions options: Options for building deferred indexes. :param Any kwargs: Override corresponding value in options. :raise: InvalidArgumentsException """ final_args=forward_args(kwargs, *options) return BucketManager._n1ql_index_build_deferred(bucket_name, self._admin_bucket, **final_args)
class IndexType(object): pass @attr.s class QueryIndex(Protocol): """The QueryIndex protocol provides a means of mapping a query index into an object.""" name = attr.ib(validator=io(str)) # type: str is_primary = attr.ib(validator=io(bool)) # type: bool type = attr.ib(validator=io(IndexType), type=IndexType) # type: IndexType state = attr.ib(validator=io(str)) # type: str keyspace = attr.ib(validator=io(str)) # type: str index_key = attr.ib(validator=io(Iterable)) # type: Iterable[str] condition = attr.ib(validator=io(str)) # type: str @classmethod def from_n1qlindex(cls, n1qlindex # type: N1qlIndex ): return cls(n1qlindex.name, bool(n1qlindex.primary), IndexType(), n1qlindex.state, n1qlindex.keyspace, [], n1qlindex.condition or "") class GetAllQueryIndexOptions(OptionBlockTimeOut): pass class CreateQueryIndexOptions(OptionBlockTimeOut): @overload def __init__(self, timeout=None, # type: timedelta ignore_if_exists=None, # type: bool num_replicas=None, # type: int deferred=None, # type: bool condition=None, # type: str ): pass def __init__(self, **kwargs): """ Query Index creation options :param timeout: operation timeout in seconds :param ignore_if_exists: don't throw an exception if index already exists :param num_replicas: number of replicas :param deferred: whether the index creation should be deferred :param condition: 'where' condition for partial index creation """ if 'ignore_if_exists' not in kwargs: kwargs['ignore_if_exists'] = False super(CreateQueryIndexOptions, self).__init__(**kwargs) class CreatePrimaryQueryIndexOptions(CreateQueryIndexOptions): @overload def __init__(self, index_name=None, # type: str timeout=None, # type: timedelta ignore_if_exists=None, # type: bool num_replicas=None, # type: int deferred=None, # type: bool ): pass def __init__(self, **kwargs): super(CreatePrimaryQueryIndexOptions, self).__init__(**kwargs) class DropQueryIndexOptions(OptionBlockTimeOut): @overload def __init__(self, ignore_if_not_exists=None, # type: bool timeout=None # type: timedelta ): pass def __init__(self, **kwargs): super(DropQueryIndexOptions, self).__init__(**kwargs) class DropPrimaryQueryIndexOptions(OptionBlockTimeOut): @overload def __init__(self, index_name=None, # str ignore_if_not_exists=None, # type: bool timeout=None # type: timedelta ): pass def __init__(self, **kwargs): super(DropPrimaryQueryIndexOptions, self).__init__(**kwargs) class WatchQueryIndexOptions(OptionBlock): @overload def __init__(self, watch_primary=None # type: bool ): pass def __init__(self, **kwargs): super(WatchQueryIndexOptions, self).__init__(**kwargs) class BuildDeferredQueryIndexOptions(OptionBlockTimeOut): pass