import time
import attr
from attr.validators import instance_of as io, optional
from typing import *
import couchbase_core._libcouchbase as LCB
from couchbase_core import mk_formstr
from couchbase.options import OptionBlock, OptionBlockTimeOut, forward_args, timedelta
from couchbase.management.admin import METHMAP
from couchbase.management.generic import GenericManager
from couchbase.exceptions import (ErrorMapper, HTTPException, QueryIndexAlreadyExistsException,
WatchQueryIndexTimeoutException, QueryIndexNotFoundException,
InvalidArgumentException)
try:
from typing import Protocol
except BaseException:
from typing_extensions import Protocol
class QueryErrorMapper(ErrorMapper):
@staticmethod
def mapping():
# type: (...) -> Dict[CBErrorType,Dict[Any, CBErrorType]]
return {HTTPException: {".*[iI]ndex.*already exists.*": QueryIndexAlreadyExistsException,
".*[iI]ndex.*[nN]ot [fF]ound.*": QueryIndexNotFoundException}}
[docs]@QueryErrorMapper.wrap
class QueryIndexManager(GenericManager):
def __init__(self, parent_cluster):
"""
Query Index Manager
The Query Index Manager interface contains the means for managing indexes used for queries.
:param parent_cluster: Parent cluster
"""
super(QueryIndexManager, self).__init__(parent_cluster)
def _http_request(self, **kwargs):
# the kwargs can override the defaults
imeth = None
method = kwargs.get('method', 'GET')
if not method in METHMAP:
raise InvalidArgumentException("Unknown HTTP Method", method)
imeth = METHMAP[method]
return self._admin_bucket._http_request(
type=LCB.LCB_HTTP_TYPE_QUERY,
path=kwargs['path'],
method=imeth,
content_type=kwargs.get('content_type', 'application/json'),
post_data=kwargs.get('content', None),
response_format=LCB.FMT_JSON,
timeout=kwargs.get('timeout', None))
def _create_index(self, bucket_name, fields,
index_name=None, **kwargs):
primary = kwargs.get("primary", False)
condition = kwargs.get("condition", None)
if primary and fields:
raise TypeError('Cannot create primary index with explicit fields')
elif not primary and not fields:
raise ValueError('Fields required for non-primary index')
if condition and primary:
raise ValueError('cannot specify condition for primary index')
query_str = ""
if not fields:
query_str += "CREATE PRIMARY INDEX"
else:
query_str += "CREATE INDEX"
if index_name and index_name.split():
query_str += " `{}` ".format(index_name)
query_str += " ON `{}` ".format(bucket_name)
if fields:
field_names = ["`{}`".format(f) for f in fields]
query_str += "({})".format(", ".join(field_names))
if condition:
query_str += " WHERE {}".format(condition)
options = {}
deferred = kwargs.get("deferred", False)
if deferred:
options["defer_build"] = deferred
num_replicas = kwargs.get("num_replicas", None)
if num_replicas:
options["num_replica"] = num_replicas
if options:
query_str += " WITH {{{}}}".format(
", ".join(["'{0}':{1}".format(k, v) for k, v in options.items()]))
def possibly_raise(error):
if isinstance(error, list) and "msg" in error[0] and "already exists" in error[0]["msg"]:
if not kwargs.get('ignore_if_exists', False):
raise
try:
resp = self._http_request(
path="",
method="POST",
content=mk_formstr({"statement": query_str}),
content_type='application/x-www-form-urlencoded',
**kwargs
).value
if "errors" in resp and possibly_raise(resp["errors"]):
msg = resp["errors"][0].get("msg", "Index already exists")
raise QueryIndexAlreadyExistsException.pyexc(
msg, resp["errors"])
except HTTPException as h:
error = getattr(
getattr(
h,
'objextra',
None),
'value',
{}).get(
'errors',
"")
if possibly_raise(error):
raise
def _drop_index(self, bucket_name, index_name=None, **kwargs):
# previous ignore_missing was a viable kwarg - should only have ignore_if_not_exists
ignore_missing = kwargs.pop("ignore_missing", None)
if ignore_missing:
kwargs["ignore_if_not_exists"] = ignore_missing
query_str = ""
if not index_name:
query_str += "DROP PRIMARY INDEX ON `{}`".format(bucket_name)
else:
query_str += "DROP INDEX `{0}`.`{1}`".format(
bucket_name, index_name)
def possibly_raise(error):
if isinstance(error, list) and "msg" in error[0] and "not found" in error[0]["msg"]:
if not kwargs.get('ignore_if_not_exists', False):
return True
try:
resp = self._http_request(
path="",
method="POST",
content=mk_formstr({"statement": query_str}),
content_type='application/x-www-form-urlencoded',
**kwargs
).value
if "errors" in resp and possibly_raise(resp["errors"]):
msg = resp["errors"][0].get("msg", "Index not found")
raise QueryIndexNotFoundException.pyexc(msg, resp["errors"])
except HTTPException as h:
error = getattr(
getattr(
h,
'objextra',
None),
'value',
{}).get(
'errors',
"")
if possibly_raise(error):
raise
[docs] def get_all_indexes(self, # type: "QueryIndexManager"
bucket_name, # type: str
*options, # type: "GetAllQueryIndexOptions"
**kwargs # type: Any
):
# type: (...) -> List[QueryIndex]
"""
Fetches all indexes from the server.
:param str bucket_name: the name of the bucket.
:param GetAllQueryIndexOptions options: Options to use for getting all indexes.
:param Any kwargs: Override corresponding value in options.
:return: A list of QueryIndex objects.
:raises: InvalidArgumentsException
"""
query_str = """
SELECT idx.* FROM system:indexes AS idx
WHERE (
(`bucket_id` IS MISSING AND `keyspace_id`="{0}")
OR `bucket_id`="{0}"
) AND `using`="gsi"
ORDER BY is_primary DESC, name ASC
""".format(bucket_name)
response = self._http_request(
path="",
method="POST",
content=mk_formstr({"statement": query_str}),
content_type='application/x-www-form-urlencoded',
**forward_args(kwargs, *options)
).value
if response and "results" in response:
results = response.get("results")
res = list(map(QueryIndex.from_server, results))
return res
return []
[docs] def create_index(self, # type: "QueryIndexManager"
bucket_name, # type: str
index_name, # type: str
fields, # type: Iterable[str]
*options, # type: "CreateQueryIndexOptions"
**kwargs
):
# type: (...) -> None
"""
Creates a new index.
:param str bucket_name: name of the bucket.
:param str index_name: the name of the index.
:param Iterable[str] fields: Fields over which to create the index.
:param CreateQueryIndexOptions options: Options to use when creating index.
:param Any kwargs: Override corresponding value in options.
:raises: QueryIndexAlreadyExistsException
:raises: InvalidArgumentsException
"""
# CREATE INDEX index_name ON bucket_name WITH { "num_replica": 2 }
# https://docs.couchbase.com/server/current/n1ql/n1ql-language-reference/createindex.html
#
final_args = forward_args(kwargs, *options)
self._create_index(bucket_name, fields, index_name, **final_args)
[docs] def create_primary_index(self, # type: "QueryIndexManager"
bucket_name, # type: str
*options, # type: "CreatePrimaryQueryIndexOptions"
**kwargs
):
"""
Creates a new primary index.
:param str bucket_name: name of the bucket.
:param str index_name: name of the index.
:param CreatePrimaryQueryIndexOptions options: Options to use when creating primary index
:param Any kwargs: Override corresponding values in options.
:raises: QueryIndexAlreadyExistsException
:raises: InvalidArgumentsException
"""
# CREATE INDEX index_name ON bucket_name WITH { "num_replica": 2 }
# https://docs.couchbase.com/server/current/n1ql/n1ql-language-reference/createindex.html
#
kwargs['primary'] = True
final_args = forward_args(kwargs, *options)
index_name = final_args.pop("index_name", None)
self._create_index(bucket_name, [], index_name, *options, **final_args)
[docs] def drop_index(self, # type: "QueryIndexManager"
bucket_name, # type: str
index_name, # type: str
*options, # type: "DropQueryIndexOptions"
**kwargs):
"""
Drops an index.
:param str bucket_name: name of the bucket.
:param str index_name: name of the index.
:param DropQueryIndexOptions options: Options for dropping index.
:param Any kwargs: Override corresponding value in options.
:raises: QueryIndexNotFoundException
:raises: InvalidArgumentsException
"""
final_args = forward_args(kwargs, *options)
self._drop_index(bucket_name, index_name, **final_args)
[docs] def drop_primary_index(self, # type: "QueryIndexManager"
bucket_name, # type: str
*options, # type: "DropPrimaryQueryIndexOptions"
**kwargs):
"""
Drops a primary index.
:param bucket_name: name of the bucket.
:param index_name: name of the index.
:param ignore_if_not_exists: Don't error/throw if the index does not exist.
:param timeout: the time allowed for the operation to be terminated. This is controlled by the client.
:raises: QueryIndexNotFoundException
:raises: InvalidArgumentsException
"""
final_args = forward_args(kwargs, *options)
index_name = final_args.pop("index_name", None)
self._drop_index(bucket_name, index_name, **final_args)
[docs] def watch_indexes(self, # type: "QueryIndexManager"
bucket_name, # type: str
index_names, # type: Iterable[str]
*options, # type: "WatchQueryIndexOptions"
**kwargs):
"""
Watch polls indexes until they are online.
:param str bucket_name: name of the bucket.
:param Iterable[str] index_names: name(s) of the index(es).
:param WatchQueryIndexOptions options: Options for request to watch indexes.
:param Any kwargs: Override corresponding valud in options.
:raises: QueryIndexNotFoundException
:raises: WatchQueryIndexTimeoutException
"""
final_args = forward_args(kwargs, *options)
if final_args.get("watch_primary", False):
index_names.append("#primary")
timeout = final_args.get("timeout", None)
if not timeout:
raise ValueError(
'Must specify a timeout condition for watch indexes')
def check_indexes(index_names, indexes):
for idx_name in index_names:
match = next((i for i in indexes if i.name == idx_name), None)
if not match:
raise QueryIndexNotFoundException(
"Cannot find index with name: {}".format(idx_name))
return all(map(lambda i: i.state == "online", indexes))
# timeout is converted to microsecs via final_args()
timeout_millis = timeout / 1000
interval_millis = float(50)
start = time.perf_counter()
time_left = timeout_millis
while True:
indexes = self.get_all_indexes(bucket_name, GetAllQueryIndexOptions(
timeout=timedelta(milliseconds=time_left)))
all_online = check_indexes(index_names, indexes)
if all_online:
break
interval_millis += 500
if interval_millis > 1000:
interval_millis = 1000
time_left = timeout_millis - ((time.perf_counter() - start) * 1000)
if interval_millis > time_left:
interval_millis = time_left
if time_left <= 0:
raise WatchQueryIndexTimeoutException(
"Failed to find all indexes online within the alloted time.")
time.sleep(interval_millis / 1000)
[docs] def build_deferred_indexes(self, # type: "QueryIndexManager"
bucket_name, # type: str
*options, # type: "BuildDeferredQueryIndexOptions"
**kwargs
):
"""
Build Deferred builds all indexes which are currently in deferred state.
:param str bucket_name: name of the bucket.
:param BuildDeferredQueryIndexOptions options: Options for building deferred indexes.
:param Any kwargs: Override corresponding value in options.
:raise: InvalidArgumentsException
"""
final_args = forward_args(kwargs, *options)
indexes = self.get_all_indexes(bucket_name, GetAllQueryIndexOptions(
timeout=final_args.get("timeout", None)))
deferred_indexes = [
idx.name for idx in indexes if idx.state in ["deferred", "pending"]]
query_str = "BUILD INDEX ON `{}` ({})".format(
bucket_name, ", ".join(["`{}`".format(di) for di in deferred_indexes]))
self._http_request(
path="",
method="POST",
content=mk_formstr({"statement": query_str}),
content_type='application/x-www-form-urlencoded',
**final_args
)
return deferred_indexes
class IndexType(object):
pass
@attr.s
class QueryIndex(Protocol):
"""The QueryIndex protocol provides a means of mapping a query index into an object."""
name = attr.ib(validator=io(str)) # type: str
is_primary = attr.ib(validator=io(bool)) # type: bool
type = attr.ib(validator=io(IndexType), type=IndexType) # type: IndexType
state = attr.ib(validator=io(str)) # type: str
keyspace = attr.ib(validator=io(str)) # type: str
index_key = attr.ib(validator=io(Iterable)) # type: Iterable[str]
condition = attr.ib(validator=io(str)) # type: str
partition = attr.ib(validator=optional(
validator=io(str))) # type: Optional[str]
@classmethod
def from_server(cls,
json_data # type: Dict[str, Any]
):
return cls(json_data.get("name"),
bool(json_data.get("is_primary")),
IndexType(),
json_data.get("state"),
json_data.get("keyspace_id"),
[],
json_data.get("condition", ""),
json_data.get("partition", None)
)
class GetAllQueryIndexOptions(OptionBlockTimeOut):
pass
class CreateQueryIndexOptions(OptionBlockTimeOut):
@overload
def __init__(self,
timeout=None, # type: timedelta
ignore_if_exists=None, # type: bool
num_replicas=None, # type: int
deferred=None, # type: bool
condition=None # type: str
):
pass
def __init__(self, **kwargs):
"""
Query Index creation options
:param timeout: operation timeout in seconds
:param ignore_if_exists: don't throw an exception if index already exists
:param num_replicas: number of replicas
:param deferred: whether the index creation should be deferred
:param condition: 'where' condition for partial index creation
"""
if 'ignore_if_exists' not in kwargs:
kwargs['ignore_if_exists'] = False
super(CreateQueryIndexOptions, self).__init__(**kwargs)
class CreatePrimaryQueryIndexOptions(CreateQueryIndexOptions):
@overload
def __init__(self,
index_name=None, # type: str
timeout=None, # type: timedelta
ignore_if_exists=None, # type: bool
num_replicas=None, # type: int
deferred=None # type: bool
):
pass
def __init__(self, **kwargs):
super(CreatePrimaryQueryIndexOptions, self).__init__(**kwargs)
class DropQueryIndexOptions(OptionBlockTimeOut):
@overload
def __init__(self,
ignore_if_not_exists=None, # type: bool
timeout=None # type: timedelta
):
pass
def __init__(self, **kwargs):
super(DropQueryIndexOptions, self).__init__(**kwargs)
class DropPrimaryQueryIndexOptions(OptionBlockTimeOut):
@overload
def __init__(self,
index_name=None, # str
ignore_if_not_exists=None, # type: bool
timeout=None # type: timedelta
):
pass
def __init__(self, **kwargs):
super(DropPrimaryQueryIndexOptions, self).__init__(**kwargs)
class WatchQueryIndexOptions(OptionBlock):
@overload
def __init__(self,
watch_primary=None # type: bool
):
pass
def __init__(self, **kwargs):
super(WatchQueryIndexOptions, self).__init__(**kwargs)
class BuildDeferredQueryIndexOptions(OptionBlockTimeOut):
pass