Source code for couchbase.management.buckets

from couchbase.management.admin import Admin
from ..options import OptionBlockTimeOut, forward_args
from couchbase.management.generic import GenericManager
from typing import *
from couchbase_core import abstractmethod, mk_formstr
from couchbase.exceptions import HTTPException, ErrorMapper, BucketAlreadyExistsException, BucketDoesNotExistException


class BucketManagerErrorHandler(ErrorMapper):
    @staticmethod
    def mapping():
        # type (...)->Mapping[str, CBErrorType]
        return {HTTPException: {'Bucket with given name (already|still) exists': BucketAlreadyExistsException,
                                'Requested resource not found': BucketDoesNotExistException}}

[docs]@BucketManagerErrorHandler.wrap class BucketManager(GenericManager): def __init__(self, # type: BucketManager admin_bucket # type: Admin ): """Bucket Manager :param admin_bucket: Admin bucket """ super(BucketManager, self).__init__(admin_bucket)
[docs] def create_bucket(self, # type: BucketManager settings, # type: CreateBucketSettings *options, # type: CreateBucketOptions **kwargs # type: Any ): """ Creates a new bucket. :param: CreateBucketSettings settings: settings for the bucket. :param: CreateBucketOptions options: options for setting the bucket. :param: Any kwargs: override corresponding values in the options. :raises: BucketAlreadyExistsException :raises: InvalidArgumentsException """ # prune the missing settings... params = ({k:v for k,v in settings.items() if (v is not None)}) # insure flushEnabled is an int params['flushEnabled'] = int(params.get('flushEnabled', None)) # send it return self._admin_bucket.http_request( path='/pools/default/buckets', method='POST', content=mk_formstr(params), content_type='application/x-www-form-urlencoded', **forward_args(kwargs, *options))
[docs] def update_bucket(self, # type: BucketManager settings, # type: BucketSettings *options, # type: UpdateBucketOptions **kwargs # type: Any ): """ Updates a bucket. Every setting must be set to what the user wants it to be after the update. Any settings that are not set to their desired values may be reverted to default values by the server. :param BucketSettings settings: settings for updating the bucket. :param UpdateBucketOptions options: options for updating the bucket. :param Any kwargs: override corresponding values in the options. :raises: InvalidArgumentsException :raises: BucketDoesNotExistException """ # prune the missing settings... params = ({k:v for k,v in settings.items() if (v is not None)}) # insure flushEnabled is an int params['flushEnabled'] = int(params.get('flushEnabled', None)) # send it return self._admin_bucket.http_request( path='/pools/default/buckets/' + settings.name, method='POST', content_type='application/x-www-form-urlencoded', content=mk_formstr(params), **forward_args(kwargs, *options))
[docs] def drop_bucket(self, # type: BucketManager bucket_name, # type: str *options, # type: DropBucketOptions **kwargs # type: Any ): # type: (...) -> None """ Removes a bucket. :param str bucket_name: the name of the bucket. :param DropBucketOptions options: options for dropping the bucket. :param Any kwargs: override corresponding value in the options. :raises: BucketNotFoundException :raises: InvalidArgumentsException """ return self._admin_bucket.http_request( path='/pools/default/buckets/' + bucket_name, method='DELETE', **forward_args(kwargs, *options))
[docs] def get_bucket(self, # type: BucketManager bucket_name, # type: str *options, # type: GetBucketOptions **kwargs # type: Any ): # type: (...) -> BucketSettings """ Gets a bucket's settings. :param str bucket_name: the name of the bucket. :param GetBucketOptions options: options for getting the bucket. :param Any kwargs: override corresponding values in options. :returns: settings for the bucket. Note: the ram quota returned is in bytes not mb so requires x / 1024 twice. Also Note: FlushEnabled is not a setting returned by the server, if flush is enabled then the doFlush endpoint will be listed and should be used to populate the field. :rtype: BucketSettings :raises: BucketNotFoundException :raises: InvalidArgumentsException """ return BucketSettings( **self._admin_bucket.http_request( path='/pools/default/buckets/' + bucket_name, method='GET', **forward_args(kwargs, *options) ).value)
[docs] def get_all_buckets(self, # type: BucketManager *options, # type: GetAllBucketOptions **kwargs # type: Any ): # type: (...) -> Iterable[BucketSettings] """ Gets all bucket settings. Note, # type: the ram quota returned is in bytes not mb so requires x / 1024 twice. :param GetAllBucketOptions options: options for getting all buckets. :param Any kwargs: override corresponding value in options. :returns: An iterable of settings for each bucket. :rtype: Iterable[BucketSettings] """ return list( map(lambda x: BucketSettings(**x), self._admin_bucket.http_request( path='/pools/default/buckets', method='GET', **forward_args(kwargs, *options) ).value))
[docs] def flush_bucket(self, # type: BucketManager bucket_name, # type: str *options, # type: FlushBucketOptions **kwargs # type: Any ): # using the ns_server REST interface """ Flushes a bucket (uses the ns_server REST interface). :param str bucket_name: the name of the bucket. :param FlushBucketOptions options: options for flushing the bucket. :param Any kwargs: override corresponding value in options. :raises: BucketNotFoundException :raises: InvalidArgumentsException :raises: FlushDisabledException """ self._admin_bucket.http_request( path="/pools/default/buckets/{bucket_name}/controller/doFlush".format(bucket_name=bucket_name), method='POST', **forward_args(kwargs, *options))
class BucketSettings(dict): @overload def __init__(self, name=None, flush_enabled=None, ram_quota_mb=None, num_replicas=None, replica_index=None, bucket_type=None, eviction_policy=None, max_ttl=None, compression_mode=None): pass def __init__(self, **raw_info): """BucketSettings provides a means of mapping bucket settings into an object. :param info: :param raw_info: """ # TODO: do we need this? # dict.__init__(self, Admin.bc_defaults) # if created from the bucket info coming back from get_bucket, we need to convert some things here... self.__convert_from_raw(raw_info) # if created by a call from the user, we need to convert the names to the camel-case versions... # we really could do this via a package, but I hate to add a dependency just for this key_tuple = [ ('flushEnabled', 'flush_enabled'), ('numReplicas', 'num_replicas'), ('ramQuotaMB', 'ram_quota_mb'), ('replicaIndex', 'replica_index'), ('bucketType', 'bucket_type'), ('maxTTL', 'max_ttl'), ('compressionMode', 'compression_mode'), ('conflictResolutionType', 'conflict_resolution_type'), ('evictionPolicy', 'eviction_policy'), ('name', 'name') ] self.__pop_if_there(raw_info, key_tuple) def __pop_if_there(self, raw_info, keys): for k in keys: if isinstance(k, tuple): # default is the current value of self[k[0]] self[k[0]] = raw_info.get(k[1], self[k[0]]) else: # you passed in a list of strings, so default is none and the keys are same self[k] = raw_info.get(k, None) def __convert_from_raw(self, raw_info): key_tuple = [ 'name', 'numReplicas', 'replicaIndex', 'replicaIndex', 'bucketType', 'maxTTL', 'compressionMode', 'conflictResolutionType', 'evictionPolicy'] self.__pop_if_there(raw_info, key_tuple) quota = raw_info.get('quota', {}) # convert rawRAM to MB if 'rawRAM' in quota: self['ramQuotaMB'] = quota.get('rawRAM')/1024/1024 else: self['ramQuotaMB'] = None controllers = raw_info.get('controllers', {}) self['flushEnabled'] = ('flush' in controllers) @property def name(self): # type: (...) -> str """Name (string) - The name of the bucket.""" return self.get('name') @property def flush_enabled(self): # type: (...) -> bool """Whether or not flush should be enabled on the bucket. Default to false.""" return self.get('flushEnabled', False) @property def ram_quota_mb(self): # type: (...) -> int """Ram Quota in mb for the bucket. (rawRAM in the server payload)""" return self.get('ramQuotaMB') @property def num_replicas(self): # type: (...) -> int """NumReplicas (int) - The number of replicas for documents.""" return self.get('replicaNumber') @property def replica_index(self): # type: (...) -> bool """ Whether replica indexes should be enabled for the bucket.""" return self.get('replicaIndex') @property def bucket_type(self): # type: (...) -> int """BucketType {couchbase (sent on wire as membase), memcached, ephemeral} The type of the bucket. Default to couchbase.""" return self.get('bucketType') @property def eviction_policy(self): # type: (...) -> int """{fullEviction | valueOnly}. The eviction policy to use.""" return self.get('evictionPolicy') @property def max_ttl(self): # type: (...) -> int """Value for the maxTTL of new documents created without a ttl.""" return self.get('maxTTL') @property def compression_mode(self): # type: (...) -> int """""""{off | passive | active} - The compression mode to use.""" return self.get('compressionMode') @property def as_dict(self): return self class CreateBucketSettings(BucketSettings): @overload def __init__(self, name=None, flush_enabled=None, ram_quota_mb=None, num_replicas=None, replica_index=None, bucket_type=None, ejection_method=None, max_ttl=None, compression_mode=None, conflict_resolution_type=None, bucket_password=None): pass def __init__(self, **kwargs): BucketSettings.__init__(self, **kwargs) @property def conflict_resolution_type(self): return self.get('conflictResolutionType') class CreateBucketOptions(OptionBlockTimeOut): pass class UpdateBucketOptions(OptionBlockTimeOut): pass class DropBucketOptions(OptionBlockTimeOut): pass class GetAllBucketOptions(OptionBlockTimeOut): pass class GetBucketOptions(OptionBlockTimeOut): pass class FlushBucketOptions(OptionBlockTimeOut): pass