#
# Copyright 2015, Couchbase, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import couchbase
from couchbase._pyport import basestring
from couchbase.views.iterator import AlreadyQueriedError
from couchbase.exceptions import CouchbaseError, NotSupportedError
class N1QLError(CouchbaseError):
@property
def n1ql_errcode(self):
return self.objextra['code']
STATEMENT_PLUS = 'statement_plus'
REQUEST_PLUS = 'request_plus'
"""
For use with :attr:`~.N1QLQuery.consistency`, will ensure that query
results always reflect the latest data in the server
"""
UNBOUNDED = 'none'
"""
For use with :attr:`~.N1QLQuery.consistency`, will allow cached
values to be returned. This will improve performance but may not
reflect the latest data in the server.
"""
CONSISTENCY_REQUEST = REQUEST_PLUS
CONSISTENCY_NONE = UNBOUNDED
class MissingTokenError(CouchbaseError):
pass
[docs]class MutationState(object):
"""
.. warning::
The API and implementation of this class are subject to change.
This class acts as a container for one or more mutations. It may
then be used with the :meth:`~.N1QLQuery.consistent_with` method to
indicate that a given query should be bounded by the contained
mutations.
Using `consistent_with` is similar to setting
:attr:`~.N1QLQuery.consistency` to :data:`REQUEST_PLUS`,
but is more optimal as the query will use cached data, *except*
when the given mutation(s) are concerned. This option is useful
for use patterns when an application has just performed a mutation,
and wishes to perform a query in which the newly-performed mutation
should reflect on the query results.
.. note::
This feature requires Couchbase Server 4.5 or greater,
and also requires that `fetch_mutation_tokens=true`
be specified in the connection string when creating
a :class:`~couchbase.bucket.Bucket`
.. code-block:: python
cb = Bucket('couchbase://localhost/default?fetch_mutation_tokens=true')
rvs = cb.upsert_multi({
'foo': {'type': 'user', 'value': 'a foo value'},
'bar': {'type': 'user', 'value': 'a bar value'}
})
nq = N1QLQuery('SELECT type, value FROM default WHERE type="user"')
ms = MutationToken()
ms.add_result(rv
nq.consistent_with_ops(*rvs.values())
for row in cb.n1ql_query(nq):
# ...
"""
def __init__(self, *docs):
self._sv = {}
if docs:
self.add_results(*docs)
def _add_scanvec(self, mutinfo):
"""
Internal method used to specify a scan vector.
:param mutinfo: A tuple in the form of
`(vbucket id, vbucket uuid, mutation sequence)`
"""
vb, uuid, seq, bktname = mutinfo
self._sv.setdefault(bktname, {})[vb] = (seq, str(uuid))
def encode(self):
"""
Encodes this state object to a string. This string may be passed
to the :meth:`decode` at a later time. The returned object is safe
for sending over the network.
:return: A serialized string representing the state
"""
return couchbase._to_json(self._sv)
@classmethod
def decode(cls, s):
"""
Create a :class:`MutationState` from the encoded string
:param s: The encoded string
:return: A new MutationState restored from the string
"""
d = couchbase._from_json(s)
o = MutationState()
o._sv = d
# TODO: Validate
def add_results(self, *rvs, **kwargs):
"""
Changes the state to reflect the mutation which yielded the given
result.
In order to use the result, the `fetch_mutation_tokens` option must
have been specified in the connection string, _and_ the result
must have been successful.
:param rvs: One or more :class:`~.OperationResult` which have been
returned from mutations
:param quiet: Suppress errors if one of the results does not
contain a convertible state.
:return: `True` if the result was valid and added, `False` if not
added (and `quiet` was specified
:raise: :exc:`~.MissingTokenError` if `result` does not contain
a valid token
"""
if not rvs:
raise MissingTokenError.pyexc(message='No results passed')
for rv in rvs:
mi = rv._mutinfo
if not mi:
if kwargs.get('quiet'):
return False
raise MissingTokenError.pyexc(
message='Result does not contain token')
self._add_scanvec(mi)
return True
def add_all(self, bucket, quiet=False):
"""
Ensures the query result is consistent with all prior
mutations performed by a given bucket.
Using this function is equivalent to keeping track of all
mutations performed by the given bucket, and passing them to
:meth:`~add_result`
:param bucket: A :class:`~couchbase.bucket.Bucket` object
used for the mutations
:param quiet: If the bucket contains no valid mutations, this
option suppresses throwing exceptions.
:return: `True` if at least one mutation was added, `False` if none
were added (and `quiet` was specified)
:raise: :exc:`~.MissingTokenError` if no mutations were added and
`quiet` was not specified
"""
added = False
for mt in bucket._mutinfo():
added = True
self._add_scanvec(mt)
if not added and not quiet:
raise MissingTokenError('Bucket object contains no tokens!')
return added
def __repr__(self):
return repr(self._sv)
def __nonzero__(self):
return bool(self._sv)
__bool__ = __nonzero__
[docs]class N1QLQuery(object):
[docs] def __init__(self, query, *args, **kwargs):
"""
Create an N1QL Query object. This may be passed as the
`params` argument to :class:`N1QLRequest`.
:param query: The query string to execute
:param args: Positional placeholder arguments. These satisfy
the placeholder values for positional placeholders in the
query string, such as ``$1``, ``$2`` and so on.
:param kwargs: Named placeholder arguments. These satisfy
named placeholders in the query string, such as
``$name``, ``$email`` and so on. For the placeholder
values, omit the leading sigil (``$``).
Use positional parameters::
q = N1QLQuery('SELECT * FROM `travel-sample` '
'WHERE type=$1 AND id=$2',
'airline', 0)
for row in cb.n1ql_query(q):
print 'Got', row
Use named parameters::
q = N1QLQuery('SELECT * FROM `travel-sample` '
'WHERE type=$type AND id=$id',
type='airline', id=0)
for row in cb.n1ql_query(q):
print 'Got', row
When using placeholders, ensure that the placeholder value is
the *unserialized* (i.e. native) Python value, not the JSON
serialized value. For example the query
``SELECT * FROM products WHERE tags IN ["sale", "clearance"]``
can be rewritten using placeholders:
Correct::
N1QLQuery('SELECT * FROM products WHERE tags IN $1',
['sale', 'clearance'])
Incorrect::
N1QLQuery('SELECT * FROM products WHERE tags IN $1',
"[\\"sale\\",\\"clearance\\"]")
Since the placeholders are serialized to JSON internally anyway.
"""
self._adhoc = True
self._cross_bucket = False
self._body = {'statement': query}
if args:
self._add_pos_args(*args)
if kwargs:
self._set_named_args(**kwargs)
def _set_named_args(self, **kv):
"""
Set a named parameter in the query. The named field must
exist in the query itself.
:param kv: Key-Value pairs representing values within the
query. These values should be stripped of their leading
`$` identifier.
"""
for k in kv:
self._body['${0}'.format(k)] = kv[k]
return self
def _add_pos_args(self, *args):
"""
Set values for *positional* placeholders (``$1,$2,...``)
:param args: Values to be used
"""
arg_array = self._body.setdefault('args', [])
arg_array.extend(args)
[docs] def set_option(self, name, value):
"""
Set a raw option in the query. This option is encoded
as part of the query parameters without any client-side
verification. Use this for settings not directly exposed
by the Python client.
:param name: The name of the option
:param value: The value of the option
"""
self._body[name] = value
@property
def statement(self):
return self._body['statement']
@property
def consistency(self):
"""
Sets the consistency level.
:see: :data:`UNBOUNDED`, :data:`REQUEST_PLUS`
"""
return self._body.get('scan_consistency', UNBOUNDED)
@consistency.setter
def consistency(self, value):
self._body['scan_consistency'] = value
[docs] def consistent_with(self, state):
"""
Indicate that the query should be consistent with one or more
mutations.
:param state: The state of the mutations it should be consistent
with.
"""
if self.consistency not in (UNBOUNDED, 'at_plus'):
raise TypeError(
'consistent_with not valid with other consistency options')
if not state:
raise TypeError('Passed empty or invalid state', state)
self.consistency = 'at_plus'
self._body['scan_vectors'] = state._sv
# TODO: I really wish Sphinx were able to automatically
# document instance vars
@property
def adhoc(self):
"""
A non-`adhoc` query can be internally optimized so that repeated
executions of the same query can be quicker. If this query is issued
repeatedly in your application, then you should set this property to
`False`.
Note that this optimization involves an up-front "preparation"
cost, and should only be used for queries that are issued multiple
times.
"""
return self._adhoc
@adhoc.setter
def adhoc(self, arg):
self._adhoc = arg
@property
def cross_bucket(self):
"""
Set this to true to indicate that the query string involves multiple
buckets. This makes the query a "cluster-level" query. Cluster level
queries have access to documents in multiple buckets, using credentials
supplied via :meth:`.Bucket.add_bucket_creds`
"""
return self._cross_bucket
@cross_bucket.setter
def cross_bucket(self, value):
self._cross_bucket = value
@property
def timeout(self):
"""
Optional per-query timeout. If set, this will limit the amount
of time in which the query can be executed and waited for.
.. note::
The effective timeout for the query will be either this property
or the value of :attr:`couchbase.bucket.Bucket.n1ql_timeout`
property, whichever is *lower*.
.. seealso:: couchbase.bucket.Bucket.n1ql_timeout
"""
value = self._body.get('timeout', '0s')
value = value[:-1]
return float(value)
@timeout.setter
def timeout(self, value):
if not value:
self._body.pop('timeout', 0)
else:
value = float(value)
self._body['timeout'] = '{0}s'.format(value)
@property
def encoded(self):
"""
Get an encoded representation of the query.
This is used internally by the client, and can be useful
to debug queries.
"""
return json.dumps(self._body)
def __repr__(self):
return ('<{cls} stmt={stmt} at {oid}>'.format(
cls=self.__class__.__name__,
stmt=repr(self._body),
oid=id(self)))
[docs]class N1QLRequest(object):
[docs] def __init__(self, params, parent, row_factory=lambda x: x):
"""
Object representing the execution of the request on the
server.
.. warning::
You should typically not call this constructor by
yourself, rather use the :meth:`~.Bucket.n1ql_query`
method (or one of its async derivatives).
:param params: An :class:`N1QLQuery` object.
:param parent: The parent :class:`~.couchbase.bucket.Bucket` object
:param row_factory: Callable which accepts the raw dictionary
of each row, and can wrap them in a customized class.
The default is simply to return the dictionary itself.
To actually receive results of the query, iterate over this
object.
"""
if isinstance(params, basestring):
params = N1QLQuery(params)
self._params = params
self._parent = parent
self.row_factory = row_factory
self.errors = []
self._mres = None
self._do_iter = True
self.__raw = False
self.__meta_received = False
def _start(self):
if self._mres:
return
self._mres = self._parent._n1ql_query(
self._params.encoded, not self._params.adhoc,
cross_bucket=self._params.cross_bucket)
self.__raw = self._mres[None]
@property
def raw(self):
return self.__raw
@property
def meta(self):
"""
Get metadata from the query itself. This is guaranteed to only
return a Python dictionary.
Note that if the query failed, the metadata might not be in JSON
format, in which case there may be additional, non-JSON data
which can be retrieved using the following
::
raw_meta = req.raw.value
:return: A dictionary containing the query metadata
"""
if not self.__meta_received:
raise RuntimeError(
'This property only valid once all rows are received!')
if isinstance(self.raw.value, dict):
return self.raw.value
return {}
def _clear(self):
del self._parent
del self._mres
def _handle_meta(self, value):
self.__meta_received = True
if not isinstance(value, dict):
return
if 'errors' in value:
for err in value['errors']:
raise N1QLError.pyexc('N1QL Execution failed', err)
def _process_payload(self, rows):
if rows:
return [self.row_factory(row) for row in rows]
elif self.raw.done:
self._handle_meta(self.raw.value)
self._do_iter = False
return []
else:
# We can only get here if another concurrent query broke out the
# event loop before we did.
return []
[docs] def execute(self):
"""
Execute the statement and raise an exception on failure.
This method is useful for statements which modify data or
indexes, where the application does not need to extract any
data, but merely determine success or failure.
"""
for _ in self:
pass
return self
[docs] def get_single_result(self):
"""
Execute the statement and return its single result.
This should only be used on statements which are intended to
return only a single result.
:return: The single result, as encapsulated by the
`row_factory`
"""
for r in self:
return r
[docs] def __iter__(self):
if not self._do_iter:
raise AlreadyQueriedError()
self._start()
while self._do_iter:
raw_rows = self.raw.fetch(self._mres)
for row in self._process_payload(raw_rows):
yield row