Source code for couchbase.analytics
from couchbase.options import QueryBaseOptions, enum_value
from couchbase_core.mapper import identity
from .n1ql import *
from couchbase_core.n1ql import N1QLRequest
from enum import Enum
from couchbase_core.analytics import AnalyticsQuery, AnalyticsRequest
from couchbase_core import iterable_wrapper
from typing import *
from datetime import timedelta
class AnalyticsIndex(dict):
    """Dictionary describing one analytics index.

    The constructor expects a keyword argument named ``Index`` whose value
    is a mapping; the entries of that mapping become the entries of this
    dictionary.  The properties below are read-only shortcuts for a few
    well-known keys and yield ``None`` when the key is absent.
    """

    def __init__(self, **kwargs):
        """Populate the dictionary from ``kwargs['Index']``.

        :param kwargs: must contain an ``Index`` entry holding a mapping
            with string keys.
        :raises KeyError: if no ``Index`` keyword argument was supplied.
        """
        # No diagnostic output here: constructing a value object must not
        # write to the caller's stdout.
        super(AnalyticsIndex, self).__init__(**kwargs['Index'])

    @property
    def name(self):
        """Value stored under ``"IndexName"``, or ``None``."""
        return self.get("IndexName")

    @property
    def dataset_name(self):
        """Value stored under ``"DatasetName"``, or ``None``."""
        return self.get("DatasetName")

    @property
    def dataverse_name(self):
        """Value stored under ``"DataverseName"``, or ``None``."""
        return self.get("DataverseName")

    @property
    def is_primary(self):
        """Value stored under ``"IsPrimary"``, or ``None``."""
        return self.get("IsPrimary")
class AnalyticsDataType(Enum):
    """Enumeration of data type names; each member's value is the
    lower-case string spelling of the type."""

    STRING = "string"
    INT64 = "int64"
    DOUBLE = "double"
class AnalyticsDataset(dict):
    """Dictionary describing one analytics dataset.

    All keyword arguments given to the constructor become dictionary
    entries.  Each property reads one fixed key and evaluates to ``None``
    when that key is not present.
    """

    def __init__(self, **kwargs):
        """Store every keyword argument as a dictionary entry."""
        super(AnalyticsDataset, self).__init__(**kwargs)

    @property
    def dataset_name(self):
        """Entry stored under ``"DatasetName"``, if any."""
        return self.get("DatasetName")

    @property
    def dataverse_name(self):
        """Entry stored under ``'DataverseName'``, if any."""
        return self.get('DataverseName')

    @property
    def link_name(self):
        """Entry stored under ``'LinkName'``, if any."""
        return self.get('LinkName')

    @property
    def bucket_name(self):
        """Entry stored under ``'BucketName'``, if any."""
        return self.get('BucketName')
class AnalyticsResult(iterable_wrapper(AnalyticsRequest)):
    """Result object for an analytics query.

    The base class is produced by ``iterable_wrapper(AnalyticsRequest)``.
    Every method defined here simply forwards to the method of the same
    name on that base class.
    """

    def __init__(self,
                 *args, **kwargs  # type: N1QLRequest
                 ):
        """Pass all positional and keyword arguments through, unchanged,
        to the base-class constructor."""
        super(AnalyticsResult, self).__init__(*args, **kwargs)

    def client_context_id(self):
        """Return whatever the base class's ``client_context_id()`` returns."""
        return super(AnalyticsResult, self).client_context_id()

    def signature(self):
        """Return whatever the base class's ``signature()`` returns."""
        return super(AnalyticsResult, self).signature()

    def warnings(self):
        """Return whatever the base class's ``warnings()`` returns."""
        return super(AnalyticsResult, self).warnings()

    def request_id(self):
        """Return whatever the base class's ``request_id()`` returns."""
        return super(AnalyticsResult, self).request_id()
class AnalyticsScanConsistency(Enum):
    """Values accepted for the ``scan_consistency`` analytics option.

    Each member's value is the lower-case string form of its name.
    """

    # Subclasses ``Enum`` (imported explicitly at the top of the file)
    # rather than ``enum.Enum``, so the class does not depend on a module
    # named ``enum`` happening to be in scope through a star import.
    NOT_BOUNDED = "not_bounded"
    REQUEST_PLUS = "request_plus"
class AnalyticsOptions(QueryBaseOptions):
    """Keyword options for an analytics query.

    ``VALID_OPTS`` maps each accepted option name to a
    ``{target_name: transform}`` dictionary (possibly empty).
    NOTE(review): presumably ``QueryBaseOptions`` calls ``transform(value)``
    and stores the result under ``target_name`` on a ``TARGET_CLASS``
    instance -- confirm against ``QueryBaseOptions``.
    """

    # ``timedelta.total_seconds`` is a callable taking a timedelta and
    # returning its full duration in seconds.  ``timedelta.seconds`` is only
    # an attribute descriptor: it cannot be called like the other transforms
    # and, even when read, omits the days and microseconds components.
    VALID_OPTS = {'timeout': {'timeout': timedelta.total_seconds},
                  'read_only': {'readonly': identity},
                  'scan_consistency': {'consistency': enum_value},
                  'client_context_id': {},
                  'positional_parameters': {},
                  'named_parameters': {},
                  'query_context': {'query_context': identity},
                  'raw': {}}
    # NOTE(review): ``priority`` is listed in the typed signature below but
    # has no entry in VALID_OPTS -- confirm whether that is intentional.

    TARGET_CLASS = AnalyticsQuery

    @overload
    def __init__(self,
                 timeout=None,  # type: timedelta
                 read_only=None,  # type: bool
                 scan_consistency=None,  # type: AnalyticsScanConsistency
                 client_context_id=None,  # type: str
                 priority=None,  # type: bool
                 positional_parameters=None,  # type: Iterable[str]
                 named_parameters=None,  # type: Dict[str, str]
                 query_context=None,  # type: str
                 raw=None,  # type: Dict[str,Any]
                 ):
        """
        Typed signature for documentation and type checkers only; the
        untyped ``__init__`` that follows is the one that runs.

        :param timeout: duration given as a ``timedelta``.
        :param read_only: boolean flag.
        :param scan_consistency: an ``AnalyticsScanConsistency`` member.
        :param client_context_id: string identifier.
        :param priority: boolean flag.
        :param positional_parameters: iterable of strings.
        :param named_parameters: dictionary of string keys to string values.
        :param query_context: string.
        :param raw: dictionary of string keys to arbitrary values.
        """
        pass

    def __init__(self,
                 **kwargs
                 ):
        """Forward every keyword option to ``QueryBaseOptions.__init__``."""
        super(AnalyticsOptions, self).__init__(**kwargs)