Source code for txcouchbase.cluster

# Copyright 2013, Couchbase, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This file contains the twisted-specific bits for the Couchbase client.
"""

from typing import *

from twisted.internet import reactor
from twisted.internet.defer import Deferred

from couchbase_core.asynchronous.analytics import AsyncAnalyticsRequest
from couchbase.asynchronous import AsyncViewResult, AsyncQueryResultBase, AsyncAnalyticsResultBase, AsyncSearchResult
from couchbase.cluster import Cluster as V3SyncCluster, AsyncCluster as V3AsyncCluster
from couchbase.collection import AsyncCBCollection as BaseAsyncCBCollection
from couchbase_core.asynchronous.events import EventQueue
from couchbase.asynchronous.search import AsyncSearchRequest
from couchbase_core.asynchronous.n1ql import AsyncN1QLRequest
from couchbase_core.asynchronous.view import AsyncViewBase
from couchbase_core.client import Client as CoreClient
from couchbase.exceptions import CouchbaseException
from couchbase_core.supportability import internal
from txcouchbase.iops import v0Iops
from couchbase.bucket import AsyncBucket as V3AsyncBucket


class BatchedRowMixin(object):
    def __init__(self, *args, **kwargs):
        """
        Iterator/Container object for a single-call row-based results.

        This functions as an iterator over all results of the query, once the
        query has been completed.

        Additional metadata may be obtained by examining the object. See
        :class:`~couchbase_core.views.iterator.Views` for more details.

        You will normally not need to construct this object manually.
        """
        self._d = Deferred()
        self.__rows = [] # likely a superlcass might have this?

    def _getDeferred(self):
        return self._d

    def start(self):
        super(BatchedRowMixin, self).start()
        self.raw.rows_per_call = -1
        return self

    def on_rows(self, rowiter):
        """
        Reimplemented from :meth:`~AsyncViewBase.on_rows`
        """
        self.__rows = rowiter
        self._d.callback(self)
        self._d = None

    def on_error(self, ex):
        """
        Reimplemented from :meth:`~AsyncViewBase.on_error`
        """
        if self._d:
            self._d.errback()
            self._d = None

    def on_done(self):
        """
        Reimplemented from :meth:`~AsyncViewBase.on_done`
        """
        if self._d:
            self._d.callback(self)
            self._d = None

    def __iter__(self):
        """
        Iterate over the rows in this resultset
        """
        return iter(self.__rows)


class BatchedView(BatchedRowMixin, AsyncViewBase):
    def __init__(self, *args, **kwargs):
        AsyncViewBase.__init__(self, *args, **kwargs)
        BatchedRowMixin.__init__(self, *args, **kwargs)


[docs]class BatchedViewResult(BatchedRowMixin, AsyncViewResult):
[docs] def __init__(self, *args, **kwargs): AsyncViewResult.__init__(self, *args, **kwargs) BatchedRowMixin.__init__(self, *args, **kwargs)
class BatchedN1QLRequest(BatchedRowMixin, AsyncN1QLRequest): def __init__(self, *args, **kwargs): AsyncN1QLRequest.__init__(self, *args, **kwargs) BatchedRowMixin.__init__(self, *args, **kwargs)
[docs]class BatchedQueryResult(BatchedRowMixin, AsyncQueryResultBase):
[docs] def __init__(self, *args, **kwargs): AsyncQueryResultBase.__init__(self, *args, **kwargs) BatchedRowMixin.__init__(self, *args, **kwargs)
class BatchedAnalyticsRequest(BatchedRowMixin, AsyncAnalyticsRequest): def __init__(self, *args, **kwargs): AsyncAnalyticsRequest.__init__(self, *args, **kwargs) BatchedRowMixin.__init__(self, *args, **kwargs)
[docs]class BatchedAnalyticsResult(BatchedRowMixin, AsyncAnalyticsResultBase):
[docs] def __init__(self, *args, **kwargs): AsyncAnalyticsResultBase.__init__(self, *args, **kwargs) BatchedRowMixin.__init__(self, *args, **kwargs)
class BatchedSearchRequest(BatchedRowMixin, AsyncSearchRequest): def __init__(self, *args, **kwargs): AsyncSearchRequest.__init__(self, *args, **kwargs) BatchedRowMixin.__init__(self, *args, **kwargs)
[docs]class BatchedSearchResult(BatchedRowMixin, AsyncSearchResult):
[docs] def __init__(self, *args, **kwargs): AsyncSearchResult.__init__(self, *args, **kwargs) BatchedRowMixin.__init__(self, *args, **kwargs)
class TxEventQueue(EventQueue): """ Subclass of EventQueue. This implements the relevant firing methods, treating an 'Event' as a 'Deferred' """ def fire_async(self, event): reactor.callLater(0, event.callback, None) def call_single_success(self, event, *args, **kwargs): event.callback(None) def call_single_failure(self, event, *args, **kwargs): event.errback(None) class ConnectionEventQueue(TxEventQueue): """ For events fired upon connect """ def maybe_raise(self, err, *args, **kwargs): if not err: return raise err T = TypeVar('T', bound=CoreClient)
[docs]class TxRawClientMixin(object):
[docs] @internal def __init__(self, connstr=None, **kwargs): """ Client mixin for Twisted. This inherits from an 'AsyncClient' class, but also adds some twisted-specific logic for hooking on a connection. """ if connstr and 'connstr' not in kwargs: kwargs['connstr'] = connstr iops = v0Iops(reactor) super(TxRawClientMixin, self).__init__(iops=iops, **kwargs) self._evq = { 'connect': ConnectionEventQueue(), '_dtor': TxEventQueue() } self._conncb = self._evq['connect'] self._dtorcb = self._evq['_dtor']
[docs] def registerDeferred(self, event, d): """ Register a defer to be fired at the firing of a specific event. :param string event: Currently supported values are `connect`. Another value may be `_dtor` which will register an event to fire when this object has been completely destroyed. :param event: The defered to fire when the event succeeds or failes :type event: :class:`Deferred` If this event has already fired, the deferred will be triggered asynchronously. Example:: def on_connect(*args): print("I'm connected") def on_connect_err(*args): print("Connection failed") d = Deferred() cb.registerDeferred('connect', d) d.addCallback(on_connect) d.addErrback(on_connect_err) :raise: :exc:`ValueError` if the event name is unrecognized """ try: self._evq[event].schedule(d) except KeyError: raise ValueError("No such event type", event)
[docs] def on_connect(self): """ Short-hand for the following idiom:: d = Deferred() cb.registerDeferred('connect', d) return d :return: A :class:`Deferred` """ d = Deferred() self.registerDeferred('connect', d) return d
[docs] def defer(self, opres): """ Converts a raw :class:`couchbase_core.results.AsyncResult` object into a :class:`Deferred`. This is shorthand for the following "non-idiom":: d = Deferred() opres = cb.upsert("foo", "bar") opres.callback = d.callback def d_err(res, ex_type, ex_val, ex_tb): d.errback(opres, ex_type, ex_val, ex_tb) opres.errback = d_err return d :param opres: The operation to wrap :type opres: :class:`couchbase_core.results.AsyncResult` :return: a :class:`Deferred` object. Example:: opres = cb.upsert("foo", "bar") d = cb.defer(opres) def on_ok(res): print("Result OK. Cas: {0}".format(res.cas)) d.addCallback(opres) """ d = Deferred() def _on_err(mres, ex_type, ex_val, ex_tb): try: raise ex_type(ex_val) except CouchbaseException: d.errback() opres.set_callbacks(d.callback, _on_err) return d
def deferred_verb(self, itercls, raw_verb, cooked_verb, *args, **kwargs): if not self.connected: cb = lambda x: cooked_verb(*args, **kwargs) return self.on_connect().addCallback(cb) kwargs['itercls'] = itercls o = raw_verb(*args, **kwargs) o.start() return o._getDeferred() connected = CoreClient.connected
[docs]class TxDeferredClientMixin(TxRawClientMixin): def __new__(cls, *args, **kwargs): if not hasattr(cls, "TxDeferred_Wrapped"): for k, v in cls._gen_memd_wrappers(TxDeferredClientMixin._meth_factory).items(): setattr(cls, k, v) cls.TxDeferred_Wrapped = True return super(TxDeferredClientMixin, cls).__new__(cls, *args, **kwargs)
[docs] @internal def __init__(self, *args, **kwargs): """ This mixin inherits from :class:`TxRawClientMixin`. In addition to the connection methods, this class' data access methods return :class:`Deferreds` instead of :class:`Result` objects. Operations such as :meth:`get` or :meth:`set` will invoke the :attr:`Deferred.callback` with the result object when the result is complete, or they will invoke the :attr:`Deferred.errback` with an exception (or :class:`Failure`) in case of an error. The rules of the :attr:`~couchbase_core.client.Client.quiet` attribute for raising exceptions apply to the invocation of the ``errback``. This means that in the case where the synchronous client would raise an exception, the Deferred API will have its ``errback`` invoked. Otherwise, the result's :attr:`~couchbase_v2.result.Result.success` field should be inspected. Likewise multi operations will be invoked with a :class:`~couchbase.result.MultiResultBase` compatible object. Some examples: Using single items:: d_set = cb.upsert("foo", "bar") d_get = cb.get("foo") def on_err_common(*args): print("Got an error: {0}".format(args)), def on_set_ok(res): print("Successfuly set key with CAS {0}".format(res.cas)) def on_get_ok(res): print("Successfuly got key with value {0}".format(res.value)) d_set.addCallback(on_set_ok).addErrback(on_err_common) d_get.addCallback(on_get_ok).addErrback(on_get_common) # Note that it is safe to do this as operations performed on the # same key are *always* performed in the order they were scheduled. Using multiple items:: d_get = cb.get_multi(("Foo", "bar", "baz")) def on_mres(mres): for k, v in mres.items(): print("Got result for key {0}: {1}".format(k, v.value)) d_get.addCallback(on_mres) """ super(TxDeferredClientMixin, self).__init__(*args, **kwargs)
def _connectSchedule(self, f, meth, *args, **kwargs): qop = Deferred() qop.addCallback(lambda x: f(meth, *args, **kwargs)) self._evq['connect'].schedule(qop) return qop def _wrap(self, # type: TxDeferredClient meth, *args, **kwargs): """ Calls a given method with the appropriate arguments, or defers such a call until the instance has been connected """ if not self.connected: return self._connectSchedule(self._wrap, meth, *args, **kwargs) opres = meth(self, *args, **kwargs) return self.defer(opres) ### Generate the methods @staticmethod def _meth_factory(meth, _): def ret(self, *args, **kwargs): return self._wrap(meth, *args, **kwargs) return ret
[docs]class TxRawCollection(TxRawClientMixin, BaseAsyncCBCollection): pass
[docs]class TxCollection(TxDeferredClientMixin, TxRawCollection): pass
[docs]class TxRawBucket(TxRawClientMixin, V3AsyncBucket):
[docs] @internal def __init__(self, *args, **kwargs): super(TxRawBucket, self).__init__(collection_factory=kwargs.pop('collection_factory', TxRawCollection), *args, **kwargs)
def view_query_ex(self, viewcls, *args, **kwargs): """ Query a view, with the ``viewcls`` instance receiving events of the query as they arrive. :param type viewcls: A class (derived from :class:`AsyncViewBase`) to instantiate Other arguments are passed to the standard `query` method. This functions exactly like the :meth:`~couchbase.asynchronous.AsyncClient.query` method, except it automatically schedules operations if the connection has not yet been negotiated. """ kwargs['itercls'] = viewcls o = super(TxRawBucket, self).view_query(*args, **kwargs) if not self.connected: self.on_connect().addCallback(lambda x: o.start()) else: o.start() return o def view_query(self, *args, **kwargs): """ Returns a :class:`Deferred` object which will have its callback invoked with a :class:`BatchedView` when the results are complete. Parameters follow conventions of :meth:`~couchbase_v2.bucket.Bucket.query`. Example:: d = cb.queryAll("beer", "brewery_beers") def on_all_rows(rows): for row in rows: print("Got row {0}".format(row)) d.addCallback(on_all_rows) """ if not self.connected: cb = lambda x: self.view_query(*args, **kwargs) return self.on_connect().addCallback(cb) kwargs['itercls'] = BatchedViewResult o = super(TxRawBucket, self).view_query(*args, **kwargs) try: o.start() except Exception as e: raise return o._getDeferred()
[docs]class TxBucket(TxDeferredClientMixin, TxRawBucket): @internal def __init__(self, *args, **kwargs): super(TxBucket,self).__init__(collection_factory=TxCollection, *args, **kwargs)
class TxBaseCluster(TxRawClientMixin, V3AsyncCluster): def bucket(self, *args, **kwargs): return super(TxBaseCluster, self).bucket(*args, **kwargs)
[docs]class TxRawCluster(TxBaseCluster):
[docs] def __init__(self, *args, **kwargs): super(TxRawCluster, self).__init__(*args, bucket_factory=kwargs.pop('bucket_factory', TxRawBucket), **kwargs)
[docs] def query_ex(self, cls, *args, **kwargs): """ Execute a N1QL statement providing a custom handler for rows. This method allows you to define your own subclass (of :class:`~AsyncN1QLRequest`) which can handle rows as they are received from the network. :param cls: The subclass (not instance) to use :param args: Positional arguments for the class constructor :param kwargs: Keyword arguments for the class constructor .. seealso:: :meth:`queryEx`, around which this method wraps """ kwargs['itercls'] = cls o = super(TxRawCluster, self).query(*args, **kwargs) if not self.connected: self.on_connect().addCallback(lambda x: o.start()) else: o.start() return o
[docs] def query(self, *args, **kwargs): """ Execute a N1QL query, retrieving all rows. This method returns a :class:`Deferred` object which is executed with a :class:`~.N1QLRequest` object. The object may be iterated over to yield the rows in the result set. This method is similar to :meth:`~couchbase_v2.bucket.Bucket.n1ql_query` in its arguments. Example:: def handler(req): for row in req: # ... handle row d = cb.n1qlQueryAll('SELECT * from `travel-sample` WHERE city=$1`, 'Reno') d.addCallback(handler) :return: A :class:`Deferred` .. seealso:: :meth:`~couchbase_v2.bucket.Bucket.n1ql_query` """ return self.deferred_verb(BatchedQueryResult, super(TxRawCluster,self).query, self.query, *args, **kwargs)
[docs] def analytics_query(self, *args, **kwargs): return self.deferred_verb(BatchedAnalyticsResult, super(TxRawCluster, self).analytics_query, self.analytics_query, *args, **kwargs)
def search(self, cls, *args, **kwargs): """ Experimental Method Execute a Search query providing a custom handler for rows. This method allows you to define your own subclass (of :class:`~AsyncSearchRequest`) which can handle rows as they are received from the network. :param cls: The subclass (not instance) to use :param args: Positional arguments for the class constructor :param kwargs: Keyword arguments for the class constructor .. seealso:: :meth:`search`, around which this method wraps """ kwargs['itercls'] = cls o = super(TxRawCluster, self).search_query(*args, **kwargs) if not self.connected: self.on_connect().addCallback(lambda x: o.start()) else: o.start() return o
[docs] def search_query(self, *args, **kwargs): """ Experimental Method Execute a Search query, retrieving all rows. This method returns a :class:`Deferred` object which is executed with a :class:`~.SearchRequest` object. The object may be iterated over to yield the rows in the result set. This method is similar to :meth:`~couchbase_v2.bucket.Bucket.search` in its arguments. Example:: def handler(req): for row in req: # ... handle row d = cb.search('name', ft.MatchQuery('nosql'), limit=10) d.addCallback(handler) :return: A :class:`Deferred` .. seealso:: :meth:`~couchbase_v2.bucket.Bucket.search` """ if not self.connected: cb = lambda x: self.search_query(*args, **kwargs) return self.on_connect().addCallback(cb) kwargs['itercls'] = BatchedSearchResult o = super(TxRawCluster, self).search_query(*args, **kwargs) o.start() return o._getDeferred()
[docs]class TxCluster(TxDeferredClientMixin, TxRawCluster):
[docs] def __init__(self, *args, **kwargs): super(TxCluster, self).__init__(*args, bucket_factory=kwargs.pop('bucket_factory', TxBucket), **kwargs)
class TxSyncCluster(V3SyncCluster): def __init__(self, *args, **kwargs): super(TxSyncCluster, self).__init__(*args, bucket_factory=kwargs.pop('bucket_factory', TxBucket), **kwargs)