Source code for couchbase.collection

from couchbase_core.supportability import uncommitted
from couchbase_core import abstractmethod, JSON

from boltons.funcutils import wraps
from mypy_extensions import VarArg, KwArg, Arg

from .subdocument import LookupInSpec, MutateInSpec, MutateInOptions, \
    gen_projection_spec
from .result import GetResult, get_result_wrapper, SDK2Result, ResultPrecursor, LookupInResult, MutateInResult, \
    MutationResult, _wrap_in_mutation_result, SDK2AsyncResult, get_mutation_result, get_multi_mutation_result
from .options import forward_args, Seconds, OptionBlockTimeOut, OptionBlockDeriv, ConstrainedInt, SignedInt64, AcceptableInts
from .options import OptionBlock, AcceptableInts
from .durability import ReplicateTo, PersistTo, ClientDurableOption, ServerDurableOption
from couchbase_core._libcouchbase import Collection as _Base
import couchbase.exceptions
from couchbase_core.client import Client as CoreClient
import copy

from typing import *
from couchbase_core.durability import Durability
from couchbase_core._pyport import with_metaclass
from couchbase_core.asynchronous.bucket import AsyncClientFactory


[docs]class DeltaValue(ConstrainedInt):
[docs] def __init__(self, value # type: AcceptableInts ): # type: (...)->None """ A non-negative integer between 0 and +0x7FFFFFFFFFFFFFFF inclusive. Used as an argument for :meth:`Collection.increment` and :meth:`Collection.decrement` :param couchbase.options.AcceptableInts value: the value to initialise this with. :raise: :exc:`~couchbase.exceptions.ArgumentError` if not in range """ super(DeltaValue,self).__init__(value)
@classmethod def max(cls): return 0x7FFFFFFFFFFFFFFF @classmethod def min(cls): return 0
class ReplaceOptions(OptionBlockTimeOut, ClientDurableOption, ServerDurableOption): def __init__(self, *args, **kwargs): super(ReplaceOptions, self).__init__(*args, **kwargs) def cas(self, # type: ReplaceOptions cas # type: int ): # type: (...)->ReplaceOptions self.__setitem__('cas', cas) return self class AppendOptions(OptionBlock): def __init__(self, *args, **kwargs): super(AppendOptions, self).__init__(*args, **kwargs) class RemoveOptionsBase(OptionBlock): def __init__(self, *args, **kwargs): super(RemoveOptionsBase, self).__init__(*args, **kwargs) class RemoveOptions(RemoveOptionsBase, ClientDurableOption, ServerDurableOption): ServerDurable = RemoveOptionsBase ClientDurable = RemoveOptionsBase def __init__(self, *args, **kwargs): super(RemoveOptions, self).__init__(*args, **kwargs) class PrependOptions(OptionBlock): def __init__(self, *args, **kwargs): super(PrependOptions, self).__init__(*args, **kwargs) class UnlockOptions(OptionBlock): def __init__(self, *args, **kwargs): super(UnlockOptions, self).__init__(*args, **kwargs) class CounterOptions(OptionBlock, ServerDurableOption): def __init__(self, *args, **kwargs): super(CounterOptions, self).__init__(*args, **kwargs) class CollectionOptions(OptionBlock): def __init__(self, *args, **kwargs): super(CollectionOptions, self).__init__(*args, **kwargs) class GetAndTouchOptions(OptionBlock): def __init__(self, *args, **kwargs): super(GetAndTouchOptions, self).__init__(*args, **kwargs) class LockOptions(OptionBlock): pass class GetOptionsProject(OptionBlock): def __init__(self, parent, *args): self['project'] = args super(GetOptionsProject, self).__init__(**parent) class GetOptionsNonProject(OptionBlock): def __init__(self, parent): super(GetOptionsNonProject, self).__init__(**parent) class GetOptions(OptionBlock): def __init__(self, *args, **kwargs): super(GetOptions, self).__init__(*args, **kwargs) def project(self, *args): # type: (...)->GetOptionsProject return GetOptionsProject(self, *args) def timeout(self, duration # type: Seconds ): # type: (...)->GetOptionsNonProject self['timeout'] = duration.__int__() return GetOptionsNonProject(self) def __copy__(self): return GetOptionsNonProject(**self) class GetAndLockOptions(GetOptions, LockOptions): pass class InsertOptions(OptionBlock, ServerDurableOption, ClientDurableOption): pass class GetFromReplicaOptions(OptionBlock): pass T = TypeVar('T', bound='CBCollection') R = TypeVar("R") RawCollectionMethodDefault = Callable[ [Arg('CBCollection', 'self'), Arg(str, 'key'), VarArg(OptionBlockDeriv), KwArg(Any)], R] RawCollectionMethodInt = Callable[ [Arg('CBCollection', 'self'), Arg(str, 'key'), int, VarArg(OptionBlockDeriv), KwArg(Any)], R] RawCollectionMethod = Union[RawCollectionMethodDefault, RawCollectionMethodInt] RawCollectionMethodSpecial = TypeVar('RawCollectionMethodSpecial', bound=RawCollectionMethod) def _get_result_and_inject(func # type: RawCollectionMethod ): # type: (...) ->RawCollectionMethod result = _inject_scope_and_collection(get_result_wrapper(func)) result.__doc__ = func.__doc__ result.__name__ = func.__name__ return result def _mutate_result_and_inject(func # type: RawCollectionMethod ): # type: (...) ->RawCollectionMethod result = _inject_scope_and_collection(_wrap_in_mutation_result(func)) result.__doc__ = func.__doc__ result.__name__ = func.__name__ return result def _inject_scope_and_collection(func # type: RawCollectionMethodSpecial ): # type: (...) -> RawCollectionMethod return func CoreBucketOpRead = TypeVar("CoreBucketOpRead", Callable[[Any], SDK2Result], Callable[[Any], GetResult]) def _wrap_get_result(func # type: CoreBucketOpRead ): # type: (...) -> CoreBucketOpRead @wraps(func) def wrapped(self, # type: CBCollection *args, # type: Any **kwargs # type: Any ): # type: (...)->Any return _inject_scope_and_collection(get_result_wrapper(func))(self,*args,**kwargs) return wrapped class BinaryCollection(object): pass class TouchOptions(OptionBlock): pass class IExistsResult(object): @abstractmethod def exists(self): pass class LookupInOptions(OptionBlock): pass CoreBucketOp = TypeVar("CoreBucketOp", Callable[[Any], SDK2Result], Callable[[Any], MutationResult]) def _wrap_multi_mutation_result(wrapped # type: CoreBucketOp ): # type: (...)->CoreBucketOp @wraps(wrapped) def wrapper(target, keys, *options, **kwargs ): return get_multi_mutation_result(target, wrapped, keys, *options, **kwargs) return _inject_scope_and_collection(wrapper) class CBCollection(CoreClient): def __init__(self, *args, **kwargs ): # type: (...)->None """ Couchbase collection. Should only be invoked by internal API, e.g. by :meth:`couchbase.collection.scope.Scope.collection` or :meth:`couchbase.bucket.Bucket.default_collection`. Args as for CoreClient, plus: :param couchbase.collections.Scope parent: parent scope :param str name: name of collection :param CollectionOptions options: miscellaneous options """ name = kwargs.pop('name', None) parent = kwargs.pop('parent', None) args = list(args) connstr = kwargs.pop('connection_string', kwargs.pop('connstr', None)) connstr = connstr or args.pop(0) final_args = [connstr] + args if parent: kwargs['scope']=parent.name kwargs['collection']=name super(CBCollection, self).__init__(*final_args, **kwargs) def _wrap_dsop(self, sdres, has_value=False, **kwargs): return getattr(super(Collection, self)._wrap_dsop(sdres, has_value), 'value') @classmethod def cast(cls, parent, # type: Scope name, # type Optional[str] *options # type: CollectionOptions ): # type: (...)->CBCollection coll_args = copy.deepcopy(parent.bucket._bucket_args) coll_args.update(name=name, parent=parent) result = parent.bucket._corebucket_class(parent.bucket._connstr, **parent.bucket._bucket_args) return result @property def bucket(self): # type: (...) -> CoreClient return super(CBCollection,self) MAX_GET_OPS = 16 def _get_generic(self, key, kwargs, options): options = forward_args(kwargs, *options) options.pop('key', None) spec = options.pop('spec', []) project = options.pop('project', None) if project: if len(project) <= CBCollection.MAX_GET_OPS: spec = gen_projection_spec(project) else: raise couchbase.exceptions.ArgumentError( "Project only accepts {} operations or less".format(CBCollection.MAX_GET_OPS)) if not project: x = super(CBCollection,self).get(key, **options) else: x = super(CBCollection,self).lookup_in(key, spec, **options) return ResultPrecursor(x, options) @overload def get(self, key, # type:str *options # type: GetOptions ): # type: (...) -> GetResult pass @overload def get(self, key, # type:str project=None, # type: Iterable[str] expiration=None, # type: Seconds quiet=None, # type: bool replica=False, # type: bool no_format=False # type: bool ): # type: (...) -> GetResult pass @_get_result_and_inject def get(self, key, # type: str *options, # type: GetOptions **kwargs # type: Any ): # type: (...) -> ResultPrecursor """Obtain an object stored in Couchbase by given key. :param string key: The key to fetch. The type of key is the same as mentioned in :meth:`upsert` :param couchbase.options.Seconds expiration: If specified, indicates that the key's expiration time should be *modified* when retrieving the value. :param boolean quiet: causes `get` to return None instead of raising an exception when the key is not found. It defaults to the value set by :attr:`~quiet` on the instance. In `quiet` mode, the error may still be obtained by inspecting the :attr:`~.Result.rc` attribute of the :class:`.Result` object, or checking :attr:`.Result.success`. Note that the default value is `None`, which means to use the :attr:`quiet`. If it is a boolean (i.e. `True` or `False`) it will override the `couchbase_core.client.Client`-level :attr:`quiet` attribute. :param bool replica: Whether to fetch this key from a replica rather than querying the master server. This is primarily useful when operations with the master fail (possibly due to a configuration change). It should normally be used in an exception handler like so Using the ``replica`` option:: try: res = c.get("key", quiet=True) # suppress not-found errors catch CouchbaseError: res = c.get("key", replica=True, quiet=True) :param bool no_format: If set to ``True``, then the value will always be delivered in the :class:`~couchbase.result.GetResult` object as being of :data:`~couchbase_core.FMT_BYTES`. This is a item-local equivalent of using the :attr:`data_passthrough` option :raise: :exc:`.NotFoundError` if the key does not exist :raise: :exc:`.CouchbaseNetworkError` :raise: :exc:`.ValueFormatError` if the value cannot be deserialized with chosen decoder, e.g. if you try to retreive an object stored with an unrecognized format :return: A :class:`couchbase.result.GetResult` object Simple get:: value = cb.get('key').content_as[str] Inspect CAS value:: rv = cb.get("key") value, cas = rv.content, rv.cas Update the expiration time:: rv = cb.get("key", expiration=Seconds(10)) # Expires in ten seconds """ return self._get_generic(key, kwargs, options) @overload def get_and_touch(self, id, # type: str expiration, # type: int *options # type: GetAndTouchOptions ): # type: (...)->GetResult pass @_get_result_and_inject def get_and_touch(self, id, # type: str expiration, # type: int *options, # type: GetAndTouchOptions **kwargs # type: Any ): # type: (...)->Tuple[SDK2Result, Tuple[Tuple[GetAndTouchOptions]]] kwargs_final = forward_args(kwargs, *options) if 'durability' in set(kwargs.keys()).union(options[0][0].keys()): raise couchbase.exceptions.ReplicaNotAvailableException() return self._get_generic(id, kwargs, options) @_get_result_and_inject def get_and_lock(self, id, # type: str expiration, # type: int *options, # type: GetAndLockOptions **kwargs ): # type: (...)->GetResult final_options=forward_args(kwargs, *options) x = _Base.get(self, id, expiration, **final_options) _Base.lock(self, id, options) return ResultPrecursor(x, options) @_get_result_and_inject def get_from_replica(self, id, # type: str replica_index, # type: int *options, # type: GetFromReplicaOptions **kwargs # type: Any ): # type: (...)->ResultPrecursor final_options = forward_args(kwargs, *options) return ResultPrecursor(super(CBCollection,self).rget(id, replica_index, **final_options), final_options) @_inject_scope_and_collection def get_multi(self, # type: CBCollection keys, # type: Iterable[str] *options, # type: GetOptions **kwargs ): # type: (...)->Dict[str,GetResult] """ Get multiple keys from the collection :param keys: list of keys to get :type Iterable[str] :return: a dictionary of :class:`~.GetResult` objects by key :rtype: dict """ raw_result = super(CBCollection,self).get_multi(keys, **forward_args(kwargs, *options)) return {k: SDK2AsyncResult(v) for k, v in raw_result.items()} @overload def upsert_multi(self, # type: CBCollection keys, # type: Mapping[str,Any] ttl=0, # type: int format=None, # type: int persist_to=0, # type: int replicate_to=0, # type: int durability_level=Durability.NONE # type: Durability ): pass @_inject_scope_and_collection def upsert_multi(self, # type: CBCollection keys, # type: Dict[str,JSON] *options, # type: GetOptions **kwargs ): # type: (...)->Dict[str,MutationResult] """ Write multiple items to the cluster. Multi version of :meth:`upsert` :param dict keys: A dictionary of keys to set. The keys are the keys as they should be on the server, and the values are the values for the keys to be stored. `keys` may also be a :class:`~.ItemCollection`. If using a dictionary variant for item collections, an additional `ignore_cas` parameter may be supplied with a boolean value. If not specified, the operation will fail if the CAS value on the server does not match the one specified in the `Item`'s `cas` field. :param int ttl: If specified, sets the expiration value for all keys :param int format: If specified, this is the conversion format which will be used for _all_ the keys. :param int persist_to: Durability constraint for persistence. Note that it is more efficient to use :meth:`endure_multi` on the returned :class:`~couchbase_v2.result.MultiResult` than using these parameters for a high volume of keys. Using these parameters however does save on latency as the constraint checking for each item is performed as soon as it is successfully stored. :param int replicate_to: Durability constraints for replication. See notes on the `persist_to` parameter for usage. :param Durability durability_level: Sync replication durability level. You should either use this or the old-style durability params above, but not both. :return: A :class:`~.MultiResult` object, which is a `dict`-like object The multi methods are more than just a convenience, they also save on network performance by batch-scheduling operations, reducing latencies. This is especially noticeable on smaller value sizes. .. seealso:: :meth:`upsert` """ return get_multi_mutation_result(self, CoreClient.upsert_multi, keys, *options, **kwargs) @_inject_scope_and_collection def insert_multi(self, # type: CBCollection keys, # type: Dict[str,JSON] *options, # type: GetOptions **kwargs ): # type: (...)->Dict[str, MutationResult] """ Insert multiple items into the collection. :param dict keys: dictionary of items to insert, by key :return: a dictionary of :class:`~.MutationResult` objects by key :rtype: dict .. seealso:: :meth:`upsert_multi` - for other optional arguments """ return get_multi_mutation_result(self, CoreClient.insert_multi, keys, *options, **kwargs) @_inject_scope_and_collection def remove_multi(self, # type: CBCollection keys, # type: Iterable[str] *options, # type: GetOptions **kwargs ): # type: (...)->Dict[str, MutationResult] """ Remove multiple items from the collection. :param list keys: list of items to remove, by key :return: a dictionary of :class:`~.MutationResult` objects by key :rtype: dict .. seealso:: :meth:`upsert_multi` - for other optional arguments """ return get_multi_mutation_result(self, CoreClient.remove_multi, keys, *options, **kwargs) replace_multi = _wrap_multi_mutation_result(CoreClient.replace_multi) touch_multi = _wrap_multi_mutation_result(CoreClient.touch_multi) lock_multi = _wrap_multi_mutation_result(CoreClient.lock_multi) unlock_multi = _wrap_multi_mutation_result(CoreClient.unlock_multi) append_multi = _wrap_multi_mutation_result(CoreClient.unlock_multi) prepend_multi = _wrap_multi_mutation_result(CoreClient.prepend_multi) counter_multi = _wrap_multi_mutation_result(CoreClient.counter_multi) def touch(self, id, # type: str *options, # type: TouchOptions **kwargs): # type: (...)->MutationResult """Update a key's expiration time :param string key: The key whose expiration time should be modified :param int timeout: The new expiration time. If the expiration time is `0` then the key never expires (and any existing expiration is removed) :param Durability durability_level: Sync replication durability level. :return: :class:`.OperationResult` Update the expiration time of a key :: cb.upsert("key", expiration=Seconds(100)) # expires in 100 seconds cb.touch("key", expiration=Seconds(0)) # key should never expire now :raise: The same things that :meth:`get` does .. seealso:: :meth:`get` - which can be used to get *and* update the expiration """ return _Base.touch(self, id, **forward_args(kwargs, *options)) @_wrap_in_mutation_result def unlock(self, id, # type: str *options # type: UnlockOptions ): # type: (...)->MutationResult """Unlock a Locked Key in Couchbase. This unlocks an item previously locked by :meth:`lock` :param key: The key to unlock :param cas: The cas returned from :meth:`lock`'s :class:`.MutationResult` object. See :meth:`lock` for an example. :raise: :exc:`.TemporaryFailError` if the CAS supplied does not match the CAS on the server (possibly because it was unlocked by previous call). .. seealso:: :meth:`lock` """ return _Base.unlock(self, id, **forward_args({}, *options)) def lock(self, # type: CBCollection key, # type: str *options, # type: LockOptions **kwargs # type: Any ): """Lock and retrieve a key-value entry in Couchbase. :param key: A string which is the key to lock. :param ttl: a TTL for which the lock should be valid. While the lock is active, attempts to access the key (via other :meth:`lock`, :meth:`upsert` or other mutation calls) will fail with an :exc:`.KeyExistsError`. Note that the value for this option is limited by the maximum allowable lock time determined by the server (currently, this is 30 seconds). If passed a higher value, the server will silently lower this to its maximum limit. This function otherwise functions similarly to :meth:`get`; specifically, it will return the value upon success. Note the :attr:`~.MutationResult.cas` value from the :class:`.MutationResult` object. This will be needed to :meth:`unlock` the key. Note the lock will also be implicitly released if modified by one of the :meth:`upsert` family of functions when the valid CAS is supplied :raise: :exc:`.TemporaryFailError` if the key is already locked. :raise: See :meth:`get` for possible exceptions Lock a key :: rv = cb.lock("locked_key", ttl=5) # This key is now locked for the next 5 seconds. # attempts to access this key will fail until the lock # is released. # do important stuff... cb.unlock("locked_key", rv.cas) Lock a key, implicitly unlocking with :meth:`upsert` with CAS :: rv = self.cb.lock("locked_key", ttl=5) new_value = rv.value.upper() cb.upsert("locked_key", new_value, rv.cas) Poll and Lock :: rv = None begin_time = time.time() while time.time() - begin_time < 15: try: rv = cb.lock("key", ttl=10) break except TemporaryFailError: print("Key is currently locked.. waiting") time.sleep(1) if not rv: raise Exception("Waited too long..") # Do stuff.. cb.unlock("key", rv.cas) .. seealso:: :meth:`get`, :meth:`unlock` """ final_options = forward_args(kwargs, *options) return _Base.lock(self, key, **final_options) def exists(self, # type: CBCollection id, # type: str timeout=None, # type: Seconds ): # type: (...)->IExistsResult """ Any exceptions raised by the underlying platform :param id: the id of the document :type: str :param timeout: the time allowed for the operation to be terminated. This is controlled by the client. :type: str :return: An IExistsResult object with a boolean value indicating the presence of the document. :raise: Any exceptions raised by the underlying platform """ class UpsertOptions(OptionBlock, ClientDurableOption, ServerDurableOption): def __init__(self, *args, **kwargs): super(CBCollection.UpsertOptions, self).__init(*args, **kwargs) @overload def upsert(self, key, value, *options # type: UpsertOptions ): pass @overload def upsert(self, id, # type: str value, # type: Any cas=0, # type: int expiration=Seconds(0), # type: Seconds format=None, persist_to=PersistTo.NONE, # type: PersistTo.Value replicate_to=ReplicateTo.NONE, # type: ReplicateTo.Value durability_level=Durability.NONE # type: Durability ): # type: (...) -> MutationResult pass @_mutate_result_and_inject def upsert(self, id, # type: str value, # type: Any *options, # type: UpsertOptions **kwargs # type: Any ): # type: (...)->MutationResult """Unconditionally store the object in Couchbase. :param key: The key to set the value with. By default, the key must be either a :class:`bytes` or :class:`str` object encodable as UTF-8. If a custom `transcoder` class is used (see :meth:`~__init__`), then the key object is passed directly to the transcoder, which may serialize it how it wishes. :type key: string or bytes :param value: The value to set for the key. This should be a native Python value which will be transparently serialized to JSON by the library. Do not pass already-serialized JSON as the value or it will be serialized again. If you are using a different `format` setting (see `format` parameter), and/or a custom transcoder then value for this argument may need to conform to different criteria. :param int cas: The _CAS_ value to use. If supplied, the value will only be stored if it already exists with the supplied CAS :param expiration: If specified, the key will expire after this many seconds :param int format: If specified, indicates the `format` to use when encoding the value. If none is specified, it will use the `default_format` For more info see :attr:`~.default_format` :param int persist_to: Perform durability checking on this many nodes nodes for persistence to disk. See :meth:`endure` for more information :param int replicate_to: Perform durability checking on this many replicas for presence in memory. See :meth:`endure` for more information. :param Durability durability_level: Durability level :raise: :exc:`.ArgumentError` if an argument is supplied that is not applicable in this context. For example setting the CAS as a string. :raise: :exc`.CouchbaseNetworkError` :raise: :exc:`.KeyExistsError` if the key already exists on the server with a different CAS value. :raise: :exc:`.ValueFormatError` if the value cannot be serialized with chosen encoder, e.g. if you try to store a dictionary in plain mode. :return: :class:`~.Result`. Simple set:: cb.upsert('key', 'value') Force JSON document format for value:: cb.upsert('foo', {'bar': 'baz'}, format=couchbase_core.FMT_JSON) Insert JSON from a string:: JSONstr = '{"key1": "value1", "key2": 123}' JSONobj = json.loads(JSONstr) cb.upsert("documentID", JSONobj, format=couchbase_core.FMT_JSON) Force UTF8 document format for value:: cb.upsert('foo', "<xml></xml>", format=couchbase_core.FMT_UTF8) Perform optimistic locking by specifying last known CAS version:: cb.upsert('foo', 'bar', cas=8835713818674332672) Simple set with durability:: cb.upsert('key', 'value', durability_level=Durability.MAJORITY_AND_PERSIST_ON_MASTER) """ final_options = forward_args(kwargs, *options) return ResultPrecursor(self.bucket.upsert(id, value, **final_options), final_options) def insert(self, id, # type: str value, # type: Any *options # type: InsertOptions ): # type: (...)->MutationResult pass @overload def insert(self, id, # type: str value, # type: Any expiration=Seconds(0), # type: Seconds format=None, # type: str persist_to=PersistTo.NONE, # type: PersistTo.Value replicate_to=ReplicateTo.NONE, # type: ReplicateTo.Value durability_level=Durability.NONE # type: Durability ): pass @_mutate_result_and_inject def insert(self, key, value, *options, **kwargs): # type: (...)->ResultPrecursor """Store an object in Couchbase unless it already exists. Follows the same conventions as :meth:`upsert` but the value is stored only if it does not exist already. Conversely, the value is not stored if the key already exists. Notably missing from this method is the `cas` parameter, this is because `insert` will only succeed if a key does not already exist on the server (and thus can have no CAS) :raise: :exc:`.KeyExistsError` if the key already exists .. seealso:: :meth:`upsert` """ final_options = forward_args(kwargs, *options) return ResultPrecursor(_Base.insert(self, key, value, **final_options), final_options) @overload def replace(self, id, # type: str value, # type: Any cas=0, # type: int expiration=None, # type: Seconds format=None, # type: bool persist_to=PersistTo.NONE, # type: PersistTo.Value replicate_to=ReplicateTo.NONE, # type: ReplicateTo.Value durability_level=Durability.NONE # type: Durability ): # type: (...)->MutationResult pass @overload def replace(self, id, # type: str value, # type: Any options, # type: ReplaceOptions ): # type: (...)->MutationResult pass @_mutate_result_and_inject def replace(self, id, # type: str value, # type: Any *options, **kwargs ): # type: (...)->MutationResult """Store an object in Couchbase only if it already exists. Follows the same conventions as :meth:`upsert`, but the value is stored only if a previous value already exists. :raise: :exc:`.NotFoundError` if the key does not exist .. seealso:: :meth:`upsert` """ final_options = forward_args(kwargs, *options) return ResultPrecursor(_Base.replace(self, id, value, **final_options), final_options) @overload def remove(self, # type: CBCollection id, # type: str cas=0, # type: int persist_to=PersistTo.NONE, # type: PersistTo.Value replicate_to=ReplicateTo.NONE, # type: ReplicateTo.Value durability_level=Durability.NONE # type: Durability ): # type: (...)->MutationResult pass @overload def remove(self, # type: CBCollection id, # type: str *options # type: RemoveOptions ): # type: (...)->MutationResult pass @_mutate_result_and_inject def remove(self, # type: CBCollection id, # type: str *options, # type: RemoveOptions **kwargs ): # type: (...)->MutationResult """Remove the key-value entry for a given key in Couchbase. :param key: A string which is the key to remove. The format and type of the key follows the same conventions as in :meth:`upsert` :type key: string, dict, or tuple/list :param int cas: The CAS to use for the removal operation. If specified, the key will only be removed from the server if it has the same CAS as specified. This is useful to remove a key only if its value has not been changed from the version currently visible to the client. If the CAS on the server does not match the one specified, an exception is thrown. :param boolean quiet: Follows the same semantics as `quiet` in :meth:`get` :param int persist_to: If set, wait for the item to be removed from the storage of at least these many nodes :param int replicate_to: If set, wait for the item to be removed from the cache of at least these many nodes (excluding the master) :raise: :exc:`.NotFoundError` if the key does not exist. :raise: :exc:`.KeyExistsError` if a CAS was specified, but the CAS on the server had changed :return: A :class:`~.Result` object. Simple remove:: ok = cb.remove("key").success Don't complain if key does not exist:: ok = cb.remove("key", quiet=True) Only remove if CAS matches our version:: rv = cb.get("key") cb.remove("key", cas=rv.cas) """ final_options = forward_args(kwargs, *options) return ResultPrecursor(self.bucket.remove(id, **final_options), final_options) @_inject_scope_and_collection def lookup_in(self, id, # type: str spec, # type: SubdocSpec *options, # type: LookupInOptions **kwargs ): # type: (...)->LookupInResult """Atomically retrieve one or more paths from a document. :param id: The key of the document to lookup :param spec: An iterable sequence of specs (see :mod:`.couchbase_core.subdocument`) :return: A :class:`.couchbase.LookupInResult` object. This object contains the results and any errors of the operation. Example:: import couchbase_core.subdocument as SD rv = cb.lookup_in('user', SD.get('email'), SD.get('name'), SD.exists('friends.therock')) email = rv[0] name = rv[1] friend_exists = rv.exists(2) .. seealso:: :meth:`retrieve_in` which acts as a convenience wrapper """ final_options=forward_args(kwargs, *options) return LookupInResult(self.bucket.lookup_in(id, spec, **final_options ),final_options) @overload def mutate_in(self, id, # type: str spec, # type: MutateInSpec *options # type: MutateInOptions ): # type: (...)->MutationResult pass @overload def mutate_in(self, id, # type: str spec, # type: MutateInSpec create_doc=False, # type: bool insert_doc=False, # type: bool upsert_doc=False, # type: bool durability_level=Durability.NONE # type: Durability ): # type: (...)->MutationResult pass @_inject_scope_and_collection def mutate_in(self, # type: CBCollection id, # type: str spec, # type: MutateInSpec *options, # type: MutateInOptions **kwargs # type: Any ): # type: (...)->ResultPrecursor """Perform multiple atomic modifications within a document. :param key: The key of the document to modify :param MutateInSpec spec: An iterable of specs (See :mod:`.couchbase.mutate_in.MutateInSpecItemBase`) :param bool create_doc: Whether the document should be create if it doesn't exist :param bool insert_doc: If the document should be created anew, and the operations performed *only* if it does not exist. :param bool upsert_doc: If the document should be created anew if it does not exist. If it does exist the commands are still executed. :param kwargs: CAS, etc. :return: A :class:`~.couchbase.MutationResult` object. Here's an example of adding a new tag to a "user" document and incrementing a modification counter:: import couchbase_core.subdocument as SD # .... cb.mutate_in('user', SD.array_addunique('tags', 'dog'), SD.counter('updates', 1)) .. note:: The `insert_doc` and `upsert_doc` options are mutually exclusive. Use `insert_doc` when you wish to create a new document with extended attributes (xattrs). .. seealso:: :mod:`.couchbase_core.subdocument` """ final_options = forward_args(kwargs, *options) return MutateInResult(self.bucket.mutate_in(id, spec, **final_options), **final_options) def binary(self): # type: (...)->BinaryCollection pass @overload def append(self, id, # type: str value, # type: str *options # type: AppendOptions ): # type: (...)->MutationResult pass @overload def append(self, id, # type: str value, # type: str cas=0, # type: int format=None, # type: int persist_to=PersistTo.NONE, # type: PersistTo.Value replicate_to=ReplicateTo.NONE, # type: ReplicateTo.Value durability_level=Durability.NONE # type: Durability ): pass @_mutate_result_and_inject def append(self, id, # type: str value, # type: str *options, # type: Any **kwargs # type: Any ): # type: (...)->ResultPrecursor """Append a string to an existing value in Couchbase. :param string value: The data to append to the existing value. Other parameters follow the same conventions as :meth:`upsert`. The `format` argument must be one of :const:`~couchbase_core.FMT_UTF8` or :const:`~couchbase_core.FMT_BYTES`. If not specified, it will be :const:`~.FMT_UTF8` (overriding the :attr:`default_format` attribute). This is because JSON or Pickle formats will be nonsensical when random data is appended to them. If you wish to modify a JSON or Pickle encoded object, you will need to retrieve it (via :meth:`get`), modify it, and then store it again (using :meth:`upsert`). Additionally, you must ensure the value (and flags) for the current value is compatible with the data to be appended. For an example, you may append a :const:`~.FMT_BYTES` value to an existing :const:`~couchbase_core.FMT_JSON` value, but an error will be thrown when retrieving the value using :meth:`get` (you may still use the :attr:`data_passthrough` to overcome this). :raise: :exc:`.NotStoredError` if the key does not exist """ x = _Base.append(self, id, value, forward_args(kwargs, *options)) return ResultPrecursor(x, options) @overload def prepend(self, id, # type: str value, # type: Any cas=0, # type: int format=None, # type: int persist_to=PersistTo.NONE, # type: PersistTo.Value replicate_to=ReplicateTo.NONE, # type: ReplicateTo.Value durability_level=Durability.NONE # type: Durability ): # type: (...)->MutationResult pass @overload def prepend(self, id, # type: str value, # type: str *options # type: PrependOptions ): # type: (...)->MutationResult pass def prepend(self, id, # type: str value, # type: str *options, # type: PrependOptions **kwargs # type: Any ): # type: (...)->ResultPrecursor """Prepend a string to an existing value in Couchbase. .. seealso:: :meth:`append` """ x = _Base.prepend(self, id, value, **forward_args(kwargs, *options)) return ResultPrecursor(x, options) @overload def increment(self, id, # type: str delta, # type: DeltaValue initial=None, # type: SignedInt64 expiration=Seconds(0), # type: Seconds durability_level=Durability.NONE # type: Durability ): # type: (...)->ResultPrecursor pass @overload def increment(self, id, # type: str delta, # type: DeltaValue *options, # type: CounterOptions **kwargs ): # type: (...)->ResultPrecursor pass @_mutate_result_and_inject def increment(self, id, # type: str delta, # type: DeltaValue *options, # type: CounterOptions **kwargs ): # type: (...)->ResultPrecursor """Increment the numeric value of an item. This method instructs the server to treat the item stored under the given key as a numeric counter. Counter operations require that the stored value exists as a string representation of a number (e.g. ``123``). If storing items using the :meth:`upsert` family of methods, and using the default :const:`couchbase_core.FMT_JSON` then the value will conform to this constraint. :param string key: A key whose counter value is to be modified :param DeltaValue delta: an amount by which the key should be incremented. :param couchbase.options.SignedInt64 initial: The initial value for the key, if it does not exist. If the key does not exist, this value is used, and `delta` is ignored. If this parameter is `None` then no initial value is used :param SignedInt64 initial: :class:`couchbase.options.SignedInt64` or `None` :param Seconds expiration: The lifetime for the key, after which it will expire :param Durability durability_level: Sync replication durability level. :raise: :exc:`.NotFoundError` if the key does not exist on the bucket (and `initial` was `None`) :raise: :exc:`.DeltaBadvalError` if the key exists, but the existing value is not numeric :return: A :class:`couchbase.result.MutationResult` object. Simple increment:: rv = cb.increment("key") cb.get("key").content_as[int] # 42 Increment by 10:: rv = cb.increment("key", DeltaValue(10)) Increment by 20, set initial value to 5 if it does not exist:: rv = cb.increment("key", DeltaValue(20), initial=SignedInt64(5)) """ final_opts = self._check_delta_initial(kwargs, *options) x = _Base.counter(self, id, delta=int(DeltaValue.verified(delta)), **final_opts) return ResultPrecursor(x, final_opts) @overload def decrement(self, id, # type: str delta, # type: DeltaValue initial=None, # type: SignedInt64 expiration=Seconds(0), # type: Seconds durability_level=Durability.NONE # type: Durability ): # type: (...)->ResultPrecursor pass @overload def decrement(self, id, # type: str delta, # type: DeltaValue *options, # type: CounterOptions **kwargs ): # type: (...)->ResultPrecursor pass @_mutate_result_and_inject def decrement(self, id, # type: str delta, # type: DeltaValue *options, # type: CounterOptions **kwargs ): # type: (...)->ResultPrecursor """Decrement the numeric value of an item. This method instructs the server to treat the item stored under the given key as a numeric counter. Counter operations require that the stored value exists as a string representation of a number (e.g. ``123``). If storing items using the :meth:`upsert` family of methods, and using the default :const:`couchbase_core.FMT_JSON` then the value will conform to this constraint. :param string key: A key whose counter value is to be modified :param DeltaValue delta: an amount by which the key should be decremented. :param couchbase.options.SignedInt64 initial: The initial value for the key, if it does not exist. If the key does not exist, this value is used, and `delta` is ignored. If this parameter is `None` then no initial value is used :param SignedInt64 initial: :class:`couchbase.options.SignedInt64` or `None` :param Seconds expiration: The lifetime for the key, after which it will expire :param Durability durability_level: Sync replication durability level. :raise: :exc:`.NotFoundError` if the key does not exist on the bucket (and `initial` was `None`) :raise: :exc:`.DeltaBadvalError` if the key exists, but the existing value is not numeric :return: A :class:`couchbase.result.MutationResult` object. Simple decrement:: rv = cb.decrement("key") cb.get("key").content_as[int] # 42 Decrement by 10:: rv = cb.decrement("key", DeltaValue(10)) Decrement by 20, set initial value to 5 if it does not exist:: rv = cb.decrement("key", DeltaValue(20), initial=SignedInt64(5)) """ final_opts = self._check_delta_initial(kwargs, *options) x = super(CBCollection,self).counter(id, delta=-int(DeltaValue.verified(delta)), **final_opts) return ResultPrecursor(x, final_opts) @staticmethod def _check_delta_initial(kwargs, *options): final_opts = forward_args(kwargs, *options) init_arg = final_opts.get('initial') initial = None if init_arg is None else int(SignedInt64.verified(init_arg)) if initial is not None: final_opts['initial'] = initial return final_opts
[docs]class Scope(object):
[docs] def __init__(self, # type: Scope parent, # type: couchbase.bucket.Bucket name=None # type: str ): # type: (...)->Any """ Collection scope representation. Constructor should only be invoked internally. :param parent: parent bucket. :param name: name of scope to open """ self._name = name self.bucket = parent
def __deepcopy__(self, memodict={}): result = copy.copy(self) return result @property def _realbucket(self): # type: (...)->CoreClient return self.bucket._bucket @property def name(self): # type (...)->str """ :return: A string value that is the name of the collection. :except ScopeNotFoundException :except AuthorizationException """ return self._name
[docs] def default_collection(self, # type: Scope *options, # type: Any **kwargs # type: Any ): # type: (...)->CBCollection """ Returns the default collection for this bucket. :param collection_name: string identifier for a given collection. :param options: collection options :return: A :class:`.Collection` for a collection with the given name. :raise: CollectionNotFoundException :raise: AuthorizationException """ return self._gen_collection(None, *options)
def _gen_collection(self, collection_name, # type: Optional[str] *options # type: CollectionOptions ): # type: (...)->CBCollection return CBCollection.cast(self, collection_name, *options)
[docs] @uncommitted def collection(self, collection_name, # type: str *options # type: CollectionOptions ): # type: (...) -> CBCollection """ Gets the named collection for this bucket. :param collection_name: string identifier for a given collection. :param options: collection options :return: A :class:`.Collection` for a collection with the given name. :raise: CollectionNotFoundException :raise: AuthorizationException """ return self._gen_collection(collection_name, *options)
Collection = CBCollection UpsertOptions = CBCollection.UpsertOptions class AsyncCBCollection(with_metaclass(AsyncClientFactory, CBCollection)): def __init__(self, *args, **kwargs): super(AsyncCBCollection, self).__init__(*args, **kwargs)