Source code for acouchbase.cluster

from typing import *
import asyncio
from asyncio import AbstractEventLoop

from couchbase.cluster import AsyncCluster as V3AsyncCluster
from couchbase.bucket import AsyncBucket as V3AsyncBucket
from couchbase_core.client import Client as CoreClient
from couchbase.collection import AsyncCBCollection as BaseAsyncCBCollection, CBCollection
from acouchbase.asyncio_iops import IOPS
from acouchbase.iterator import AQueryResult, ASearchResult, AAnalyticsResult, AViewResult

T = TypeVar('T', bound=CoreClient)


[docs]class AIOClientMixin(object): def __new__(cls, *args, **kwargs): # type: (...) -> Type[T] if not hasattr(cls, "AIO_wrapped"): for k, method in cls._gen_memd_wrappers(AIOClientMixin._meth_factory).items(): setattr(cls, k, method) cls.AIO_wrapped = True return super(AIOClientMixin, cls).__new__(cls, *args, **kwargs) @staticmethod def _meth_factory(meth, _): def ret(self, *args, **kwargs): rv = meth(self, *args, **kwargs) ft = asyncio.Future() def on_ok(res): ft.set_result(res) rv.clear_callbacks() def on_err(_, excls, excval, __): err = excls(excval) ft.set_exception(err) rv.clear_callbacks() rv.set_callbacks(on_ok, on_err) return ft return ret
[docs] def __init__(self, connstr=None, *args, **kwargs): loop = asyncio.get_event_loop() super(AIOClientMixin, self).__init__( connstr, *args, iops=IOPS(loop), **kwargs) self._loop = loop if issubclass(type(self), CBCollection): # do not set the connection callback for a collection return cft = asyncio.Future(loop=loop) def ftresult(err): if err: cft.set_exception(err) else: cft.set_result(True) self._cft = cft self._conncb = ftresult
[docs] def on_connect(self): if not self.connected: self._connect() return self._cft
connected = CoreClient.connected
class AsyncCBCollection(AIOClientMixin, BaseAsyncCBCollection): def __init__(self, *args, **kwargs ): super(AsyncCBCollection, self).__init__(*args, **kwargs) Collection = AsyncCBCollection class ABucket(AIOClientMixin, V3AsyncBucket): def __init__(self, *args, **kwargs): super(ABucket, self).__init__( collection_factory=AsyncCBCollection, *args, **kwargs) def view_query(self, *args, **kwargs): if "itercls" not in kwargs: kwargs["itercls"] = AViewResult return super(ABucket, self).view_query(*args, **kwargs) Bucket = ABucket class ACluster(AIOClientMixin, V3AsyncCluster): def __init__(self, connection_string, *options, **kwargs): super(ACluster, self).__init__(connection_string, *options, bucket_factory=Bucket, **kwargs) def query(self, *args, **kwargs): if "itercls" not in kwargs: kwargs["itercls"] = AQueryResult return super(ACluster, self).query(*args, **kwargs) def search_query(self, *args, **kwargs): if "itercls" not in kwargs: kwargs["itercls"] = ASearchResult return super(ACluster, self).search_query(*args, **kwargs) def analytics_query(self, *args, **kwargs): return super(ACluster, self).analytics_query(*args, itercls=kwargs.pop('itercls', AAnalyticsResult), **kwargs) Cluster = ACluster def get_event_loop(evloop=None # type: AbstractEventLoop ): """ Get an event loop compatible with acouchbase. Some Event loops, such as ProactorEventLoop (the default asyncio event loop for Python 3.8 on Windows) are not compatible with acouchbase as they don't implement all members in the abstract base class. :param evloop: preferred event loop :return: The preferred event loop, if compatible, otherwise, a compatible alternative event loop. """ return IOPS.get_event_loop(evloop) def close_event_loop(): """ Close the event loop used by acouchbase. """ IOPS.close_event_loop()