from couchbase_core.subdocument import Spec
from .options import Seconds, FiniteDuration
from couchbase_core.transcodable import Transcodable
from couchbase_core._libcouchbase import Result as SDK2Result, Result
from typing import *
from boltons.funcutils import wraps
Proxy_T = TypeVar('Proxy_T')
try:
from abc import abstractmethod
except:
pass
from couchbase_core.result import MultiResult, SubdocResult
def canonical_sdresult(content):
sdresult = content # type: SubdocResult
result = {}
cursor = iter(sdresult)
for index in range(0, sdresult.result_count):
spec = sdresult._specs[index] # type: Spec
result[spec[1]] = next(cursor)
return result
def extract_value(content, decode_canonical):
if isinstance(content, MultiResult):
return {k: decode_canonical(content[k].value) for k, v, in content}
elif isinstance(content, SubdocResult):
return decode_canonical(canonical_sdresult(content))
return decode_canonical(content.value)
def get_decoder(item # type: Type[Union[Transcodable,Any]]
):
return getattr(item, 'decode_canonical', None) if issubclass(item, Transcodable) else item
class ContentProxy(object):
def __init__(self, content):
self.content = content
def __getitem__(self,
item # type: Type[Proxy_T]
):
# type: (...)->Union[Proxy_T,Mapping[str,Proxy_T]]
return extract_value(self.content, get_decoder(item))
class ContentProxySubdoc(object):
def __init__(self, content):
self.content=content
def index_proxy(self, item, index):
return get_decoder(item)(self.content[index])
def __getitem__(self,
item # type: Type[Proxy_T]
):
# type: (...)->Callable[[int],Union[Proxy_T,Mapping[str,Proxy_T]]]
return lambda index: self.index_proxy(item, index)
class IResult(object):
def __init__(self,
cas, # type: int
error=None # type: Optional[int]
):
self._cas = cas
self._error = error
@property
def cas(self):
# type: ()->int
return self._cas
@property
def error(self):
# type: ()->int
return self.error
def success(self):
# type: ()->bool
return not self.error
class IGetResult(IResult):
@property
@abstractmethod
def id(self):
# type: ()->str
pass
@property
@abstractmethod
def expiry(self):
# type: ()->FiniteDuration
pass
[docs]class LookupInResult(IResult):
def __init__(self,
content, # type: SDK2Result
*args, # type: Any
**kwargs # type: Any
):
# type: (...) ->None
"""
LookupInResult is the return type for lookup_in operations.
Constructed internally by the API.
"""
self._content = content # type: SDK2Result
self.dict = kwargs
@property
def content_as(self):
# type: (...)->ContentProxySubdoc
return ContentProxySubdoc(self._content)
def exists(self,
index # type: int
):
return len(canonical_sdresult(self._content))>index
class MutationResult(IResult):
def __init__(self,
cas, # type: int
mutation_token=None # type: MutationToken
):
super(MutationResult, self).__init__(cas)
self.mutationToken = mutation_token
def mutation_token(self):
# type: () -> MutationToken
return self.mutationToken
[docs]class MutateInResult(MutationResult):
def __init__(self,
content, # type: SDK2Result
**options # type: Any
):
# type: (...) ->None
"""
MutateInResult is the return type for mutate_in operations.
Constructed internally by the API.
"""
self._content = content # type: SDK2Result
self.dict = options
def content_as(self):
# type: (...)->ContentProxy
return ContentProxy(self._content)
class GetResult(IGetResult):
def __init__(self,
content, # type: SDK2Result
*args, # type: Any
**kwargs # type: Any
):
# type: (...) ->None
"""
GetResult is the return type for full read operations.
Constructed internally by the API.
"""
self._content = content # type: SDK2Result
self.dict = kwargs
def content_as_array(self):
# type: (...) ->List
return list(self.content)
@property
def content_as(self):
# type: (...)->ContentProxy
return ContentProxy(self._content)
def __getitem__(self, t):
return
@property
def id(self):
# type: () -> str
return self.dict['id']
@property
def cas(self):
# type: () -> int
return self.dict['cas']
@property
def expiry(self):
# type: () -> Seconds
return Seconds(self.dict['expiry'])
@property
def content(self):
# type: () -> SDK2Result
return extract_value(self._content, lambda x:x)
ResultPrecursor = NamedTuple('ResultPrecursor', [('orig_result',SDK2Result), ('orig_options', Mapping[str,Any])])
def get_result(x, # type: SDK2Result
options=None # type: Dict[str,Any]
):
# type: (...)->GetResult
options = options or {}
return GetResult(x, cas=getattr(x, 'cas'), expiry=options.pop('timeout', None), id=x.key)
def get_result_wrapper(func # type: Callable[[Any], ResultPrecursor]
):
# type: (...)->Callable[[Any], GetResult]
@wraps(func)
def wrapped(*args, **kwargs):
x, options = func(*args,**kwargs)
return get_result(x, options)
wrapped.__name__=func.__name__
wrapped.__doc__=func.__name__
return wrapped
class MutationToken(object):
def __init__(self, sequenceNumber, # type: int
vbucketId, # type: int
vbucketUUID # type: int
):
self.sequenceNumber = sequenceNumber
self.vbucketId = vbucketId
self.vbucketUUID = vbucketUUID
def partition_id(self):
# type: (...)->int
pass
def partition_uuid(self):
# type: (...)->int
pass
def sequence_number(self):
# type: (...)->int
pass
def bucket_name(self):
# type: (...)->str
pass
class SDK2MutationToken(MutationToken):
def __init__(self, token):
token=token or (None,None,None)
super(SDK2MutationToken,self).__init__(token[2],token[0],token[1])
def get_mutation_result(result # type: ResultPrecursor
):
# type (...)->MutationResult
return MutationResult(result.orig_result.cas, SDK2MutationToken(result.orig_result._mutinfo) if hasattr(result.orig_result, '_mutinfo') else None)
def _wrap_in_mutation_result(func # type: Callable[[Any,...],SDK2Result]
):
# type: (...)->Callable[[Any,...],MutationResult]
@wraps(func)
def mutated(*args, **kwargs):
result = func(*args, **kwargs)
return get_mutation_result(result)
mutated.__name__=func.__name__
mutated.__doc__=func.__doc__
return mutated