Source code for couchbase.views.iterator

#
# Copyright 2013, Couchbase, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from collections import namedtuple
from copy import deepcopy
import json
from warnings import warn

from couchbase.exceptions import ArgumentError, CouchbaseError, ViewEngineError
from couchbase.views.params import Query, UNSPEC, make_dvpath
from couchbase._pyport import ulp, xrange
from couchbase.user_constants import FMT_JSON
import couchbase._libcouchbase as C

MAX_URI_LENGTH = 2048 # Let's be safe

class AlreadyQueriedError(CouchbaseError):
    """Thrown when iterating over a View which was already iterated over"""


ViewRow = namedtuple('ViewRow', ['key', 'value', 'docid', 'doc'])
"""
Default class for a single row.
"""


[docs]class RowProcessor(object): """ This class contains the handling and conversion functions between multiple rows and the means by which they are returned from the view iterator. This class should be overidden if you are: * Using a custom row class This saves on processing time and memory by converting from the raw results rather than having to unpack from the default class. This class returns a :class:`ViewRow` object by default. (This can also be overridden using the :attr:`rowclass` attribute) * Fetching multiple documents for each row You can use the :meth:`~couchbase.connection.Connection.get_multi` method to efficiently fetch multiple docs beforehand for the entire page. .. attribute:: rowclass Class or function to call for each result (row) received. This is called as ``rowclass(key, value, docid, doc)`` * ``key`` is the key as returned by the first argument to the view function's ``emit``. * ``value`` is the value returned as the second argument to the view function's ``emit``, or the value of the ``reduce`` function * ``docid`` is the ID of the document itself (as stored by one of the :meth:`~couchbase.connection.Connection.set` family of methods). If ``reduce`` was set to true for the view, this will always be None. * ``doc`` is the document itself - Only valid if ``include_docs`` is set to true - in which case a :class:`~couchbase.connection.Result` object is passed. If ``reduce`` was set to true for the view, this will always be None Otherwise, ``None`` is passed instead. By default, the :class:`ViewRow` is used. """ def __init__(self, rowclass=ViewRow): self._riter = None self._docs = None self.rowclass = rowclass
[docs] def handle_rows(self, rows, connection, include_docs): """ Preprocesses a page of rows. :param list rows: A list of rows. Each row is a JSON object containing the decoded JSON of the view as returned from the server :param connection: The connection object (pass to the :class:`View` constructor) :param include_docs: Whether to include documents in the return value. This is ``True`` or ``False`` depending on what was passed to the :class:`View` constructor :return: an iterable. When the iterable is exhausted, this method will be called again with a new 'page'. """ self._riter = iter(rows) if not include_docs: return iter(self) keys = tuple(x['id'] for x in rows) self._docs = connection.get_multi(keys, quiet=True) return iter(self)
def __iter__(self): if not self._riter: return for ret in self._riter: doc = None if self._docs is not None: # We still want to go through this if we have an empty dict try: doc = self._docs[ret['id']] except KeyError: warn("Error encountered when executing view. " "Inspect 'errors' for more information") yield self.rowclass(ret['key'], ret['value'], # Use get, because reduce values don't have # IDs ret.get('id'), doc) self._docs = None self._riter = None
[docs]class View(object):
[docs] def __init__(self, parent, design, view, row_processor=None, streaming=0, include_docs=False, query=None, **params): """ Construct a iterable which can be used to iterate over view query results. :param parent: The parent Connection object :type parent: :class:`~couchbase.connection.Connection` :param string design: The design document :param string view: The name of the view within the design document :param callable row_processor: See :attr:`row_processor` for more details. :param boolean include_docs: If set, the document itself will be retrieved for each row in the result. The default algorithm uses :meth:`~couchbase.connection.Connection.get_multi` for each page (i.e. every :attr:`streaming` results). The :attr:`~couchbase.views.params.Query.reduce` family of attributes must not be active, as results fro ``reduce`` views do not have corresponding doc IDs (as these are aggregation functions). :param bool streaming: Whether a streaming chunked request should be used. This is helpful for handling the view results in small chunks rather than loading the entire resultset into memory at once. By default, a single request is made and the response is decoded at once. With streaming enabled, rows are decoded incrementally. :param query: If set, is a :class:`~couchbase.views.params.Query` object. It is illegal to use this in conjunction with additional ``params`` :param params: Extra view options. This may be used to pass view arguments (as defined in :class:`~couchbase.views.params.Query`) without explicitly constructing a :class:`~couchbase.views.params.Query` object. It is illegal to use this together with the ``query`` argument. If you wish to 'inline' additional arguments to the provided ``query`` object, use the query's :meth:`~couchbase.views.params.Query.update` method instead. This object is an iterator - it does not send out the request until the first item from the iterator is request. See :meth:`__iter__` for more details on what this object returns. Simple view query, with no extra options:: # c is the Connection object. for result in View(c, "beer", "brewery_beers"): print("emitted key: {0}, doc_id: {1}" .format(result.key, result.docid)) Execute a view with extra query options:: # Implicitly creates a Query object view = View(c, "beer", "by_location", limit=4, reduce=True, group_level=2) Pass a Query object:: q = Query( stale=False, inclusive_end=True, mapkey_range=[ ["21st_ammendment_brewery_cafe"], ["21st_ammendment_brewery_cafe", Query.STRING_RANGE_END] ] ) view = View(c, "beer", "brewery_beer", query=q) Add extra parameters to query object for single call:: view = View(c, "beer", "brewery_beer", query=q.update(debug=True, copy=True)) Include documents with query:: view = View(c, "beer", "brewery_beer", query=q, include_docs=True) for result in view: print("Emitted key: {0}, Document: {1}".format( result.key, result.doc.value)) """ self._parent = parent self.design = design self.view = view self.errors = [] self.raw = None self.rows_returned = 0 self.include_docs = include_docs self.indexed_rows = 0 if not row_processor: row_processor = RowProcessor() self.row_processor = row_processor self._rp_iter = None if query and params: raise ArgumentError.pyexc( "Extra parameters are mutually exclusive with the " "'query' argument. Use query.update() to add extra arguments") if query: self._query = deepcopy(query) else: self._query = Query.from_any(params) if include_docs: if (self._query.reduce or self._query.group or self._query.group_level): raise ArgumentError.pyexc("include_docs is only applicable " "for map-only views, but 'reduce', " "'group', or 'group_level' " "was specified", self._query) # The original 'limit' parameter, passed to the query. self._streaming = streaming self._do_iter = True
@property def streaming(self): """ Read-Only. Returns whether streaming is enabled for this view. """ return self._streaming @property def query(self): """ Returns the :class:`~couchbase.views.params.Query` object associated with this execution instance. Note that is normally a modified version of the passed object (in the constructor's ``query`` params). It should not be directly modified. """ return self._query def _handle_errors(self, errors): if not errors: return self.errors += [ errors ] if self._query.on_error != 'continue': raise ViewEngineError.pyexc("Error while executing view.", self.errors) else: warn("Error encountered when executing view. Inspect 'errors' " "for more information") def _handle_meta(self, value): if not isinstance(value, dict): return self.indexed_rows = value.get('total_rows', 0) self._handle_errors(value.get('errors')) def _process_page(self, rows): if not rows: return self.rows_returned += len(rows) self._rp_iter = self.row_processor.handle_rows(rows, self._parent, self.include_docs) # Raise exceptions early on self._rp_iter = iter(self._rp_iter) def _handle_single_view(self): self.raw = self._create_raw() self._process_page(self.raw.value['rows']) self._handle_meta(self.raw.value) def _create_raw(self, **kwargs): """ Return common parameters for _libcouchbase._http_request """ d = { 'type': C.LCB_HTTP_TYPE_VIEW, 'fetch_headers': True, 'quiet': False, 'response_format': FMT_JSON } # Figure out the path qstr = self._query.encoded uri = make_dvpath(self.design, self.view) if len(uri) + len(qstr) > MAX_URI_LENGTH: (uriparams, post_data) = self._query._long_query_encoded d['method'] = C.LCB_HTTP_METHOD_POST d['post_data'] = post_data d['path'] = uri + uriparams d['content_type'] = "application/json" else: d['method'] = C.LCB_HTTP_METHOD_GET d['path'] = "{0}{1}".format(uri, qstr) d.update(**kwargs) return self._parent._http_request(**d) def _setup_streaming_request(self): """ Sets up the streaming request. This contains a streaming :class:`couchbase.results.HttpResult` object """ self.raw = self._create_raw(chunked=True) def _process_payload(self, rows): if rows: rows = tuple(json.loads(r) for r in rows) self._process_page(rows) if self.raw.done: self._handle_meta(self.raw.value) self._do_iter = False # No rows and nothing to iterate over? elif not self._rp_iter: self._rp_iter = iter([]) def _get_page(self): if not self._streaming: self._handle_single_view() self._do_iter = False return if not self.raw: self._setup_streaming_request() # Fetch the rows: rows = self.raw._fetch() self._process_payload(rows)
[docs] def __iter__(self): """ Returns a row for each query. The type of the row depends on the :attr:`row_processor` being used. :raise: :exc:`~couchbase.exceptions.ViewEngineError` If an error was encountered while processing the view, and the :attr:`~couchbase.views.params.Query.on_error` attribute was not set to `continue`. If `continue` was specified, a warning message is printed to the screen (via ``warnings.warn`` and operation continues). To inspect the error, examine :attr:`errors` :raise: :exc:`AlreadyQueriedError` If this object was already iterated over and the last result was already returned. """ if not self._do_iter: raise AlreadyQueriedError.pyexc( "This object has already been executed. Create a new one to " "query again") while self._do_iter: self._get_page() if not self._rp_iter: break for r in self._rp_iter: yield r self._rp_iter = None
def __repr__(self): details = [] details.append("Design={0}".format(self.design)) details.append("View={0}".format(self.view)) details.append("Query={0}".format(self._query)) details.append("Rows Fetched={0}".format(self.rows_returned)) return '{cls}<{details}>'.format(cls=self.__class__.__name__, details=', '.join(details))