#
# Copyright 2013, Couchbase, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This module is largely used by other modules, though it just contains
# simple string utilities :)
import json
from copy import deepcopy
from couchbase._pyport import long, xrange, ulp, basestring, parse_qs
from couchbase.exceptions import ArgumentError
# Some constants
STALE_UPDATE_BEFORE = "false"
STALE_UPDATE_AFTER = "update_after"
STALE_OK = "ok"
ONERROR_CONTINUE = "continue"
ONERROR_STOP = "stop"
class _Unspec(object):
def __nonzero__(self):
return False
# Py3
__bool__ = __nonzero__
def __str__(self):
return ""
def __repr__(self):
return "<Placeholder>"
UNSPEC = _Unspec()
def _bool_param_handler(input):
if isinstance(input, bool):
if input:
return "true"
else:
return "false"
if isinstance(input, basestring):
if input not in ("true", "false"):
raise ArgumentError.pyexc("String for boolean must be "
"'true' or 'false'", input)
return input
try:
input + 0
if input:
return "true"
else:
return "false"
except TypeError:
raise ArgumentError.pyexc("Boolean value must be boolean, "
"numeric, or a string of 'true' "
"or 'false'", input)
def _num_param_handler(input):
# Don't allow booleans:
if isinstance(input, bool):
raise ArgumentError.pyexc("Cannot use booleans as numeric values",
input)
try:
return str(int(input))
except Exception as e:
raise ArgumentError.pyexc("Expected a numeric argument", input, e)
def _string_param_common(input, do_quote=False):
# TODO, if we pass this to urlencode, do we ever need to quote?
# For the moment, i'm always forcing non-quote behavior
do_quote = False
s = None
if isinstance(input, basestring):
s = input
elif isinstance(input, bool):
raise ArgumentError.pyexc("Can't use boolean as string", input)
elif isinstance(input, (int, long, float)):
# Basic numeric types:
s = str(input)
else:
raise ArgumentError.pyexc("Expected simple numeric type or string ",
input)
if do_quote:
s = ulp.quote(s)
return s
def _string_param_handler(input):
return _string_param_common(input, do_quote=True)
def _generic_param_handler(input):
return _string_param_handler(input, do_quote=False)
def _stale_param_handler(input):
if input in (STALE_UPDATE_AFTER, STALE_OK, STALE_UPDATE_BEFORE):
return input
ret = _bool_param_handler(input)
if ret == "true":
ret = STALE_OK
return ret
def _onerror_param_handler(input):
if input not in (ONERROR_CONTINUE, ONERROR_STOP):
raise ArgumentError.pyexc(
"on_error must be 'continue' or 'stop'", input)
return input
def _jval_param_handler(input):
try:
ret = json.dumps(input)
return _string_param_handler(ret)
except Exception as e:
raise ArgumentError.pyexc("Couldn't convert value to JSON", input, e)
def _jarry_param_handler(input):
ret = _jval_param_handler(input)
if not ret.startswith('['):
raise ArgumentError.pyexc(
"Value must be converted to JSON array", input)
return ret
# Some more constants. Yippie!
class Params(object):
# Random, unspecified value.
DESCENDING = "descending"
STARTKEY = "startkey"
STARTKEY_DOCID = "startkey_docid"
ENDKEY = "endkey"
ENDKEY_DOCID = "endkey_docid"
KEY = "key"
KEYS = "keys"
INCLUSIVE_END = "inclusive_end"
GROUP = "group"
GROUP_LEVEL = "group_level"
REDUCE = "reduce"
SKIP = "skip"
LIMIT = "limit"
ON_ERROR = "on_error"
STALE = "stale"
DEBUG = "debug"
CONNECTION_TIMEOUT = "connection_timeout"
FULL_SET = "full_set"
MAPKEY_SINGLE = "mapkey_single"
MAPKEY_MULTI = "mapkey_multi"
MAPKEY_RANGE = "mapkey_range"
DOCKEY_RANGE = "dockey_range"
_HANDLER_MAP = {
Params.DESCENDING : _bool_param_handler,
Params.STARTKEY : _jval_param_handler,
Params.STARTKEY_DOCID : _string_param_handler,
Params.ENDKEY : _jval_param_handler,
Params.ENDKEY_DOCID : _string_param_handler,
Params.FULL_SET : _bool_param_handler,
Params.GROUP : _bool_param_handler,
Params.GROUP_LEVEL : _num_param_handler,
Params.INCLUSIVE_END : _bool_param_handler,
Params.KEY : _jval_param_handler,
Params.KEYS : _jarry_param_handler,
Params.ON_ERROR : _onerror_param_handler,
Params.REDUCE : _bool_param_handler,
Params.STALE : _stale_param_handler,
Params.SKIP : _num_param_handler,
Params.LIMIT : _num_param_handler,
Params.DEBUG : _bool_param_handler,
Params.CONNECTION_TIMEOUT: _num_param_handler
}
def _gendoc(param):
for k, v in Params.__dict__.items():
if param == v:
return "\n:data:`Params.{0}`".format(k)
[docs]class Query(object):
def _set_common(self, param, value, set_user=True):
# Invalidate encoded string
self._encoded = None
if value is UNSPEC:
self._real_options.pop(param, None)
if set_user:
self._user_options.pop(param, None)
return
handler = _HANDLER_MAP.get(param)
if not handler:
if not self.unrecognized_ok:
raise ArgumentError.pyexc(
"Unrecognized parameter. To use unrecognized parameters, "
"set 'unrecognized_ok' to True")
if not handler:
self._extra_options[param] = _string_param_handler(value)
return
if self.passthrough:
handler = _string_param_handler
self._real_options[param] = handler(value)
if set_user:
self._user_options[param] = value
def _get_common(self, param):
if param in self._user_options:
return self._user_options[param]
return self._real_options.get(param, UNSPEC)
def _set_range_common(self, k_sugar, k_start, k_end, value):
"""
Checks to see if the client-side convenience key is present, and if so
converts the sugar convenience key into its real server-side
equivalents.
:param string k_sugar: The client-side convenience key
:param string k_start: The server-side key specifying the beginning of
the range
:param string k_end: The server-side key specifying the end of the
range
"""
if not isinstance(value, (list, tuple, _Unspec)):
raise ArgumentError.pyexc(
"Range specification for {0} must be a list, tuple or UNSPEC"
.format(k_sugar))
if self._user_options.get(k_start, UNSPEC) is not UNSPEC or (
self._user_options.get(k_end, UNSPEC) is not UNSPEC):
raise ArgumentError.pyexc(
"Cannot specify {0} with either {1} or {2}"
.format(k_sugar, k_start, k_end))
if not value:
self._set_common(k_start, UNSPEC, set_user=False)
self._set_common(k_end, UNSPEC, set_user=False)
self._user_options[k_sugar] = UNSPEC
return
if len(value) not in (1, 2):
raise ArgumentError.pyexc("Range specification "
"must have one or two elements",
value)
value = value[::]
if len(value) == 1:
value.append(UNSPEC)
for p, ix in ((k_start, 0), (k_end, 1)):
self._set_common(p, value[ix], set_user=False)
self._user_options[k_sugar] = value
def __rangeprop(k_sugar, k_start, k_end):
def getter(self):
return self._user_options.get(k_sugar, UNSPEC)
def setter(self, value):
self._set_range_common(k_sugar, k_start, k_end, value)
return property(getter, setter, fdel=None, doc=_gendoc(k_sugar))
def __genprop(p):
def getter(self):
return self._get_common(p)
def setter(self, value):
self._set_common(p, value)
return property(getter, setter, fdel=None, doc=_gendoc(p))
descending = __genprop(Params.DESCENDING)
# Use the range parameters. They're easier
startkey = __genprop(Params.STARTKEY)
endkey = __genprop(Params.ENDKEY)
startkey_docid = __genprop(Params.STARTKEY_DOCID)
endkey_docid = __genprop(Params.ENDKEY_DOCID)
keys = __genprop(Params.KEYS)
key = __genprop(Params.KEY)
inclusive_end = __genprop(Params.INCLUSIVE_END)
skip = __genprop(Params.SKIP)
limit = __genprop(Params.LIMIT)
on_error = __genprop(Params.ON_ERROR)
stale = __genprop(Params.STALE)
debug = __genprop(Params.DEBUG)
connection_timeout = __genprop(Params.CONNECTION_TIMEOUT)
full_set = __genprop(Params.FULL_SET)
reduce = __genprop(Params.REDUCE)
group = __genprop(Params.GROUP)
group_level = __genprop(Params.GROUP_LEVEL)
# Aliases:
mapkey_single = __genprop(Params.KEY)
mapkey_multi = __genprop(Params.KEYS)
mapkey_range = __rangeprop(Params.MAPKEY_RANGE,
Params.STARTKEY, Params.ENDKEY)
dockey_range = __rangeprop(Params.DOCKEY_RANGE,
Params.STARTKEY_DOCID,
Params.ENDKEY_DOCID)
STRING_RANGE_END = json.loads('"\u0FFF"')
"""
Highest acceptable unicode value
"""
[docs] def __init__(self, passthrough=False, unrecognized_ok=False, **params):
"""
Create a new Query object.
A Query object is used as a container for the various view options.
It can be used as a standalone object to encode queries but is typically
passed as the ``query`` value to :class:`~couchbase.views.iterator.View`.
:param boolean passthrough:
Whether *passthrough* mode is enabled
:param boolean unrecognized_ok:
Whether unrecognized options are acceptable. See
:ref:`passthrough_values`.
:param params:
Key-value pairs for view options. See :ref:`view_options` for
a list of acceptable options and their values.
:raise: :exc:`couchbase.exceptions.ArgumentError` if a view option
or a combination of view options were deemed invalid.
"""
self.passthrough = passthrough
self.unrecognized_ok = unrecognized_ok
self._real_options = {}
self._user_options = {}
self._extra_options = {}
self._encoded = None
# String literal to pass along with the query
self._base_str = ""
self.update(**params)
[docs] def update(self, copy=False, **params):
"""
Chained assignment operator.
This may be used to quickly assign extra parameters to the
:class:`Query` object.
Example::
q = Query(reduce=True, full_sec=True)
# Someplace later
v = View(design, view, query=q.update(mapkey_range=["foo"]))
Its primary use is to easily modify the query object (in-place).
:param boolean copy:
If set to true, the original object is copied before new attributes
are added to it
:param params: Extra arguments. These must be valid query options.
:return: A :class:`Query` object. If ``copy`` was set to true, this
will be a new instance, otherwise it is the same instance on which
this method was called
"""
if copy:
self = deepcopy(self)
for k, v in params.items():
if not hasattr(self, k):
if not self.unrecognized_ok:
raise ArgumentError.pyexc("Unknown option", k)
self._set_common(k, v)
else:
setattr(self, k, v)
return self
@classmethod
def from_any(cls, params):
"""
Creates a new Query object from input.
:param params: Parameter to convert to query
:type params: dict, string, or :class:`Query`
If ``params`` is a :class:`Query` object already, a deep copy is made
and a new :class:`Query` object is returned.
If ``params`` is a string, then a :class:`Query` object is contructed
from it. The string itself is not parsed, but rather prepended to
any additional parameters (defined via the object's methods)
with an additional ``&`` characted.
If ``params`` is a dictionary, it is passed to the :class:`Query`
constructor.
:return: a new :class:`Query` object
:raise: :exc:`ArgumentError` if the input is none of the acceptable
types mentioned above. Also raises any exceptions possibly thrown
by the constructor.
"""
if isinstance(params, cls):
return deepcopy(params)
elif isinstance(params, dict):
return cls(**params)
elif isinstance(params, basestring):
ret = cls()
ret._base_str = params
return ret
else:
raise ArgumentError.pyexc("Params must be Query, dict, or string")
def _encode(self, omit_keys=False):
res_d = []
for k, v in self._real_options.items():
if v is UNSPEC:
continue
if omit_keys and k == "keys":
continue
if not self.passthrough:
k = ulp.quote(k)
v = ulp.quote(v)
res_d.append("{0}={1}".format(k, v))
for k, v in self._extra_options.items():
res_d.append("{0}={1}".format(k, v))
return '&'.join(res_d)
@property
[docs] def encoded(self):
"""
Returns an encoded form of the query
"""
if not self._encoded:
self._encoded = self._encode()
if self._base_str:
return '&'.join((self._base_str, self._encoded))
else:
return self._encoded
@property
def _long_query_encoded(self):
"""
Returns the (uri_part, post_data_part) for a long query.
"""
uristr = self._encode(omit_keys=True)
kstr = "{}"
klist = self._real_options.get('keys', UNSPEC)
if klist != UNSPEC:
kstr = '{{"keys":{0}}}'.format(klist)
return (uristr, kstr)
@property
def has_blob(self):
"""
Whether this query object is 'dirty'.
A 'dirty' object is one which
contains parameters unrecognized by the internal handling methods.
A dirty query may be constructed by using the ``passthrough``
or ``unrecognized_ok`` options, or by passing a string to
:meth:`from_any`
"""
return self._base_str or self.unrecognized_ok or self.passthrough
def __repr__(self):
return "Query:'{0}'".format(self.encoded)
def make_options_string(input, unrecognized_ok=False, passthrough=False):
if not isinstance(input, Query):
input = Query(passthrough=passthrough,
unrecognized_ok=unrecognized_ok,
**input)
return input.encoded
def make_dvpath(doc, view):
return "_design/{0}/_view/{1}?".format(doc, view)