Source code for couchbase.management.buckets

from couchbase.management.admin import Admin
from ..options import OptionBlock
from couchbase.management.generic import GenericManager
from typing import *
from couchbase_core import abstractmethod, mk_formstr


class BucketManager(GenericManager):
    """Create, inspect, update, flush and remove buckets.

    Every operation is forwarded to the ``Admin`` object passed to the
    constructor, which the methods below reach through
    ``self._admin_bucket`` (presumably stored there by ``GenericManager``).
    """

    def __init__(self,         # type: BucketManager
                 admin_bucket  # type: Admin
                 ):
        """Bucket Manager

        :param admin_bucket: Admin bucket
        """
        super(BucketManager, self).__init__(admin_bucket)

    def create_bucket(self,      # type: BucketManager
                      settings,  # type: CreateBucketSettings
                      *options,  # type: CreateBucketOptions
                      **kwargs   # type: Any
                      ):
        # type: (...)->None
        """
        Creates a new bucket.

        ``options`` and ``kwargs`` are accepted for interface compatibility
        but are not used by this implementation.

        :param CreateBucketSettings settings: settings for the bucket. The
            ``name``, ``bucket_type``, ``num_replicas``, ``ram_quota_mb`` and
            ``flush_enabled`` values are forwarded to ``Admin.bucket_create``.
        :raises: BucketAlreadyExistsException (http 400 and content contains
            "Bucket with given name already exists")
        :raises: InvalidArgumentsException
        """
        # ``replicas`` is a replica *count*, so it must come from
        # ``num_replicas``; ``replica_indexes`` is only an on/off flag.
        replicas = settings.num_replicas
        if replicas is None:
            # Backward compatibility: earlier revisions forwarded
            # ``replica_indexes`` here, so honour it when no count was given.
            replicas = settings.replica_indexes
        self._admin_bucket.bucket_create(settings.name,
                                         bucket_type=settings.bucket_type,
                                         replicas=replicas,
                                         ram_quota=settings.ram_quota_mb,
                                         flush_enabled=settings.flush_enabled)

    def bucket_update(self, current, bucket_password=None, replicas=None,
                      ram_quota=None, flush_enabled=None, **kwargs):
        """
        Update an existing bucket's settings.

        :param current: Current state of the bucket, as returned by
            ``Admin.bucket_info``. Its ``value`` attribute (a dict) is read.
        :param str name: Keyword-only; the name of the bucket to update.
            When omitted, the ``name`` entry of ``current`` is used
            (assumes the server payload carries it).
        :param str bucket_password: Change the bucket's password
        :param int replicas: The number of replicas for the bucket
        :param int ram_quota: The memory available to the bucket on each
            node (sent as ``ramQuotaMB``).
        :param bool flush_enabled: Whether the flush API should be allowed
            from normal clients
        :param kwargs: Every remaining keyword whose value is not ``None``
            is sent to the server unchanged as an extra form field.
        :return: A :class:`~.HttpResult` object
        :raise: :exc:`~.HTTPError` if the request could not be completed
        :raise: :exc:`KeyError` if no bucket name was given and ``current``
            does not contain one

        .. note::

            The default value for all options in this method is
            ``None``. If a value is set to something else, it will
            modify the setting.

        Change the bucket password::

            mgr.bucket_update(adm.bucket_info('a_bucket'),
                              name='a_bucket',
                              bucket_password='n3wpassw0rd')

        Enable the flush API::

            mgr.bucket_update(adm.bucket_info('a_bucket'),
                              name='a_bucket',
                              flush_enabled=True)
        """
        params = {}
        current = current.value

        # The bucket name travels in ``kwargs`` (see ``update_bucket``);
        # fall back to the name reported in the current bucket state.
        name = kwargs.pop('name', None)
        if name is None:
            name = current.get('name')
        if name is None:
            raise KeyError('name')

        # Start from the current values, then overlay the requested changes,
        # so settings the caller did not mention are sent back unchanged.
        params['authType'] = current['authType']
        if 'saslPassword' in current:
            params['saslPassword'] = current['saslPassword']

        if bucket_password is not None:
            params['authType'] = 'sasl'
            params['saslPassword'] = bucket_password

        params['replicaNumber'] = (
            replicas if replicas is not None else current['replicaNumber'])

        if ram_quota:
            params['ramQuotaMB'] = ram_quota
        else:
            # The current quota is reported in bytes; convert to megabytes.
            params['ramQuotaMB'] = int(
                current['quota']['rawRAM'] / 1024 / 1024)

        if flush_enabled is not None:
            params['flushEnabled'] = int(flush_enabled)

        proxy_port = current.get('proxyPort')
        if proxy_port:
            params['proxyPort'] = proxy_port

        params.update({k: v for k, v in kwargs.items() if v is not None})

        return self._admin_bucket.http_request(
            path='/pools/default/buckets/' + name,
            method='POST',
            content_type='application/x-www-form-urlencoded',
            content=mk_formstr(params))

    def update_bucket(self,      # type: BucketManager
                      settings,  # type: IBucketSettings
                      *options   # type: UpdateBucketOptions
                      ):
        # type: (...)->None
        """
        Updates a bucket. Every setting must be set to what the user wants
        it to be after the update. Any settings that are not set to their
        desired values may be reverted to default values by the server.

        The current bucket state is fetched first and handed, together with
        the entries of ``settings.as_dict``, to :meth:`bucket_update`.

        :param IBucketSettings settings: settings for the bucket.
        :raises: InvalidArgumentsException
        :raises: BucketDoesNotExistException
        """
        current = self._admin_bucket.bucket_info(settings.name)
        self.bucket_update(current, **settings.as_dict)

    def drop_bucket(self,         # type: BucketManager
                    bucket_name,  # type: str
                    *options      # type: DropBucketOptions
                    ):
        # type: (...)->None
        """
        Removes a bucket.

        :param str bucket_name: the name of the bucket.
        :raises: BucketNotFoundException
        :raises: InvalidArgumentsException
        """
        self._admin_bucket.bucket_remove(bucket_name)

    def get_bucket(self,         # type: BucketManager
                   bucket_name,  # type: str
                   *options      # type: GetBucketOptions
                   ):
        # type: (...)->IBucketSettings
        """
        Gets a bucket's settings.

        :param str bucket_name: the name of the bucket.
        :returns: settings for the bucket. Note: the ram quota returned is
            in bytes not mb so requires x / 1024 twice. Also Note:
            FlushEnabled is not a setting returned by the server, if flush
            is enabled then the doFlush endpoint will be listed and should
            be used to populate the field.
        :rtype: IBucketSettings
        :raises: BucketNotFoundException
        :raises: InvalidArgumentsException
        """
        # NOTE(review): the server payload is handed to BucketSettings as-is;
        # the byte-to-MB and doFlush conversions described above are not
        # performed by this method.
        info = self._admin_bucket.bucket_info(bucket_name)
        return BucketSettings(**info.value)

    def get_all_buckets(self,     # type: BucketManager
                        *options  # type: GetAllBucketOptions
                        ):
        # type: (...)->Iterable[IBucketSettings]
        """
        Gets all bucket settings.

        Note: the ram quota returned is in bytes not mb so requires
        x / 1024 twice.

        :returns: An iterable of settings for each bucket.
        :rtype: Iterable[IBucketSettings]
        """
        response = self._admin_bucket.http_request(
            path='/pools/default/buckets', method='GET')
        return [BucketSettings(**bucket) for bucket in response.value]

    def flush_bucket(self,         # type: BucketManager
                     bucket_name,  # type: str
                     *options      # type: FlushBucketOptions
                     ):
        # type: (...)->None
        """
        Flushes a bucket (uses the ns_server REST interface).

        :param str bucket_name: the name of the bucket.
        :raises: BucketNotFoundException
        :raises: InvalidArgumentsException
        :raises: FlushDisabledException
        """
        self._admin_bucket.http_request(
            path="/pools/default/buckets/{bucket_name}/controller/doFlush".format(
                bucket_name=bucket_name),
            method='POST')
class IBucketSettings(object):
    """Read-only view of the settings that describe a bucket."""

    @property
    @abstractmethod
    def name(self):
        # type: (...)->str
        """The name of the bucket."""

    @property
    @abstractmethod
    def flush_enabled(self):
        # type: (...)->bool
        """Whether flush is enabled on the bucket. Defaults to false."""

    @property
    @abstractmethod
    def ram_quota_mb(self):
        # type: (...)->int
        """RAM quota for the bucket in MB (rawRAM in the server payload)."""

    @property
    @abstractmethod
    def num_replicas(self):
        # type: (...)->int
        """The number of replicas kept for documents."""

    @property
    @abstractmethod
    def replica_indexes(self):
        # type: (...)->bool
        """Whether replica indexes are enabled for the bucket."""

    @property
    @abstractmethod
    def bucket_type(self):
        # type: (...)->int
        """The type of the bucket. Defaults to couchbase.

        Sent on the wire as one of {membase, memcached, ephemeral}.
        """

    @property
    @abstractmethod
    def ejection_method(self):
        # type: (...)->int
        """The eviction policy to use: {fullEviction | valueOnly}."""

    @property
    @abstractmethod
    def max_ttl(self):
        # type: (...)->int
        """The maxTTL applied to new documents created without a ttl."""

    @property
    @abstractmethod
    def compression_mode(self):
        # type: (...)->int
        """The compression mode to use: {off | passive | active}."""

    @property
    @abstractmethod
    def as_dict(self):
        """The settings as a mapping."""


class BucketSettings(IBucketSettings, dict):
    """Dict-backed implementation of :class:`IBucketSettings`.

    The instance is itself the dict holding the settings; each property is
    a lookup of one key.
    """

    @overload
    def __init__(self, name=None, flush_enabled=None, ram_quota_mb=None,
                 num_replicas=None, replica_indexes=None, bucket_type=None,
                 ejection_method=None, max_ttl=None, compression_mode=None):
        pass

    def __init__(self, **raw_info):
        """Map bucket settings given as keywords onto this object.

        The dict is seeded from ``Admin.bc_defaults``. ``ram_quota_mb`` and
        ``max_ttl`` are stored under the keys ``rawRAM`` and ``maxTTL``
        (``None`` when not supplied). Every other keyword is stored under
        its own name unless its value is ``None``.

        :param raw_info: the settings, as keyword arguments.
        """
        dict.__init__(self, Admin.bc_defaults)
        self['rawRAM'] = raw_info.pop('ram_quota_mb', None)
        self['maxTTL'] = raw_info.pop('max_ttl', None)
        for key, value in raw_info.items():
            if value is not None:
                self[key] = value

    @property
    def name(self):
        # type: (...)->str
        """The name of the bucket."""
        return self.get('name')

    @property
    def flush_enabled(self):
        # type: (...)->bool
        """Whether flush is enabled on the bucket. Defaults to false."""
        return self.get('flush_enabled', False)

    @property
    def ram_quota_mb(self):
        # type: (...)->int
        """RAM quota for the bucket in MB (rawRAM in the server payload)."""
        return self.get('rawRAM')

    @property
    def num_replicas(self):
        # type: (...)->int
        """The number of replicas kept for documents."""
        return self.get('num_replicas')

    @property
    def replica_indexes(self):
        # type: (...)->bool
        """Whether replica indexes are enabled for the bucket."""
        return self.get('replica_indexes')

    @property
    def bucket_type(self):
        # type: (...)->int
        """The type of the bucket. Defaults to couchbase.

        One of {couchbase (sent on wire as membase), memcached, ephemeral}.
        """
        return self.get('bucket_type')

    @property
    def ejection_method(self):
        # type: (...)->int
        """The eviction policy to use: {fullEviction | valueOnly}."""
        return self.get('ejection_method')

    @property
    def max_ttl(self):
        # type: (...)->int
        """The maxTTL applied to new documents created without a ttl."""
        return self.get('maxTTL')

    @property
    def compression_mode(self):
        # type: (...)->int
        """The compression mode to use: {off | passive | active}."""
        return self.get('compression_mode')

    @property
    def as_dict(self):
        """The settings as a mapping; this is the object itself, not a copy."""
        return self


class ICreateBucketSettings(IBucketSettings):
    """CreateBucketSettings is a superset of BucketSettings providing one
    extra property, ConflictResolutionType.

    The reasoning for this is that on Update ConflictResolutionType cannot
    be present in the JSON payload at all.
    """

    @property
    @abstractmethod
    def conflict_resolution_type(self):
        # type: (...)->int
        """The conflict resolution type to use.

        One of {Timestamp (sent as lww) | SequenceNumber (sent as seqno)}.
        """


class CreateBucketSettings(ICreateBucketSettings, BucketSettings):
    """Bucket settings used when creating a bucket."""

    @overload
    def __init__(self, name=None, flush_enabled=None, ram_quota_mb=None,
                 num_replicas=None, replica_indexes=None, bucket_type=None,
                 ejection_method=None, max_ttl=None, compression_mode=None,
                 conflict_resolution_type=None, bucket_password=None):
        pass

    def __init__(self, **kwargs):
        """Store the given settings exactly as :class:`BucketSettings` does."""
        super(CreateBucketSettings, self).__init__(**kwargs)

    @property
    def conflict_resolution_type(self):
        """The stored ``conflict_resolution_type`` value, or ``None``."""
        return self.get('conflict_resolution_type')


class CreateBucketOptions(OptionBlock):
    """Option block for ``BucketManager.create_bucket``; adds nothing to ``OptionBlock``."""


class UpdateBucketOptions(OptionBlock):
    """Option block for ``BucketManager.update_bucket``; adds nothing to ``OptionBlock``."""


class DropBucketOptions(OptionBlock):
    """Option block for ``BucketManager.drop_bucket``; adds nothing to ``OptionBlock``."""


class GetAllBucketOptions(OptionBlock):
    """Option block for ``BucketManager.get_all_buckets``; adds nothing to ``OptionBlock``."""


class GetBucketOptions(OptionBlock):
    """Option block for ``BucketManager.get_bucket``; adds nothing to ``OptionBlock``."""


class FlushBucketOptions(OptionBlock):
    """Option block for ``BucketManager.flush_bucket``; adds nothing to ``OptionBlock``."""