Source code for couchbase.management.collections

from couchbase.options import OptionBlockTimeOut, forward_args
from couchbase.management.admin import Admin
from typing import *
from .generic import GenericManager
from couchbase_core import mk_formstr
from couchbase_core.supportability import deprecated

from couchbase.exceptions import (ErrorMapper, NotSupportedWrapper,
                                  HTTPException, ScopeNotFoundException,
                                  ScopeAlreadyExistsException, CollectionNotFoundException,
                                  CollectionAlreadyExistsException, QuotaLimitedException,
                                  RateLimitedException)
from datetime import timedelta


class CollectionsErrorHandler(ErrorMapper):
    """Error mapper for the scope/collection management endpoints.

    Supplies the table that associates ``HTTPException`` message patterns
    with the more specific scope/collection exception classes.
    """

    @staticmethod
    def mapping():
        # type (...)->Mapping[str, CBErrorType]
        """Return the exception translation table.

        :return: a dict keyed by ``HTTPException`` whose value maps
            regular-expression pattern strings to the exception class to
            use instead.
        """
        # NOTE(review): presumably ErrorMapper matches these patterns against
        # the HTTP error text -- confirm against ErrorMapper.
        # Raw strings keep the regex escapes (\(, \s, \[) from being parsed
        # as invalid string escape sequences; the runtime values are unchanged.
        return {HTTPException: {r'.*Scope with.*name.*already exists': ScopeAlreadyExistsException,
                                r'.*Scope with.*name.*not found': ScopeNotFoundException,
                                r'.*Collection with.*name.*not found': CollectionNotFoundException,
                                r'.*Collection with.*name.*already exists': CollectionAlreadyExistsException,
                                r'.*collection_not_found.*': CollectionNotFoundException,
                                r'.*scope_not_found.*': ScopeNotFoundException,
                                r'.*Maximum number of collections has been reached for scope.*': QuotaLimitedException,
                                r'.*Limit\(s\) exceeded\s+\[.*\].*': RateLimitedException}}


class CollectionManager(GenericManager):
    """Scope and collection management for a single bucket.

    Every operation is sent through ``self._admin_bucket.http_request``
    using paths rooted at ``self.base_path``.
    """

    def __init__(self, admin_bucket, bucket_name):
        # type: (Admin, str) -> None
        """
        Bind the manager to a bucket.

        :param Admin admin_bucket: admin object handed to ``GenericManager``.
        :param str bucket_name: bucket whose scopes endpoint becomes
            ``self.base_path``.
        """
        super(CollectionManager, self).__init__(admin_bucket)
        self.base_path = 'pools/default/buckets/{}/scopes'.format(bucket_name)

    @deprecated(instead="get_all_scopes")
    def get_scope(self, scope_name, *options, **kwargs):
        # type: (str, *GetScopeOptions, **Any) -> ScopeSpec
        """
        Look up a single scope by name.

        Fetches every scope via :meth:`get_all_scopes` and returns the first
        one whose name equals ``scope_name``.

        :param str scope_name: name of the scope.
        :param GetScopeOptions options: scope options
        :param kwargs: keyword overrides for the scope options
        :return: the ScopeSpec describing the requested scope.
        :raises: ScopeNotFoundException when no scope has that name.
        """
        try:
            every_scope = self.get_all_scopes(*options, **kwargs)
            return next(spec for spec in every_scope
                        if spec.name == scope_name)
        except StopIteration:
            raise ScopeNotFoundException(
                "no scope with name {}".format(scope_name))

    @NotSupportedWrapper.a_400_or_404_means_not_supported
    def get_all_scopes(self, *options, **kwargs):
        # type: (*GetAllScopesOptions, **Any) -> Iterable[ScopeSpec]
        """
        List every scope of the bucket, with its collections.

        Issues a GET on ``self.base_path`` and converts the returned manifest
        into spec objects.

        :param GetAllScopesOptions options: (currently just timeout).
        :param kwargs: keyword version of options
        :return: a list with one ScopeSpec per scope in the manifest.
        """
        kwargs.update(path=self.base_path, method='GET')
        manifest = self._admin_bucket.http_request(
            **forward_args(kwargs, *options))
        # The manifest payload looks like:
        # {'uid': '0', 'scopes': [{'name': '_default', 'uid': '0',
        #   'collections': [{'name': '_default', 'uid': '0'}]}]}
        return [
            ScopeSpec(entry['name'],
                      [CollectionSpec(member['name'], entry['name'])
                       for member in entry['collections']])
            for entry in manifest.value['scopes']
        ]

    @CollectionsErrorHandler.mgmt_exc_wrap
    def create_collection(self, collection, *options, **kwargs):
        # type: (CollectionSpec, *CreateCollectionOptions, **Any) -> Any
        """
        Create a collection inside an existing scope.

        POSTs a form-encoded body to
        ``<base_path>/<scope_name>/collections``. ``maxTTL`` (whole seconds)
        is only included when ``collection.max_ttl`` is truthy.

        :param CollectionSpec collection: specification of the collection.
        :param CreateCollectionOptions options: options (currently just timeout).
        :param kwargs: keyword version of 'options'
        :return: whatever the underlying ``http_request`` call returns.
        :raises: CollectionAlreadyExistsException
        :raises: ScopeNotFoundException
        """
        form_fields = {'name': collection.name}
        ttl = collection.max_ttl
        if ttl:
            form_fields['maxTTL'] = int(ttl.total_seconds())

        body = mk_formstr(form_fields)
        target = '{}/{}/collections'.format(self.base_path,
                                            collection.scope_name)
        kwargs.update(path=target,
                      method='POST',
                      content_type='application/x-www-form-urlencoded',
                      content=body)
        return self._admin_bucket.http_request(
            **forward_args(kwargs, *options))

    @CollectionsErrorHandler.mgmt_exc_wrap
    def drop_collection(self, collection, *options, **kwargs):
        # type: (CollectionSpec, *DropCollectionOptions, **Any) -> None
        """
        Delete a collection.

        Sends DELETE to ``<base_path>/<scope_name>/collections/<name>``.

        :param CollectionSpec collection: identifies the collection (name and
            scope name) to remove.
        :param DropCollectionOptions options: (currently just timeout).
        :param kwargs: keyword version of `options`
        :raises: CollectionNotFoundException
        """
        target = '{}/{}/collections/{}'.format(self.base_path,
                                               collection.scope_name,
                                               collection.name)
        kwargs.update(path=target, method='DELETE')
        self._admin_bucket.http_request(**forward_args(kwargs, *options))

    @CollectionsErrorHandler.mgmt_exc_wrap
    def create_scope(self, scope_name, *options, **kwargs):
        # type: (str, *CreateScopeOptions, **Any) -> None
        """
        Create a scope.

        POSTs ``name=<scope_name>`` as a form-encoded body to
        ``self.base_path``.

        :param str scope_name: name of the scope.
        :param CreateScopeOptions options: options (currently just timeout).
        :param kwargs: keyword version of `options`
        :raises: ScopeAlreadyExistsException
        """
        body = mk_formstr({'name': scope_name})
        kwargs.update(path=self.base_path,
                      method='POST',
                      content_type='application/x-www-form-urlencoded',
                      content=body)
        self._admin_bucket.http_request(**forward_args(kwargs, *options))

    @CollectionsErrorHandler.mgmt_exc_wrap
    def drop_scope(self, scope_name, *options, **kwargs):
        # type: (str, *DropScopeOptions, **Any) -> None
        """
        Delete a scope.

        Sends DELETE to ``<base_path>/<scope_name>``.

        :param str scope_name: name of the scope
        :param DropScopeOptions options: (currently just timeout)
        :param kwargs: keyword version of `options`
        :raises: ScopeNotFoundException
        """
        target = '{}/{}'.format(self.base_path, scope_name)
        kwargs.update(path=target, method='DELETE')
        self._admin_bucket.http_request(**forward_args(kwargs, *options))
class CollectionSpec(object):
    """Read-only description of a collection: name, owning scope, max TTL."""

    def __init__(self,
                 collection_name,  # type: str
                 scope_name='_default',  # type: str
                 max_ttl=None  # type: timedelta
                 ):
        """
        :param str collection_name: name of the collection.
        :param str scope_name: name of the owning scope; defaults to
            ``'_default'``.
        :param timedelta max_ttl: optional maximum TTL; ``None`` when unset.
        """
        self._name = collection_name
        self._scope_name = scope_name
        self._max_ttl = max_ttl

    @property
    def name(self):
        # type: (...) -> str
        """Name of the collection."""
        return self._name

    @property
    def scope_name(self):
        # type: (...) -> str
        """Name of the scope the collection belongs to."""
        return self._scope_name

    @property
    def max_ttl(self):
        # type: (...) -> timedelta
        """Maximum TTL given at construction, or ``None``."""
        return self._max_ttl


class ScopeSpec(object):
    """Read-only description of a scope: its name and its collections."""

    def __init__(self,
                 name,  # type: str
                 collections,  # type: Iterable[CollectionSpec]
                 ):
        """
        :param str name: name of the scope.
        :param collections: the CollectionSpec objects belonging to the scope.
        """
        self._name = name
        self._collections = collections

    @property
    def name(self):
        # type: (...) -> str
        """Name of the scope."""
        return self._name

    @property
    def collections(self):
        # type: (...) -> Iterable[CollectionSpec]
        """The collections object passed at construction (not copied)."""
        return self._collections


class GetAllScopesOptions(OptionBlockTimeOut):
    """Options for ``get_all_scopes``; adds nothing to OptionBlockTimeOut."""


class GetScopeOptions(GetAllScopesOptions):
    """Options for ``get_scope``; adds nothing to GetAllScopesOptions."""


class CreateCollectionOptions(OptionBlockTimeOut):
    """Options for ``create_collection``; adds nothing to OptionBlockTimeOut."""


class DropCollectionOptions(OptionBlockTimeOut):
    """Options for ``drop_collection``; adds nothing to OptionBlockTimeOut."""


class CreateScopeOptions(OptionBlockTimeOut):
    """Options for ``create_scope``; adds nothing to OptionBlockTimeOut."""


class DropScopeOptions(OptionBlockTimeOut):
    """Options for ``drop_scope``; adds nothing to OptionBlockTimeOut."""