from couchbase.management.collections import CollectionManager
from couchbase.management.views import ViewIndexManager
from couchbase.management.admin import Admin
from couchbase.management.views import DesignDocumentNamespace
from couchbase_core.client import Client as CoreClient
import couchbase_core._libcouchbase as _LCB
from .collection import CBCollection, CoreClientDatastructureWrap
from .options import OptionBlockTimeOut
from .result import *
from .collection import Scope
from datetime import timedelta
from enum import Enum
from couchbase.tracing import CouchbaseSpan
import logging
from couchbase_core.asynchronous.client import AsyncClientMixin
class ViewScanConsistency(Enum):
NOT_BOUNDED = 'ok'
REQUEST_PLUS = 'false'
UPDATE_AFTER = 'update_after'
class ViewOrdering(Enum):
DESCENDING = 'true'
ASCENDING = 'false'
class ViewErrorMode(Enum):
CONTINUE = 'continue'
STOP = 'stop'
class ViewOptions(OptionBlockTimeOut):
@overload
def __init__(self,
timeout=None, # type: timedelta
scan_consistency=None, # type: ViewScanConsistency
skip=None, # type: int
limit=None, # type: int
startkey=None, # type: Any
endkey=None, # type: Any
startkey_docid=None, # type: str
endkey_docid=None, # type: str
inclusive_end=None, # type: bool
group=None, # type: bool
group_level=None, # type: int
key=None, # type: Any
keys=None, # type: Any
order=None, # type: ViewOrdering
reduce=None, # type: bool
on_error=None, # type: ViewErrorMode
debug=None, # type: bool
raw=None, # type: Tuple(str,str)
namespace=None, # type: DesignDocumentNamespace
span=None # type: CouchbaseSpan
):
pass
def __init__(self, **kwargs):
# massage them into the form needed by the existing
# view infrastructure... TODO: get rid of this when
# we kill off the v2 stuff
val = kwargs.pop('scan_consistency', None)
if val:
kwargs['stale'] = val.value
val = kwargs.pop('order', None)
if val:
kwargs['descending'] = val.value
val = kwargs.pop('on_error', None)
if val:
kwargs['on_error'] = val.value
val = kwargs.pop('raw', None)
if val:
kwargs[val[0]] = val[1]
val = kwargs.pop('namespace', None)
if val:
kwargs['use_devmode'] = (
val == DesignDocumentNamespace.DEVELOPMENT)
super(ViewOptions, self).__init__(**kwargs)
class PingOptions(OptionBlockTimeOut):
@overload
def __init__(self,
timeout=None, # type: timedelta
report_id=None, # type: str
service_types=None # type: Iterable[ServiceType]
):
"""
Create options used for ping command.
:param timedelta timeout: Currently not implemented, coming soon.
:param str report_id: Add an id to the request, which you can track in logging, etc...
:param Iterable[ServiceType] service_types: Restrict the ping to the services passed in here.
"""
pass
def __init__(self,
**kwargs
):
if 'service_types' in kwargs:
kwargs['service_types'] = list(
map(lambda x: x.value, kwargs['service_types']))
super(PingOptions, self).__init__(**kwargs)
[docs]class Bucket(CoreClientDatastructureWrap):
@internal
def __init__(self,
connection_string=None, # type: str
name=None, # type: str
collection_factory=CBCollection, # type: Type[CBCollection]
admin=None, # type: Admin
*options,
**kwargs
):
# type: (...) -> None
"""
Connect to a bucket.
Typically invoked by :meth:`couchbase.cluster.Cluster.open_bucket`
:param str connection_string:
The connection string to use for connecting to the bucket.
This is a URI-like string allowing specifying multiple hosts
and a bucket name.
The format of the connection string is the *scheme*
(``couchbase`` for normal connections, ``couchbases`` for
SSL enabled connections); a list of one or more *hostnames*
delimited by commas; a *bucket* and a set of options.
like so::
couchbase://host1,host2,host3/bucketname?option1=value1&option2=value2
If using the SSL scheme (``couchbases``), ensure to specify
the ``certpath`` option to point to the location of the
certificate on the client's filesystem; otherwise connection
may fail with an error code indicating the server's
certificate could not be trusted.
See :ref:`connopts` for additional connection options.
:param str name: name of bucket.
:param string username: username to connect to bucket with
:param string password: the password of the bucket
:param boolean quiet: the flag controlling whether to raise an
exception when the client executes operations on
non-existent keys. If it is `False` it will raise
:exc:`.DocumentNotFoundException` exceptions. When
set to `True` the operations will return `None` silently.
:param boolean unlock_gil: If set (which is the default), the
bucket object will release the python GIL when possible,
allowing other (Python) threads to function in the
background. This should be set to true if you are using
threads in your application (and is the default), as
otherwise all threads will be blocked while couchbase
functions execute.
You may turn this off for some performance boost and you are
certain your application is not using threads
:param transcoder:
Set the transcoder object to use. This should conform to the
interface in the documentation (it need not actually be a
subclass). This can be either a class type to instantiate,
or an initialized instance.
:type transcoder: :class:`.Transcoder`
:param lockmode: The *lockmode* for threaded access.
See :ref:`multiple_threads` for more information.
:param tracer: An OpenTracing tracer into which
to propagate any tracing information. Requires
tracing to be enabled.
:raise: :exc:`.BucketNotFoundException` or :exc:`.AuthenticationException` if
there is no such bucket to connect to, or if invalid
credentials were supplied.
:raise: :exc:`.CouchbaseNetworkException` if the socket wasn't
accessible (doesn't accept connections or doesn't respond
in
:raise: :exc:`.InvalidException` if the connection string
was malformed.
:return: instance of :class:`~couchbase.bucket.Bucket`
"""
self._name = name or kwargs.get('bucket', None)
self._connstr = connection_string
self._bucket_args = forward_args(kwargs, *options)
self._bucket_args['bucket'] = name
self._collection_factory = collection_factory
super(Bucket, self).__init__(connection_string, **self._bucket_args)
self._admin = admin
@property
def _bucket(self):
return self
@property
def name(self):
# type: (...) -> str
"""
Get the name of this bucket.
:return: Name of this bucket.
:rtype: str
"""
return self._name
[docs] def scope(self,
scope_name # type: str
):
# type: (...) -> Scope
"""
Open the named scope.
:param scope_name: Name of scope to open on this bucket.
:return: the named scope
"""
return Scope(self, scope_name)
[docs] def default_collection(self):
# type: (...) -> CBCollection
"""
Open the default collection.
:return: the default :class:`Collection` object.
"""
return Scope(self).default_collection()
[docs] def collection(self,
collection_name # type: str
):
# type: (...) -> CBCollection
"""
Open a collection in the default scope.
:param collection_name: collection name
:return: the default :class:`.Collection` object.
"""
return Scope(self).collection(collection_name)
def collections(self # type: Bucket
):
# type: (...) -> CollectionManager
"""
Get the CollectionManager.
:return: the :class:`.management.CollectionManager` for this bucket.
"""
return CollectionManager(self._admin, self._name)
[docs] def view_query(self,
design_doc, # type: str
view_name, # type: str
*view_options, # type: ViewOptions
**kwargs
):
# type: (...) -> ViewResult
"""
Run a View Query
:param str design_doc: design document
:param str view_name: view name
:param ViewOptions view_options: Options to use when querying a view index.
:param kwargs: Override corresponding option in options.
:return: A :class:`ViewResult` containing the view results
"""
final_kwargs = {'itercls': ViewResult}
final_kwargs.update(kwargs)
res = CoreClient.view_query(
self, design_doc, view_name, **forward_args(final_kwargs, *view_options))
return res
def view_indexes(self # type: Bucket
):
# type: (...) -> ViewIndexManager
"""
Get the ViewIndexManager for this bucket.
:return: The :class:`.management.ViewIndexManager` for this bucket.
"""
return ViewIndexManager(self, self._admin, self._name)
def ping(self,
*options, # type: PingOptions
**kwargs
):
# type: (...) -> PingResult
"""
Actively contacts each of the services and returns their pinged status.
:param options: Options for sending the ping request.
:param kwargs: Overrides corresponding value in options.
:return: A :class:`PingResult` representing the state of all the pinged services.
:raise: CouchbaseException for various communication issues.
"""
return PingResult(super(Bucket, self).ping(
**forward_args(kwargs, *options)))
@property
def kv_timeout(self):
# type: (...) -> timedelta
"""
The default timeout for all kv operations on this bucket.
::
# Set the default kv timeout to 10 seconds:
bucket.kv_timeout = timedelta(seconds=10)
# Get the current default:
timeout = bucket.kv_timeout
"""
return timedelta(seconds=self._get_timeout_common(
_LCB.LCB_CNTL_OP_TIMEOUT))
@property
def views_timeout(self):
# type: (...) -> timedelta
"""
The default timeout for all view operations on this bucket.
::
# Set the default view timeout to 10 seconds:
cb.view_timeout = timedelta(seconds=10)
# Get the default view timeout:
timeout = cb.view_timeout
"""
# lets use the private function in the private _bucket for now. Soon
# that _bucket will be gone and we will have migrated this all into
# here.
return timedelta(seconds=self._get_timeout_common(
_LCB.LCB_CNTL_VIEW_TIMEOUT))
@property
def tracing_orphaned_queue_flush_interval(self):
"""
The tracing orphaned queue flush interval, as a `timedelta`
::
# Set tracing orphaned queue flush interval to 0.5 seconds
cb.tracing_orphaned_queue_flush_interval = timedelta(seconds=0.5)
"""
return timedelta(seconds=self._cntl(op=_LCB.TRACING_ORPHANED_QUEUE_FLUSH_INTERVAL,
value_type="timeout"))
@property
def tracing_orphaned_queue_size(self):
"""
The tracing orphaned queue size.
::
# Set tracing orphaned queue size to 100 entries
cb.tracing_orphaned_queue_size = 100
"""
return self._cntl(op=_LCB.TRACING_ORPHANED_QUEUE_SIZE,
value_type="uint32_t")
@property
def tracing_threshold_queue_flush_interval(self):
"""
The tracing threshold queue flush interval, as a `timedelta`
::
# Set tracing threshold queue flush interval to 0.5 seconds
cb.tracing_threshold_queue_flush_interval = timedelta(seconds=0.5)
"""
return timedelta(seconds=self._cntl(op=_LCB.TRACING_THRESHOLD_QUEUE_FLUSH_INTERVAL,
value_type="timeout"))
@property
def tracing_threshold_queue_size(self):
"""
The tracing threshold queue size.
::
# Set tracing threshold queue size to 100 entries
cb.tracing_threshold_queue_size = 100
"""
return self._cntl(op=_LCB.TRACING_THRESHOLD_QUEUE_SIZE,
value_type="uint32_t")
@property
def tracing_threshold_kv(self):
"""
The tracing threshold for KV, as a `timedelta`.
::
# Set tracing threshold for KV to 0.5 seconds
cb.tracing_threshold_kv = timedelta(seconds=0.5)
"""
return timedelta(seconds=self._cntl(
op=_LCB.TRACING_THRESHOLD_KV, value_type="timeout"))
@property
def tracing_threshold_view(self):
"""
The tracing threshold for View, as `timedelta`.
::
# Set tracing threshold for View to 0.5 seconds
cb.tracing_threshold_view = timedelta(seconds=0.5)
"""
return timedelta(seconds=self._cntl(
op=_LCB.TRACING_THRESHOLD_VIEW, value_type="timeout"))
class AsyncBucket(AsyncClientMixin, Bucket):
pass