import json
import time
import attr
from attr.validators import instance_of as io, optional
from typing import *
import couchbase_core._libcouchbase as LCB
from couchbase_core import mk_formstr
from couchbase.options import OptionBlock, OptionBlockTimeOut, forward_args, timedelta
from couchbase.management.admin import METHMAP
from couchbase.management.generic import GenericManager
from couchbase.exceptions import (ErrorMapper, HTTPException, QueryIndexAlreadyExistsException,
WatchQueryIndexTimeoutException, QueryIndexNotFoundException,
InvalidArgumentException)
try:
from typing import Protocol
except BaseException:
from typing_extensions import Protocol
class QueryErrorMapper(ErrorMapper):
@staticmethod
def mapping():
# type: (...) -> Dict[CBErrorType,Dict[Any, CBErrorType]]
return {HTTPException: {".*[iI]ndex.*already exists.*": QueryIndexAlreadyExistsException,
".*[iI]ndex.*[nN]ot [fF]ound.*": QueryIndexNotFoundException}}
def is_null_or_empty(
value # type: str
) -> bool:
return not (value and value.split())
[docs]@QueryErrorMapper.wrap
class QueryIndexManager(GenericManager):
def __init__(self, parent_cluster):
"""
Query Index Manager
The Query Index Manager interface contains the means for managing indexes used for queries.
:param parent_cluster: Parent cluster
"""
super(QueryIndexManager, self).__init__(parent_cluster)
def _http_request(self, **kwargs):
# the kwargs can override the defaults
imeth = None
method = kwargs.get('method', 'GET')
if not method in METHMAP:
raise InvalidArgumentException("Unknown HTTP Method", method)
imeth = METHMAP[method]
return self._admin_bucket._http_request(
type=LCB.LCB_HTTP_TYPE_QUERY,
path=kwargs['path'],
method=imeth,
content_type=kwargs.get('content_type', 'application/json'),
post_data=kwargs.get('content', None),
response_format=LCB.FMT_JSON,
timeout=kwargs.get('timeout', None))
def _validate_scope_and_collection(self, # type: "QueryIndexManager"
scope=None, # type: str
collection=None # type: str
) -> bool:
if not (scope and scope.split()) and (collection and collection.split()):
raise InvalidArgumentException(
"Both scope and collection must be set. Invalid scope.")
if (scope and scope.split()) and not (collection and collection.split()):
raise InvalidArgumentException(
"Both scope and collection must be set. Invalid collection.")
def _build_keyspace(self, # type: "QueryIndexManager"
bucket, # type: str
scope=None, # type: str
collection=None # type: str
) -> str:
# None AND empty check done in validation, only check for None
if scope and collection:
return "`{}`.`{}`.`{}`".format(bucket, scope, collection)
if scope:
return "`{}`.`{}`".format(bucket, scope)
return "`{}`".format(bucket)
def _create_index(self, bucket_name, fields,
index_name=None, **kwargs):
scope_name = kwargs.get("scope_name", None)
collection_name = kwargs.get("collection_name", None)
self._validate_scope_and_collection(scope_name, collection_name)
primary = kwargs.get("primary", False)
condition = kwargs.get("condition", None)
if primary and fields:
raise TypeError('Cannot create primary index with explicit fields')
elif not primary and not fields:
raise ValueError('Fields required for non-primary index')
if condition and primary:
raise ValueError('cannot specify condition for primary index')
query_str = ""
if not fields:
query_str += "CREATE PRIMARY INDEX"
else:
query_str += "CREATE INDEX"
if index_name and index_name.split():
query_str += " `{}` ".format(index_name)
query_str += " ON {} ".format(self._build_keyspace(
bucket_name, scope_name, collection_name))
if fields:
field_names = ["`{}`".format(f) for f in fields]
query_str += "({})".format(", ".join(field_names))
if condition:
query_str += " WHERE {}".format(condition)
options = {}
deferred = kwargs.get("deferred", False)
if deferred:
options["defer_build"] = deferred
num_replicas = kwargs.get("num_replicas", None)
if num_replicas:
options["num_replica"] = num_replicas
if options:
query_str += " WITH {{{}}}".format(
", ".join(["'{0}':{1}".format(k, v) for k, v in options.items()]))
def possibly_raise(error):
if isinstance(error, list) and "msg" in error[0] and "already exists" in error[0]["msg"]:
if not kwargs.get('ignore_if_exists', False):
raise
try:
resp = self._http_request(
path="",
method="POST",
content=mk_formstr({"statement": query_str}),
content_type='application/x-www-form-urlencoded',
**kwargs
).value
if "errors" in resp and possibly_raise(resp["errors"]):
msg = resp["errors"][0].get("msg", "Index already exists")
raise QueryIndexAlreadyExistsException.pyexc(
msg, resp["errors"])
except HTTPException as h:
error = getattr(
getattr(
h,
'objextra',
None),
'value',
{}).get(
'errors',
"")
if possibly_raise(error):
raise
def _drop_index(self, bucket_name, index_name=None, **kwargs):
scope_name = kwargs.get("scope_name", None)
collection_name = kwargs.get("collection_name", None)
self._validate_scope_and_collection(scope_name, collection_name)
# previous ignore_missing was a viable kwarg - should only have ignore_if_not_exists
ignore_missing = kwargs.pop("ignore_missing", None)
if ignore_missing:
kwargs["ignore_if_not_exists"] = ignore_missing
query_str = ""
keyspace = self._build_keyspace(
bucket_name, scope_name, collection_name)
if not index_name:
query_str += "DROP PRIMARY INDEX ON {}".format(keyspace)
else:
if scope_name and collection_name:
query_str += "DROP INDEX `{0}` ON {1}".format(
index_name, keyspace)
else:
query_str += "DROP INDEX {0}.`{1}`".format(
keyspace, index_name)
def possibly_raise(error):
if isinstance(error, list) and "msg" in error[0] and "not found" in error[0]["msg"]:
if not kwargs.get('ignore_if_not_exists', False):
return True
try:
resp = self._http_request(
path="",
method="POST",
content=mk_formstr({"statement": query_str}),
content_type='application/x-www-form-urlencoded',
**kwargs
).value
if "errors" in resp and possibly_raise(resp["errors"]):
msg = resp["errors"][0].get("msg", "Index not found")
raise QueryIndexNotFoundException.pyexc(msg, resp["errors"])
except HTTPException as h:
error = getattr(
getattr(
h,
'objextra',
None),
'value',
{}).get(
'errors',
"")
if possibly_raise(error):
raise
[docs] def get_all_indexes(self, # type: "QueryIndexManager"
bucket_name, # type: str
*options, # type: "GetAllQueryIndexOptions"
**kwargs # type: Any
):
# type: (...) -> List[QueryIndex]
"""
Fetches all indexes from the server.
:param str bucket_name: the name of the bucket.
:param GetAllQueryIndexOptions options: Options to use for getting all indexes.
:param Any kwargs: Override corresponding value in options.
:return: A list of QueryIndex objects.
:raises: InvalidArgumentsException
"""
final_args = forward_args(kwargs, *options)
scope_name = final_args.get("scope_name", None)
collection_name = final_args.get("collection_name", None)
if scope_name and collection_name:
query_str = """
SELECT idx.* FROM system:indexes AS idx
WHERE `bucket_id`="{0}" AND `scope_id`="{1}"
AND `keyspace_id`="{2}" AND `using`="gsi"
ORDER BY is_primary DESC, name ASC
""".format(bucket_name, scope_name, collection_name)
elif scope_name:
query_str = """
SELECT idx.* FROM system:indexes AS idx
WHERE `bucket_id`="{0}" AND `scope_id`="{1}" AND `using`="gsi"
ORDER BY is_primary DESC, name ASC
""".format(bucket_name, scope_name)
else:
query_str = """
SELECT idx.* FROM system:indexes AS idx
WHERE (
(`bucket_id` IS MISSING AND `keyspace_id`="{0}")
OR `bucket_id`="{0}"
) AND `using`="gsi"
ORDER BY is_primary DESC, name ASC
""".format(bucket_name)
response = self._http_request(
path="",
method="POST",
content=mk_formstr({"statement": query_str}),
content_type='application/x-www-form-urlencoded',
**final_args
).value
if response and "results" in response:
results = response.get("results")
res = list(map(QueryIndex.from_server, results))
return res
return []
[docs] def create_index(self, # type: "QueryIndexManager"
bucket_name, # type: str
index_name, # type: str
fields, # type: Iterable[str]
*options, # type: "CreateQueryIndexOptions"
**kwargs
):
# type: (...) -> None
"""
Creates a new index.
:param str bucket_name: name of the bucket.
:param str index_name: the name of the index.
:param Iterable[str] fields: Fields over which to create the index.
:param CreateQueryIndexOptions options: Options to use when creating index.
:param Any kwargs: Override corresponding value in options.
:raises: QueryIndexAlreadyExistsException
:raises: InvalidArgumentsException
"""
# CREATE INDEX index_name ON bucket_name WITH { "num_replica": 2 }
# https://docs.couchbase.com/server/current/n1ql/n1ql-language-reference/createindex.html
#
final_args = forward_args(kwargs, *options)
self._create_index(bucket_name, fields, index_name, **final_args)
[docs] def create_primary_index(self, # type: "QueryIndexManager"
bucket_name, # type: str
*options, # type: "CreatePrimaryQueryIndexOptions"
**kwargs
):
"""
Creates a new primary index.
:param str bucket_name: name of the bucket.
:param str index_name: name of the index.
:param CreatePrimaryQueryIndexOptions options: Options to use when creating primary index
:param Any kwargs: Override corresponding values in options.
:raises: QueryIndexAlreadyExistsException
:raises: InvalidArgumentsException
"""
# CREATE INDEX index_name ON bucket_name WITH { "num_replica": 2 }
# https://docs.couchbase.com/server/current/n1ql/n1ql-language-reference/createindex.html
#
kwargs['primary'] = True
final_args = forward_args(kwargs, *options)
index_name = final_args.pop("index_name", None)
self._create_index(bucket_name, [], index_name, **final_args)
[docs] def drop_index(self, # type: "QueryIndexManager"
bucket_name, # type: str
index_name, # type: str
*options, # type: "DropQueryIndexOptions"
**kwargs):
"""
Drops an index.
:param str bucket_name: name of the bucket.
:param str index_name: name of the index.
:param DropQueryIndexOptions options: Options for dropping index.
:param Any kwargs: Override corresponding value in options.
:raises: QueryIndexNotFoundException
:raises: InvalidArgumentsException
"""
final_args = forward_args(kwargs, *options)
self._drop_index(bucket_name, index_name, **final_args)
[docs] def drop_primary_index(self, # type: "QueryIndexManager"
bucket_name, # type: str
*options, # type: "DropPrimaryQueryIndexOptions"
**kwargs):
"""
Drops a primary index.
:param bucket_name: name of the bucket.
:param index_name: name of the index.
:param ignore_if_not_exists: Don't error/throw if the index does not exist.
:param timeout: the time allowed for the operation to be terminated. This is controlled by the client.
:raises: QueryIndexNotFoundException
:raises: InvalidArgumentsException
"""
final_args = forward_args(kwargs, *options)
index_name = final_args.pop("index_name", None)
self._drop_index(bucket_name, index_name, **final_args)
[docs] def watch_indexes(self, # type: "QueryIndexManager"
bucket_name, # type: str
index_names, # type: Iterable[str]
*options, # type: "WatchQueryIndexOptions"
**kwargs):
"""
Watch polls indexes until they are online.
:param str bucket_name: name of the bucket.
:param Iterable[str] index_names: name(s) of the index(es).
:param WatchQueryIndexOptions options: Options for request to watch indexes.
:param Any kwargs: Override corresponding valud in options.
:raises: QueryIndexNotFoundException
:raises: WatchQueryIndexTimeoutException
"""
final_args = forward_args(kwargs, *options)
scope_name = final_args.get("scope_name", None)
collection_name = final_args.get("collection_name", None)
self._validate_scope_and_collection(scope_name, collection_name)
if final_args.get("watch_primary", False):
index_names.append("#primary")
timeout = final_args.get("timeout", None)
if not timeout:
raise ValueError(
'Must specify a timeout condition for watch indexes')
def check_indexes(index_names, indexes):
for idx_name in index_names:
match = next((i for i in indexes if i.name == idx_name), None)
if not match:
raise QueryIndexNotFoundException(
"Cannot find index with name: {}".format(idx_name))
return all(map(lambda i: i.state == "online", indexes))
# timeout is converted to microsecs via final_args()
timeout_millis = timeout / 1000
interval_millis = float(50)
start = time.perf_counter()
time_left = timeout_millis
while True:
opts = GetAllQueryIndexOptions(
timeout=timedelta(milliseconds=time_left))
if scope_name:
opts["scope_name"] = scope_name
opts["collection_name"] = collection_name
indexes = self.get_all_indexes(bucket_name, opts)
all_online = check_indexes(index_names, indexes)
if all_online:
break
interval_millis += 500
if interval_millis > 1000:
interval_millis = 1000
time_left = timeout_millis - ((time.perf_counter() - start) * 1000)
if interval_millis > time_left:
interval_millis = time_left
if time_left <= 0:
raise WatchQueryIndexTimeoutException(
"Failed to find all indexes online within the alloted time.")
time.sleep(interval_millis / 1000)
def _build_deferred_prior_6_5(self, bucket_name, **final_args):
"""
** INTERNAL **
"""
indexes = self.get_all_indexes(bucket_name, GetAllQueryIndexOptions(
timeout=final_args.get("timeout", None)))
deferred_indexes = [
idx.name for idx in indexes if idx.state in ["deferred", "pending"]]
query_str = "BUILD INDEX ON `{}` ({})".format(
bucket_name, ", ".join(["`{}`".format(di) for di in deferred_indexes]))
self._http_request(
path="",
method="POST",
content=mk_formstr({"statement": query_str}),
content_type='application/x-www-form-urlencoded',
**final_args
)
def _build_deferred_6_5_plus(self, bucket_name, **final_args):
"""
** INTERNAL **
"""
scope_name = final_args.get("scope_name", None)
collection_name = final_args.get("collection_name", None)
self._validate_scope_and_collection(scope_name, collection_name)
keyspace = self._build_keyspace(
bucket_name, scope_name, collection_name)
if scope_name and collection_name:
inner_query_str = """
SELECT RAW idx.name FROM system:indexes AS idx
WHERE `bucket_id`="{0}" AND `scope_id`="{1}"
AND `keyspace_id`="{2}" AND state="deferred"
""".format(bucket_name, scope_name, collection_name)
else:
inner_query_str = """
SELECT RAW idx.name FROM system:indexes AS idx
WHERE (
(`bucket_id` IS MISSING AND `keyspace_id`="{0}")
OR `bucket_id`="{0}"
) AND state="deferred"
""".format(bucket_name)
query_str = "BUILD INDEX ON {} (({}))".format(
keyspace, inner_query_str)
self._http_request(
path="",
method="POST",
content=mk_formstr({"statement": query_str}),
content_type='application/x-www-form-urlencoded',
**final_args
)
[docs] def build_deferred_indexes(self, # type: "QueryIndexManager"
bucket_name, # type: str
*options, # type: "BuildDeferredQueryIndexOptions"
**kwargs
):
"""
Build Deferred builds all indexes which are currently in deferred state.
:param str bucket_name: name of the bucket.
:param BuildDeferredQueryIndexOptions options: Options for building deferred indexes.
:param Any kwargs: Override corresponding value in options.
:raise: InvalidArgumentsException
"""
final_args = forward_args(kwargs, *options)
if self._admin_bucket._is_6_5_plus():
self._build_deferred_6_5_plus(bucket_name, **final_args)
else:
self._build_deferred_prior_6_5(bucket_name, **final_args)
class IndexType(object):
pass
@attr.s
class QueryIndex(Protocol):
"""The QueryIndex protocol provides a means of mapping a query index into an object."""
name = attr.ib(validator=io(str)) # type: str
is_primary = attr.ib(validator=io(bool)) # type: bool
type = attr.ib(validator=io(IndexType), type=IndexType) # type: IndexType
state = attr.ib(validator=io(str)) # type: str
namespace = attr.ib(validator=io(str)) # type: str
keyspace = attr.ib(validator=io(str)) # type: str
index_key = attr.ib(validator=io(Iterable)) # type: Iterable[str]
condition = attr.ib(validator=io(str)) # type: str
bucket_name = attr.ib(validator=optional(io(str))) # type: Optional[str]
scope_name = attr.ib(validator=optional(io(str))) # type: Optional[str]
collection_name = attr.ib(
validator=optional(io(str))) # type: Optional[str]
partition = attr.ib(validator=optional(
validator=io(str))) # type: Optional[str]
@classmethod
def from_server(cls,
json_data # type: Dict[str, Any]
):
return cls(json_data.get("name"),
bool(json_data.get("is_primary")),
IndexType(),
json_data.get("state"),
json_data.get("keyspace_id"),
json_data.get("namespace_id"),
[],
json_data.get("condition", ""),
json_data.get(
"bucket_id", json_data.get("keyspace_id", "")),
json_data.get("scope_id", ""),
json_data.get("keyspace_id", ""),
json_data.get("partition", None)
)
class GetAllQueryIndexOptions(OptionBlockTimeOut):
@overload
def __init__(self,
timeout=None, # type: timedelta
scope_name=None, # type: str
collection_name=None # type: str
):
pass
def __init__(self, **kwargs):
"""
Get all query indexes options
:param timeout: operation timeout in seconds
:param scope_name:
**UNCOMMITTED**
scope_name is an uncommitted API that is unlikely to change,
but may still change as final consensus on its behavior has not yet been reached.
Nme of the scope where the index belongs
:param collection_name:
**UNCOMMITTED**
collection_name is an uncommitted API that is unlikely to change,
but may still change as final consensus on its behavior has not yet been reached.
Name of the collection where the index belongs
"""
super(GetAllQueryIndexOptions, self).__init__(**kwargs)
class CreateQueryIndexOptions(OptionBlockTimeOut):
@overload
def __init__(self,
timeout=None, # type: timedelta
ignore_if_exists=None, # type: bool
num_replicas=None, # type: int
deferred=None, # type: bool
condition=None, # type: str
scope_name=None, # type: str
collection_name=None # type: str
):
pass
def __init__(self, **kwargs):
"""
Query Index creation options
:param timeout: operation timeout in seconds
:param ignore_if_exists: don't throw an exception if index already exists
:param num_replicas: number of replicas
:param deferred: whether the index creation should be deferred
:param condition: 'where' condition for partial index creation
:param scope_name:
**UNCOMMITTED**
scope_name is an uncommitted API that is unlikely to change,
but may still change as final consensus on its behavior has not yet been reached.
Nme of the scope where the index belongs
:param collection_name:
**UNCOMMITTED**
collection_name is an uncommitted API that is unlikely to change,
but may still change as final consensus on its behavior has not yet been reached.
Name of the collection where the index belongs
"""
if 'ignore_if_exists' not in kwargs:
kwargs['ignore_if_exists'] = False
super(CreateQueryIndexOptions, self).__init__(**kwargs)
class CreatePrimaryQueryIndexOptions(CreateQueryIndexOptions):
@overload
def __init__(self,
index_name=None, # type: str
timeout=None, # type: timedelta
ignore_if_exists=None, # type: bool
num_replicas=None, # type: int
deferred=None, # type: bool
scope_name=None, # type: str
collection_name=None # type: str
):
pass
def __init__(self, **kwargs):
"""
Query Primary Index creation options
:param index_name: name of primary index
:param timeout: operation timeout in seconds
:param ignore_if_exists: don't throw an exception if index already exists
:param num_replicas: number of replicas
:param deferred: whether the index creation should be deferred
:param scope_name:
**UNCOMMITTED**
scope_name is an uncommitted API that is unlikely to change,
but may still change as final consensus on its behavior has not yet been reached.
Nme of the scope where the index belongs
:param collection_name:
**UNCOMMITTED**
collection_name is an uncommitted API that is unlikely to change,
but may still change as final consensus on its behavior has not yet been reached.
Name of the collection where the index belongs
"""
super(CreatePrimaryQueryIndexOptions, self).__init__(**kwargs)
class DropQueryIndexOptions(OptionBlockTimeOut):
@overload
def __init__(self,
ignore_if_not_exists=None, # type: bool
timeout=None, # type: timedelta
scope_name=None, # type: str
collection_name=None # type: str
):
pass
def __init__(self, **kwargs):
"""
Drop query index options
:param ignore_if_exists: don't throw an exception if index already exists
:param timeout: operation timeout in seconds
:param scope_name:
**UNCOMMITTED**
scope_name is an uncommitted API that is unlikely to change,
but may still change as final consensus on its behavior has not yet been reached.
Nme of the scope where the index belongs
:param collection_name:
**UNCOMMITTED**
collection_name is an uncommitted API that is unlikely to change,
but may still change as final consensus on its behavior has not yet been reached.
Name of the collection where the index belongs
"""
super(DropQueryIndexOptions, self).__init__(**kwargs)
class DropPrimaryQueryIndexOptions(OptionBlockTimeOut):
@overload
def __init__(self,
index_name=None, # str
ignore_if_not_exists=None, # type: bool
timeout=None, # type: timedelta
scope_name=None, # type: str
collection_name=None # type: str
):
pass
def __init__(self, **kwargs):
"""
Drop primary index options
:param index_name: name of primary index
:param timeout: operation timeout in seconds
:param ignore_if_exists: don't throw an exception if index already exists
:param scope_name:
**UNCOMMITTED**
scope_name is an uncommitted API that is unlikely to change,
but may still change as final consensus on its behavior has not yet been reached.
Nme of the scope where the index belongs
:param collection_name:
**UNCOMMITTED**
collection_name is an uncommitted API that is unlikely to change,
but may still change as final consensus on its behavior has not yet been reached.
Name of the collection where the index belongs
"""
super(DropPrimaryQueryIndexOptions, self).__init__(**kwargs)
class WatchQueryIndexOptions(OptionBlock):
@overload
def __init__(self,
watch_primary=None, # type: bool
timeout=None, # type: timedelta
scope_name=None, # type: str
collection_name=None # type: str
):
pass
def __init__(self, **kwargs):
"""
Watch query index options
:param watch_primary: If True, watch primary indexes
:param timeout: operation timeout in seconds
:param scope_name:
**UNCOMMITTED**
scope_name is an uncommitted API that is unlikely to change,
but may still change as final consensus on its behavior has not yet been reached.
Nme of the scope where the index belongs
:param collection_name:
**UNCOMMITTED**
collection_name is an uncommitted API that is unlikely to change,
but may still change as final consensus on its behavior has not yet been reached.
Name of the collection where the index belongs
"""
super(WatchQueryIndexOptions, self).__init__(**kwargs)
class BuildDeferredQueryIndexOptions(OptionBlockTimeOut):
@overload
def __init__(self,
timeout=None, # type: timedelta
scope_name=None, # type: str
collection_name=None # type: str
):
pass
def __init__(self, **kwargs):
"""
Build deferred query indexes options
:param timeout: operation timeout in seconds
:param scope_name:
**UNCOMMITTED**
scope_name is an uncommitted API that is unlikely to change,
but may still change as final consensus on its behavior has not yet been reached.
Nme of the scope where the index belongs
:param collection_name:
**UNCOMMITTED**
collection_name is an uncommitted API that is unlikely to change,
but may still change as final consensus on its behavior has not yet been reached.
Name of the collection where the index belongs
"""
super(BuildDeferredQueryIndexOptions, self).__init__(**kwargs)