from .n1ql import *
from couchbase_core.n1ql import N1QLRequest
from couchbase.options import OptionBlockTimeOut
from enum import Enum
from couchbase_core.analytics import AnalyticsQuery, AnalyticsRequest
from couchbase_core import iterable_wrapper
from typing import *
class AnalyticsIndex(dict):
def __init__(self, **kwargs):
print("creating index from {}".format(kwargs))
super(AnalyticsIndex, self).__init__(**kwargs['Index'])
@property
def name(self):
return self.get("IndexName", None)
@property
def dataset_name(self):
return self.get("DatasetName", None)
@property
def dataverse_name(self):
return self.get("DataverseName", None)
@property
def is_primary(self):
return self.get("IsPrimary", None)
class AnalyticsDataType(Enum):
STRING='string'
INT64='int64'
DOUBLE='double'
class AnalyticsDataset(dict):
def __init__(self, **kwargs):
super(AnalyticsDataset, self).__init__(**kwargs)
@property
def dataset_name(self):
return self.get("DatasetName", None)
@property
def dataverse_name(self):
return self.get('DataverseName', None)
@property
def link_name(self):
return self.get('LinkName', None)
@property
def bucket_name(self):
return self.get('BucketName', None)
[docs]class AnalyticsResult(iterable_wrapper(AnalyticsRequest)):
def client_context_id(self):
return super(AnalyticsResult, self).client_context_id()
def signature(self):
return super(AnalyticsResult, self).signature()
def warnings(self):
return super(AnalyticsResult, self).warnings()
def request_id(self):
return super(AnalyticsResult, self).request_id()
[docs] def __init__(self,
*args, **kwargs # type: N1QLRequest
):
super(AnalyticsResult, self).__init__(*args, **kwargs)
class AnalyticsOptions(OptionBlockTimeOut):
VALID_OPTS = {'timeout', 'read_only', 'scan_consistency', 'client_context_id', 'positional_parameters',
'named_parameters', 'raw'}
@overload
def __init__(self,
timeout=None, # type: timedelta
read_only=None, # type: bool
scan_consistency=None, # type: QueryScanConsistency
client_context_id=None, # type: str
priority=None, # type: bool
positional_parameters=None, # type: Iterable[str]
named_parameters=None, # type: Dict[str, str]
raw=None, # type: Dict[str,Any]
):
"""
:param timeout:
:param read_only:
:param scan_consistency:
:param client_context_id:
:param priority:
:param positional_parameters:
:param named_parameters:
:param raw:
"""
pass
def __init__(self,
**kwargs
):
super(AnalyticsOptions, self).__init__(**kwargs)
def to_analytics_query(self, statement, *options, **kwargs):
# lets make a copy of the options, and update with kwargs...
args = self.copy()
args.update(kwargs)
# now lets get positional parameters. Actual positional
# params OVERRIDE positional_parameters
positional_parameters = args.pop('positional_parameters', [])
if options and len(options) > 0:
positional_parameters = options
# now the named parameters. NOTE: all the kwargs that are
# not VALID_OPTS must be named parameters, and the kwargs
# OVERRIDE the list of named_parameters
new_keys = list(filter(lambda x: x not in self.VALID_OPTS, args.keys()))
named_parameters = args.pop('named_parameters', {})
for k in new_keys:
named_parameters[k] = args[k]
query = AnalyticsQuery(statement, *positional_parameters, **named_parameters)
# TODO: there is surely a cleaner way...
for k in self.VALID_OPTS:
v = args.get(k, None)
if v:
if k == 'scan_consistency':
query.consistency = v.as_string()
if k == 'timeout':
query.timeout = v
if k == 'read_only':
query.readonly = v
if k == 'profile':
query.profile = v.as_string()
return query