# Source code for couchbase.analytics

from abc import ABC, abstractmethod
from typing import *
from datetime import timedelta
from enum import Enum

from durationpy import from_str

from couchbase.options import QueryBaseOptions, enum_value
from couchbase_core.mapper import identity
from .n1ql import *
from couchbase_core.n1ql import N1QLRequest
from couchbase_core.analytics import AnalyticsQuery, AnalyticsRequest
from couchbase_core import iterable_wrapper, mk_formstr
from couchbase.exceptions import InvalidArgumentException


class AnalyticsIndex(dict):
    """Dictionary of analytics index fields with attribute-style accessors.

    The constructor requires an ``Index`` keyword argument holding a mapping;
    the entries of that mapping become the entries of this dictionary. Any
    other keyword arguments are ignored.

    :raises KeyError: if no ``Index`` keyword argument is supplied.
    """

    def __init__(self, **kwargs):
        index_fields = kwargs['Index']
        super().__init__(**index_fields)

    @property
    def name(self):
        """The ``IndexName`` entry, or ``None`` when it is absent."""
        return self.get("IndexName")

    @property
    def dataset_name(self):
        """The ``DatasetName`` entry, or ``None`` when it is absent."""
        return self.get("DatasetName")

    @property
    def dataverse_name(self):
        """The ``DataverseName`` entry, or ``None`` when it is absent."""
        return self.get("DataverseName")

    @property
    def is_primary(self):
        """The ``IsPrimary`` entry, or ``None`` when it is absent."""
        return self.get("IsPrimary")


class AnalyticsDataType(Enum):
    """Data type names: ``string``, ``int64`` and ``double``."""

    STRING = "string"
    INT64 = "int64"
    DOUBLE = "double"


class AnalyticsLinkType(Enum):
    """Kinds of analytics link.

    Each member's value is the short identifier string for that kind
    (``s3``, ``azureblob``, ``couchbase``).
    """

    S3External = "s3"
    AzureBlobExternal = "azureblob"
    CouchbaseRemote = "couchbase"


class AnalyticsEncryptionLevel(Enum):
    """Encryption levels selectable for a link: ``none``, ``half`` or ``full``."""

    NONE = "none"
    HALF = "half"
    FULL = "full"


class AnalyticsDataset(dict):
    """Dictionary of analytics dataset fields with attribute-style accessors.

    Every keyword argument given to the constructor becomes an entry of the
    dictionary.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @property
    def dataset_name(self):
        """The ``DatasetName`` entry, or ``None`` when it is absent."""
        return self.get("DatasetName")

    @property
    def dataverse_name(self):
        """The ``DataverseName`` entry, or ``None`` when it is absent."""
        return self.get("DataverseName")

    @property
    def link_name(self):
        """The ``LinkName`` entry, or ``None`` when it is absent."""
        return self.get("LinkName")

    @property
    def bucket_name(self):
        """The ``BucketName`` entry, or ``None`` when it is absent."""
        return self.get("BucketName")


class AnalyticsLink(ABC):
    """Abstract interface shared by every analytics link.

    AnalyticsLinks are only available on Couchbase Server 7.0+
    """

    @abstractmethod
    def name(self) -> str:
        """Returns the name of the :class:`couchbase.analytics.AnalyticsLink`

        :return: The name of the :class:`couchbase.analytics.AnalyticsLink`
        """

    @abstractmethod
    def dataverse_name(self) -> str:
        """Returns the name of the dataverse the :class:`couchbase.analytics.AnalyticsLink` belongs to

        :return: The name of the dataverse
        """

    @abstractmethod
    def form_encode(self) -> bytes:
        """Encodes the :class:`couchbase.analytics.AnalyticsLink` as form data.

        The result is meant to be sent as the body of a
        :func:`couchbase.management.analytics.CreateLink` or
        :func:`couchbase.management.analytics.ReplaceLink`

        :return: A form encoded :class:`couchbase.analytics.AnalyticsLink`
        """

    @abstractmethod
    def validate(self):
        """Ensures the :class:`couchbase.analytics.AnalyticsLink` is valid.

        :return: None

        :raises: :class:`couchbase.exceptions.InvalidArgumentException` if the link is invalid
        """

    @abstractmethod
    def link_type(self) -> AnalyticsLinkType:
        """Returns the :class:`couchbase.analytics.AnalyticsLinkType` of the :class:`couchbase.analytics.AnalyticsLink`

        :return: The corresponding :class:`couchbase.analytics.AnalyticsLinkType` of the :class:`couchbase.analytics.AnalyticsLink`
        """


class CouchbaseAnalyticsEncryptionSettings(object):
    """The settings available for setting encryption level on a link.

    :param encryption_level: The level of encryption to apply, defaults to :class:`couchbase.analytics.AnalyticsEncryptionLevel`.NONE
    :type encryption_level: :class:`couchbase.analytics.AnalyticsEncryptionLevel`
    :param certificate: The certificate to use when encryption level is set to full. Must be set if encryption level is set to full.  Defaults to None.
    :type certificate: bytes | bytearray
    :param client_certificate: The client certificate to use when encryption level is set to full. Cannot be used if username and password are also used. Defaults to None
    :type client_certificate: bytes | bytearray
    :param client_key: The client key to use when encryption level is set to full.  Cannot be used if username and password are also used. Defaults to None
    :type client_key: bytes | bytearray
    """

    def __init__(
        self,  # type: "CouchbaseAnalyticsEncryptionSettings"
        encryption_level=None,  # type: AnalyticsEncryptionLevel
        certificate=None,  # type: Union[bytes, bytearray]
        client_certificate=None,  # type: Union[bytes, bytearray]
        client_key=None,  # type: Union[bytes, bytearray]
    ):
        # A missing level means "no encryption".
        if encryption_level is None:
            encryption_level = AnalyticsEncryptionLevel.NONE
        self._encryption_level = encryption_level
        self._certificate = certificate
        self._client_certificate = client_certificate
        self._client_key = client_key

    @property
    def encryption_level(self):
        """The configured :class:`couchbase.analytics.AnalyticsEncryptionLevel`."""
        return self._encryption_level

    @encryption_level.setter
    def encryption_level(self, value):
        self._encryption_level = value

    @property
    def certificate(self):
        """The certificate, or ``None`` when unset."""
        return self._certificate

    @certificate.setter
    def certificate(self, value):
        self._certificate = value

    @property
    def client_certificate(self):
        """The client certificate, or ``None`` when unset."""
        return self._client_certificate

    @client_certificate.setter
    def client_certificate(self, value):
        self._client_certificate = value

    @property
    def client_key(self):
        """The client key, or ``None`` when unset."""
        return self._client_key

    @client_key.setter
    def client_key(self, value):
        self._client_key = value

    @classmethod
    def from_server_json(
        cls,  # type: "CouchbaseAnalyticsEncryptionSettings"
        raw_data  # type: dict
    ) -> "CouchbaseAnalyticsEncryptionSettings":
        """Build settings from a link description dictionary.

        Reads the ``encryption`` key (required) and, when present and not
        blank, the ``certificate`` and ``clientCertificate`` keys, which are
        stored UTF-8 encoded. An ``encryption`` value matching no
        :class:`couchbase.analytics.AnalyticsEncryptionLevel` leaves the level
        at ``NONE``. The client key is never populated here.

        :param raw_data: dictionary describing the link
        :return: the populated settings object
        :raises KeyError: if ``raw_data`` has no ``encryption`` key
        """
        encryption_settings = cls()

        reported_level = raw_data["encryption"]
        for level in AnalyticsEncryptionLevel:
            if reported_level == level.value:
                encryption_settings.encryption_level = level
                break

        certificate = raw_data.get("certificate")
        # ``split()`` is empty for whitespace-only text, so blank values are skipped.
        if certificate and certificate.split():
            encryption_settings.certificate = bytes(certificate, "utf-8")

        client_certificate = raw_data.get("clientCertificate")
        if client_certificate and client_certificate.split():
            # Stored on ``client_certificate``; writing it to ``certificate``
            # would clobber the certificate parsed just above.
            encryption_settings.client_certificate = bytes(
                client_certificate, "utf-8")

        return encryption_settings


def is_null_or_empty(
    value  # type: str
) -> bool:
    """Tell whether ``value`` is ``None``, empty, or only whitespace.

    :param value: the text to inspect
    :return: ``True`` when there is no non-whitespace content, else ``False``
    """
    if not value:
        return True
    # No tokens after whitespace-splitting means the text is blank.
    return len(value.split()) == 0


class CouchbaseRemoteAnalyticsLink(AnalyticsLink):
    """An :class:`AnalyticsLink` of type ``AnalyticsLinkType.CouchbaseRemote``.

    :param dataverse: name of the dataverse the link belongs to
    :param link_name: name of the link
    :param hostname: hostname sent in the ``hostname`` form field
    :param encryption: encryption settings for the link
    :param username: optional username
    :param password: optional password
    """

    def __init__(
            self,  # type: "CouchbaseRemoteAnalyticsLink"
            dataverse,  # type: str
            link_name,  # type: str
            hostname,  # type: str
            encryption,  # type: CouchbaseAnalyticsEncryptionSettings
            username=None,  # type: str
            password=None,  # type: str

    ):
        super().__init__()
        self._dataverse, self._link_name = dataverse, link_name
        self._hostname = hostname
        self._encryption = encryption
        self._username, self._password = username, password

    def name(self) -> str:
        """Return the link name."""
        return self._link_name

    def dataverse_name(self) -> str:
        """Return the name of the dataverse the link belongs to."""
        return self._dataverse

    def form_encode(self: "CouchbaseRemoteAnalyticsLink") -> bytes:
        """Encode the link as a form string and return it as bytes.

        ``dataverse`` and ``name`` are only included when the dataverse name
        contains no ``/``. Credentials and certificate material are included
        only when they are non-empty.
        """
        settings = self._encryption
        form = {}

        if "/" not in self._dataverse:
            form.update(dataverse=self._dataverse, name=self._link_name)

        form.update(
            hostname=self._hostname,
            type=AnalyticsLinkType.CouchbaseRemote.value,
            encryption=settings.encryption_level.value,
        )

        credentials = (
            ("username", self._username),
            ("password", self._password),
        )
        for field, text in credentials:
            if not is_null_or_empty(text):
                form[field] = text

        pem_material = (
            ("certificate", settings.certificate),
            ("clientCertificate", settings.client_certificate),
            ("clientKey", settings.client_key),
        )
        for field, blob in pem_material:
            if blob:
                form[field] = blob.decode("utf-8")

        return mk_formstr(form).encode()

    def validate(self: "CouchbaseRemoteAnalyticsLink"):
        """Check that the link carries everything its encryption level needs.

        :raises: :class:`couchbase.exceptions.InvalidArgumentException` when
            the dataverse, link name or hostname is blank; when the level is
            none/half and the username or password is blank; or when the level
            is full and the certificate, client certificate or client key is
            missing.
        """
        mandatory = (
            (self._dataverse,
             "Dataverse must be set for couchbase analytics links."),
            (self._link_name,
             "Link name must be set for couchbase analytics links."),
            (self._hostname,
             "Hostname must be set for couchbase analytics links."),
        )
        for text, complaint in mandatory:
            if is_null_or_empty(text):
                raise InvalidArgumentException(complaint)

        settings = self._encryption
        level = settings.encryption_level

        if level in (AnalyticsEncryptionLevel.NONE,
                     AnalyticsEncryptionLevel.HALF):
            missing_login = is_null_or_empty(
                self._username) or is_null_or_empty(self._password)
            if missing_login:
                raise InvalidArgumentException(
                    "When encryption level is half or none, username and password must be set for couchbase analytics links.")
            return

        if level == AnalyticsEncryptionLevel.FULL:
            if not settings.certificate:
                raise InvalidArgumentException(
                    "When encryption level is full a certificate must be set for couchbase analytics links.")
            if not (settings.client_certificate and settings.client_key):
                raise InvalidArgumentException(
                    "When encryption level is full the client certificate and key must be set for couchbase analytics links.")

    def link_type(self: "CouchbaseRemoteAnalyticsLink") -> AnalyticsLinkType:
        """Return ``AnalyticsLinkType.CouchbaseRemote``."""
        return AnalyticsLinkType.CouchbaseRemote

    @classmethod
    def link_from_server_json(
        cls,  # type: "CouchbaseRemoteAnalyticsLink"
        raw_data,  # type: dict
    ) -> "CouchbaseRemoteAnalyticsLink":
        """Build a link from a link description dictionary.

        Reads ``dataverse`` (falling back to ``scope``), ``name``,
        ``activeHostname``, the encryption-related keys and ``username``.
        No password is set on the returned link.

        :raises KeyError: if a required key is missing from ``raw_data``
        """
        dataverse_key = "dataverse" if "dataverse" in raw_data else "scope"
        dataverse = raw_data[dataverse_key]
        link_name = raw_data["name"]
        hostname = raw_data["activeHostname"]
        encryption = CouchbaseAnalyticsEncryptionSettings.from_server_json(
            raw_data)
        username = raw_data["username"]

        return CouchbaseRemoteAnalyticsLink(
            dataverse=dataverse,
            link_name=link_name,
            hostname=hostname,
            encryption=encryption,
            username=username,
        )


class S3ExternalAnalyticsLink(AnalyticsLink):
    """An :class:`AnalyticsLink` of type ``AnalyticsLinkType.S3External``.

    :param dataverse: name of the dataverse the link belongs to
    :param link_name: name of the link
    :param access_key_id: value sent in the ``accessKeyId`` form field
    :param region: value sent in the ``region`` form field
    :param secret_access_key: value sent in the ``secretAccessKey`` form field
    :param session_token: optional session token
    :param service_endpoint: optional service endpoint
    """

    def __init__(
            self,  # type: "S3ExternalAnalyticsLink"
            dataverse,  # type: str
            link_name,  # type: str
            access_key_id,  # type: str
            region,  # type: str
            secret_access_key=None,  # type: str
            session_token=None,  # type: str
            service_endpoint=None,  # type: str

    ):
        super().__init__()
        self._dataverse, self._link_name = dataverse, link_name
        self._access_key_id, self._region = access_key_id, region
        self._secret_access_key = secret_access_key
        self._session_token = session_token
        self._service_endpoint = service_endpoint

    def name(self) -> str:
        """Return the link name."""
        return self._link_name

    def dataverse_name(self) -> str:
        """Return the name of the dataverse the link belongs to."""
        return self._dataverse

    def form_encode(self: "S3ExternalAnalyticsLink") -> bytes:
        """Encode the link as a form string and return it as bytes.

        ``dataverse`` and ``name`` are only included when the dataverse name
        contains no ``/``. The session token and service endpoint are included
        only when they are not blank; the secret access key is always included.
        """
        form = {}

        if "/" not in self._dataverse:
            form.update(dataverse=self._dataverse, name=self._link_name)

        form.update(
            type=AnalyticsLinkType.S3External.value,
            accessKeyId=self._access_key_id,
            secretAccessKey=self._secret_access_key,
            region=self._region,
        )

        optional_fields = (
            ("sessionToken", self._session_token),
            ("serviceEndpoint", self._service_endpoint),
        )
        for field, text in optional_fields:
            if not is_null_or_empty(text):
                form[field] = text

        return mk_formstr(form).encode()

    def validate(self: "S3ExternalAnalyticsLink"):
        """Check that every mandatory field is non-blank.

        :raises: :class:`couchbase.exceptions.InvalidArgumentException` when
            the dataverse, link name, access key id, secret access key or
            region is blank.
        """
        mandatory = (
            (self._dataverse,
             "Dataverse must be set for S3 external analytics links."),
            (self._link_name,
             "Link name must be set for S3 external analytics links."),
            (self._access_key_id,
             "Access key id must be set for S3 external analytics links."),
            (self._secret_access_key,
             "Secret access key must be set for S3 external analytics links."),
            (self._region,
             "Region must be set for S3 external analytics links."),
        )
        for text, complaint in mandatory:
            if is_null_or_empty(text):
                raise InvalidArgumentException(complaint)

    def link_type(self: "S3ExternalAnalyticsLink") -> AnalyticsLinkType:
        """Return ``AnalyticsLinkType.S3External``."""
        return AnalyticsLinkType.S3External

    @classmethod
    def link_from_server_json(
        cls,  # type: "S3ExternalAnalyticsLink"
        raw_data,  # type: dict
    ) -> "S3ExternalAnalyticsLink":
        """Build a link from a link description dictionary.

        Reads ``dataverse`` (falling back to ``scope``), ``name``,
        ``accessKeyId``, ``region`` and ``serviceEndpoint``. No secret access
        key or session token is set on the returned link.

        :raises KeyError: if a required key is missing from ``raw_data``
        """
        dataverse_key = "dataverse" if "dataverse" in raw_data else "scope"
        dataverse = raw_data[dataverse_key]
        link_name = raw_data["name"]
        access_key_id = raw_data["accessKeyId"]
        region = raw_data["region"]
        service_endpoint = raw_data["serviceEndpoint"]

        return S3ExternalAnalyticsLink(
            dataverse=dataverse,
            link_name=link_name,
            access_key_id=access_key_id,
            region=region,
            service_endpoint=service_endpoint,
        )


class AzureBlobExternalAnalyticsLink(AnalyticsLink):
    """An :class:`AnalyticsLink` of type ``AnalyticsLinkType.AzureBlobExternal``.

    :param dataverse: name of the dataverse the link belongs to
    :param link_name: name of the link
    :param connection_string: optional connection string
    :param account_name: optional account name
    :param account_key: optional account key
    :param shared_access_signature: optional shared access signature
    :param blob_endpoint: optional blob endpoint
    :param endpiont_suffix: optional endpoint suffix (historical, misspelled
        keyword kept for backward compatibility)
    :param endpoint_suffix: correctly spelled alias of ``endpiont_suffix``;
        only used when ``endpiont_suffix`` is ``None``
    """

    def __init__(
            self,  # type: "AzureBlobExternalAnalyticsLink"
            dataverse,  # type: str
            link_name,  # type: str
            connection_string=None,  # type: str
            account_name=None,  # type: str
            account_key=None,  # type: str
            shared_access_signature=None,  # type: str
            blob_endpoint=None,  # type: str
            endpiont_suffix=None,  # type: str
            endpoint_suffix=None,  # type: str

    ):
        super().__init__()
        self._dataverse = dataverse
        self._link_name = link_name
        self._connection_string = connection_string
        self._account_name = account_name
        self._account_key = account_key
        self._shared_access_signature = shared_access_signature
        self._blob_endpoint = blob_endpoint
        # The misspelled keyword wins when both are given, so existing callers
        # see no change; the attribute name is kept as it was.
        if endpiont_suffix is None:
            endpiont_suffix = endpoint_suffix
        self._endpiont_suffix = endpiont_suffix

    def name(
        self,  # type: "AzureBlobExternalAnalyticsLink"
    ) -> str:
        """Return the link name."""
        return self._link_name

    def dataverse_name(
        self,  # type: "AzureBlobExternalAnalyticsLink"
    ) -> str:
        """Return the name of the dataverse the link belongs to."""
        return self._dataverse

    def form_encode(self: "AzureBlobExternalAnalyticsLink") -> bytes:
        """Encode the link as a form string and return it as bytes.

        ``dataverse`` and ``name`` are only included when the dataverse name
        contains no ``/``. Every other field except ``type`` is included only
        when it is not blank.
        """
        params = {}

        if "/" not in self._dataverse:
            params["dataverse"] = self._dataverse
            params["name"] = self._link_name

        params["type"] = AnalyticsLinkType.AzureBlobExternal.value

        optional_fields = (
            ("connectionString", self._connection_string),
            ("accountName", self._account_name),
            ("accountKey", self._account_key),
            ("sharedAccessSignature", self._shared_access_signature),
            ("blobEndpoint", self._blob_endpoint),
            ("endpointSuffix", self._endpiont_suffix),
        )
        for field, value in optional_fields:
            if not is_null_or_empty(value):
                params[field] = value

        return mk_formstr(params).encode()

    def validate(self: "AzureBlobExternalAnalyticsLink"):
        """Check that the link is named and carries usable credentials.

        Credentials are acceptable when a connection string is set, or when an
        account name is set together with an account key or a shared access
        signature.

        :raises: :class:`couchbase.exceptions.InvalidArgumentException` when
            the dataverse or link name is blank, or no acceptable credential
            combination is present.
        """
        if is_null_or_empty(self._dataverse):
            raise InvalidArgumentException(
                "Dataverse must be set for Azure blob external analytics links.")

        if is_null_or_empty(self._link_name):
            raise InvalidArgumentException(
                "Link name must be set for Azure blob external analytics links.")

        if is_null_or_empty(self._connection_string):
            has_account = not is_null_or_empty(self._account_name)
            has_secret = not (is_null_or_empty(self._account_key)
                              and is_null_or_empty(self._shared_access_signature))

            if not (has_account and has_secret):
                # The previous message named "AccessKeyId", an S3 field that
                # this class does not have.
                raise InvalidArgumentException(
                    "Either a connection string, or an account name together with an account key "
                    "or shared access signature, must be set for Azure blob external analytics links.")

    def link_type(self: "AzureBlobExternalAnalyticsLink") -> AnalyticsLinkType:
        """Return ``AnalyticsLinkType.AzureBlobExternal``."""
        return AnalyticsLinkType.AzureBlobExternal

    @classmethod
    def link_from_server_json(
        cls,  # type: "AzureBlobExternalAnalyticsLink"
        raw_data,  # type: dict
    ) -> "AzureBlobExternalAnalyticsLink":
        """Build a link from a link description dictionary.

        Reads ``dataverse`` (falling back to ``scope``), ``name``,
        ``accountName``, ``blobEndpoint`` and ``endpointSuffix``. No
        connection string, account key or shared access signature is set on
        the returned link.

        :raises KeyError: if a required key is missing from ``raw_data``
        """
        dataverse = raw_data["dataverse"] if "dataverse" in raw_data else raw_data["scope"]
        link_name = raw_data["name"]
        account_name = raw_data["accountName"]
        blob_endpoint = raw_data["blobEndpoint"]
        endpoint_suffix = raw_data["endpointSuffix"]

        return AzureBlobExternalAnalyticsLink(dataverse,
                                              link_name,
                                              account_name=account_name,
                                              blob_endpoint=blob_endpoint,
                                              endpoint_suffix=endpoint_suffix)


class AnalyticsResult(iterable_wrapper(AnalyticsRequest)):
    """Iterable result of an analytics query.

    The base class is produced by ``iterable_wrapper(AnalyticsRequest)``; all
    constructor arguments are forwarded to it unchanged.
    """

    def __init__(self,
                 *args, **kwargs  # type: N1QLRequest
                 ):
        super(AnalyticsResult, self).__init__(*args, **kwargs)

    def metadata(self  # type: AnalyticsResult
                 ):
        # type: (...) -> AnalyticsMetaData
        """Return an :class:`AnalyticsMetaData` wrapping this result."""
        return AnalyticsMetaData(self)


class AnalyticsScanConsistency(Enum):
    """Scan consistency choices for an analytics query."""

    NOT_BOUNDED = "not_bounded"
    REQUEST_PLUS = "request_plus"


class AnalyticsOptions(QueryBaseOptions):
    """Options accepted by an analytics query."""

    # Maps each accepted option name to ``{target_name: transform}``.
    # NOTE(review): presumably applied by QueryBaseOptions when it builds the
    # TARGET_CLASS instance - confirm against couchbase.options.
    # ``timedelta.total_seconds`` replaces ``timedelta.seconds``: the latter is
    # a non-callable attribute descriptor, while every other transform here is
    # a callable, and it would also ignore the days/microseconds parts.
    VALID_OPTS = {'timeout': {'timeout': timedelta.total_seconds},
                  'read_only': {'readonly': identity},
                  'scan_consistency': {'consistency': enum_value},
                  'client_context_id': {'client_context_id': identity},
                  'priority': {'priority': identity},
                  'positional_parameters': {},
                  'named_parameters': {},
                  'query_context': {'query_context': identity},
                  'raw': {}}

    TARGET_CLASS = AnalyticsQuery

    @overload
    def __init__(self,
                 timeout=None,  # type: timedelta
                 read_only=None,  # type: bool
                 scan_consistency=None,  # type: AnalyticsScanConsistency
                 client_context_id=None,  # type: str
                 priority=None,  # type: bool
                 positional_parameters=None,  # type: Iterable[str]
                 named_parameters=None,  # type: Dict[str, str]
                 query_context=None,  # type: str
                 raw=None,  # type: Dict[str,Any]
                 ):
        """

        :param timedelta timeout:
        :param bool read_only:
        :param AnalyticsScanConsistency scan_consistency:
        :param str client_context_id:
        :param bool priority:
        :param Iterable[JSON] positional_parameters:
        :param dict[str,JSON] named_parameters:
        :param str query_context:
        :param dict[str,JSON] raw:
        """
        pass

    def __init__(self,
                 **kwargs
                 ):
        super(AnalyticsOptions, self).__init__(**kwargs)


class AnalyticsStatus(Enum):
    """Status values an analytics query can report.

    Each member needs its own value: members sharing one value (previously
    all ``()``) are aliases of the first, which made every name lookup
    resolve to ``RUNNING``.
    """

    RUNNING = "running"
    SUCCESS = "success"
    ERRORS = "errors"
    COMPLETED = "completed"
    STOPPED = "stopped"
    TIMEOUT = "timeout"
    CLOSED = "closed"
    FATAL = "fatal"
    ABORTED = "aborted"
    UNKNOWN = "unknown"


class AnalyticsWarning(object):
    """Accessor for one raw warning dictionary."""

    def __init__(self, raw_warning):
        self._raw_warning = raw_warning

    def code(self):
        # type: (...) -> int
        """Return the ``code`` entry of the raw warning, or ``None``."""
        return self._raw_warning.get('code')

    def message(self):
        # type: (...) -> str
        """Return the ``msg`` entry of the raw warning, or ``None``."""
        return self._raw_warning.get('msg')


class AnalyticsMetrics(object):
    """Accessors over the ``metrics`` attribute of an analytics result."""

    def __init__(self,
                 parent  # type: AnalyticsResult
                 ):
        self._parentquery = parent

    @property
    def _raw_metrics(self):
        return self._parentquery.metrics

    def _as_timedelta(self, time_str):
        # Looks up the named metric and parses it with durationpy's from_str.
        return from_str(self._raw_metrics.get(time_str))

    def elapsed_time(self):
        # type: (...) -> timedelta
        """Return the ``elapsedTime`` metric parsed as a duration."""
        return self._as_timedelta('elapsedTime')

    def execution_time(self):
        # type: (...) -> timedelta
        """Return the ``executionTime`` metric parsed as a duration."""
        return self._as_timedelta('executionTime')

    def result_count(self):
        # type: (...) -> UnsignedInt64
        """Return the ``resultCount`` metric (0 when absent)."""
        return UnsignedInt64(self._raw_metrics.get('resultCount', 0))

    def result_size(self):
        # type: (...) -> UnsignedInt64
        """Return the ``resultSize`` metric (0 when absent)."""
        return UnsignedInt64(self._raw_metrics.get('resultSize', 0))

    def error_count(self):
        # type: (...) -> UnsignedInt64
        """Return the ``errorCount`` metric (0 when absent)."""
        return UnsignedInt64(self._raw_metrics.get('errorCount', 0))

    def processed_objects(self):
        # type: (...) -> UnsignedInt64
        """Return the ``processedObjects`` metric (0 when absent)."""
        return UnsignedInt64(self._raw_metrics.get('processedObjects', 0))

    def warning_count(self):
        # type: (...) -> UnsignedInt64
        """Return the ``warningCount`` metric (0 when absent)."""
        return UnsignedInt64(self._raw_metrics.get('warningCount', 0))


class AnalyticsMetaData(object):
    """Accessors over the ``meta`` attribute of an analytics result."""

    def __init__(self,
                 parent  # type: AnalyticsResult
                 ):
        self._parentquery_for_metadata = parent

    def request_id(self):
        # type: (...) -> str
        """Return the ``requestID`` entry of the metadata, or ``None``."""
        return self._parentquery_for_metadata.meta.get('requestID')

    def client_context_id(self):
        # type: (...) -> str
        """Return the ``clientContextID`` entry of the metadata, or ``None``."""
        return self._parentquery_for_metadata.meta.get('clientContextID')

    def signature(self):
        # type: (...) -> Optional[JSON]
        """Return the ``signature`` entry of the metadata, or ``None``."""
        return self._parentquery_for_metadata.meta.get('signature')

    def status(self):
        # type: (...) -> AnalyticsStatus
        """Return the :class:`AnalyticsStatus` named by the ``status`` entry.

        The entry is upper-cased and looked up by member name.
        """
        return AnalyticsStatus[self._parentquery_for_metadata.meta.get(
            'status').upper()]

    def warnings(self):
        # type: (...) -> List[AnalyticsWarning]
        """Return the ``warnings`` entries wrapped as :class:`AnalyticsWarning`."""
        return list(
            map(AnalyticsWarning, self._parentquery_for_metadata.meta.get('warnings', [])))

    def metrics(self):
        # type: (...) -> Optional[AnalyticsMetrics]
        """Return an :class:`AnalyticsMetrics` view over the same result."""
        return AnalyticsMetrics(self._parentquery_for_metadata)