from typing import *
import asyncio
from asyncio import AbstractEventLoop
from couchbase.cluster import AsyncCluster as V3AsyncCluster
from couchbase.bucket import AsyncBucket as V3AsyncBucket
from couchbase_core.client import Client as CoreClient
from couchbase.collection import AsyncCBCollection as BaseAsyncCBCollection, CBCollection
from acouchbase.asyncio_iops import IOPS
from acouchbase.iterator import AQueryResult, ASearchResult, AAnalyticsResult, AViewResult
# Type variable constrained to CoreClient (or a subclass of it); it is
# referenced by the type comment on AIOClientMixin.__new__.
T = TypeVar('T', bound=CoreClient)
class AIOClientMixin(object):
    """
    Mixin that exposes a callback-driven client through asyncio futures.

    The first time a concrete subclass is instantiated, every entry returned
    by ``cls._gen_memd_wrappers`` is installed on that subclass, so the
    wrapped operations return an :class:`asyncio.Future` instead of the raw
    operation result object.
    """

    def __new__(cls, *args, **kwargs):
        # type: (...) -> Type[T]
        """
        Install the future-returning wrappers on ``cls`` once, then allocate.

        ``cls._gen_memd_wrappers`` is called with :meth:`_meth_factory` and
        must return a mapping of attribute name to replacement method. The
        ``AIO_wrapped`` class attribute records that this already happened.
        """
        if not hasattr(cls, "AIO_wrapped"):
            wrappers = cls._gen_memd_wrappers(AIOClientMixin._meth_factory)
            for k, method in wrappers.items():
                setattr(cls, k, method)
            cls.AIO_wrapped = True
        return super(AIOClientMixin, cls).__new__(cls, *args, **kwargs)

    @staticmethod
    def _meth_factory(meth, _):
        """
        Build a wrapper around ``meth`` that returns an asyncio future.

        :param meth: the operation to wrap; the object it returns must offer
            ``set_callbacks(on_ok, on_err)`` and ``clear_callbacks()``.
        :param _: unused second positional argument.
        :return: a function usable as a method, returning an
            :class:`asyncio.Future` resolved by the operation's callbacks.
        """
        def ret(self, *args, **kwargs):
            rv = meth(self, *args, **kwargs)
            ft = asyncio.Future()

            def on_ok(res):
                # The awaiting side may already have cancelled the future
                # (for example after a timeout); completing a finished
                # future would raise InvalidStateError inside the callback.
                if not ft.done():
                    ft.set_result(res)
                rv.clear_callbacks()

            def on_err(_, excls, excval, __):
                # Same guard as on_ok: never complete a finished future.
                if not ft.done():
                    ft.set_exception(excls(excval))
                rv.clear_callbacks()

            rv.set_callbacks(on_ok, on_err)
            return ft

        return ret

    def __init__(self, connstr=None, *args, **kwargs):
        """
        Initialise the underlying client with IOPS bound to the current loop.

        :param connstr: forwarded as the first positional argument to the
            next ``__init__`` in the MRO.
        :param args: further positional arguments, forwarded unchanged.
        :param kwargs: keyword arguments, forwarded unchanged; ``iops`` is
            supplied by this method.
        """
        loop = asyncio.get_event_loop()
        super(AIOClientMixin, self).__init__(
            connstr, *args, iops=IOPS(loop), **kwargs)
        self._loop = loop
        if isinstance(self, CBCollection):
            # do not set the connection callback for a collection
            return

        cft = asyncio.Future(loop=loop)

        def ftresult(err):
            # Ignore the notification when the future is already finished
            # (e.g. it was cancelled by the caller) instead of raising
            # InvalidStateError from inside the connection callback.
            if cft.done():
                return
            if err:
                cft.set_exception(err)
            else:
                cft.set_result(True)

        self._cft = cft
        self._conncb = ftresult

    def on_connect(self):
        """
        Start connecting if necessary and return the connection future.

        The future is the one created in ``__init__``; it is not created for
        instances that are ``CBCollection`` subclasses.

        :return: the future completed by the connection callback, with
            ``True`` on success or with the reported error as its exception.
        """
        if not self.connected:
            self._connect()
        return self._cft

    # Re-export CoreClient's ``connected`` attribute on the mixin.
    connected = CoreClient.connected
class AsyncCBCollection(AIOClientMixin, BaseAsyncCBCollection):
    """
    Collection type that layers AIOClientMixin over BaseAsyncCBCollection.
    """

    def __init__(self, *args, **kwargs):
        """
        Hand all positional and keyword arguments, unchanged, to the next
        ``__init__`` in the method resolution order.
        """
        super().__init__(*args, **kwargs)
# Alias: AsyncCBCollection is also available under the name ``Collection``.
Collection = AsyncCBCollection
class ABucket(AIOClientMixin, V3AsyncBucket):
    """
    Bucket type that layers AIOClientMixin over V3AsyncBucket.
    """

    def __init__(self, *args, **kwargs):
        """
        Construct the bucket, asking the base class to create collections
        with AsyncCBCollection.

        :param args: positional arguments forwarded unchanged.
        :param kwargs: keyword arguments forwarded unchanged.
        """
        super().__init__(
            *args, collection_factory=AsyncCBCollection, **kwargs)

    def view_query(self, *args, **kwargs):
        """
        Run the base class's ``view_query``, using AViewResult as ``itercls``
        unless the caller supplied their own.

        :return: whatever the base class's ``view_query`` returns.
        """
        kwargs.setdefault("itercls", AViewResult)
        return super().view_query(*args, **kwargs)
# Alias: ABucket is also available under the name ``Bucket``; ACluster passes
# this name as its ``bucket_factory``.
Bucket = ABucket
class ACluster(AIOClientMixin, V3AsyncCluster):
    """
    Cluster type that layers AIOClientMixin over V3AsyncCluster.

    Each query method fills in a default ``itercls`` result class when the
    caller has not provided one.
    """

    def __init__(self, connection_string, *options, **kwargs):
        """
        Construct the cluster, asking the base class to create buckets with
        ``Bucket``.

        :param connection_string: forwarded as the first positional argument.
        :param options: further positional arguments, forwarded unchanged.
        :param kwargs: keyword arguments, forwarded unchanged.
        """
        super().__init__(
            connection_string, *options, bucket_factory=Bucket, **kwargs)

    def query(self, *args, **kwargs):
        """
        Call the base class's ``query`` with ``itercls`` defaulting to
        AQueryResult.
        """
        kwargs.setdefault("itercls", AQueryResult)
        return super().query(*args, **kwargs)

    def search_query(self, *args, **kwargs):
        """
        Call the base class's ``search_query`` with ``itercls`` defaulting to
        ASearchResult.
        """
        kwargs.setdefault("itercls", ASearchResult)
        return super().search_query(*args, **kwargs)

    def analytics_query(self, *args, **kwargs):
        """
        Call the base class's ``analytics_query`` with ``itercls`` defaulting
        to AAnalyticsResult.
        """
        kwargs.setdefault('itercls', AAnalyticsResult)
        return super().analytics_query(*args, **kwargs)
# Alias: ACluster is also available under the name ``Cluster``.
Cluster = ACluster
def get_event_loop(
        evloop=None  # type: AbstractEventLoop
):
    """
    Return an event loop that acouchbase is able to work with.

    Not every event loop qualifies. ProactorEventLoop, for instance (the
    default asyncio event loop for Python 3.8 on Windows), does not
    implement all members of the abstract base class and so is not
    compatible with acouchbase.

    :param evloop: the event loop the caller would prefer to use
    :return: ``evloop`` when it is compatible; otherwise a compatible
        alternative event loop.
    """
    compatible_loop = IOPS.get_event_loop(evloop)
    return compatible_loop
def close_event_loop():
    """
    Shut down the event loop that acouchbase uses.

    This simply delegates to ``IOPS.close_event_loop`` and returns nothing.
    """
    IOPS.close_event_loop()