Asynchronous Interface

The asynchronous interface to the SDK is a work in progress, and is currently intended for integrators who build higher-level async wrappers on top of it. See the txcouchbase package for an example.

This document largely explains the current internals of the Couchbase async module at a lower level. For a higher-level overview, see: http://blog.couchbase.com/python-sdk-and-twisted

Key-Value Interface

The Key-Value interface of the async subsystem functions as closely as possible like its synchronous counterpart. The primary difference is that where the synchronous interface would return an instance of a Result or a MultiResult, the asynchronous interface returns an AsyncResult object.

The AsyncResult object contains fields for two callbacks which are invoked when the result is ready. One is the callback field which is called with a Result or MultiResult upon success, and the other is the errback field which is invoked with an exception object upon error.

The semantics of when an exception is passed follow the rules of the quiet parameter, just as in the synchronous API.

class couchbase_core.result.AsyncResult
AsyncResult.callback

Callback to be invoked with this result

AsyncResult.errback

Callback to be invoked with any errors
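The following is a minimal sketch of how the two fields are meant to be used. FakeAsyncResult and its _deliver and _fail helpers are hypothetical stand-ins so that the example runs without a cluster; only the callback and errback attribute names come from the description above.

```python
class FakeAsyncResult:
    """Hypothetical stand-in for AsyncResult; not part of the SDK."""

    def __init__(self):
        self.callback = None  # invoked with the result on success
        self.errback = None   # invoked with an exception on failure

    def _deliver(self, value):
        # Simulates the library completing the operation successfully.
        self.callback(value)

    def _fail(self, exc):
        # Simulates the library reporting an error.
        self.errback(exc)


received = []
errors = []

# Both fields are assigned right after the object is obtained.
ok = FakeAsyncResult()
ok.callback = received.append
ok.errback = errors.append
ok._deliver({"key": "value"})

bad = FakeAsyncResult()
bad.callback = received.append
bad.errback = errors.append
bad._fail(KeyError("missing"))
```

Only one of the two handlers runs for a given result object in this sketch: the first object reaches callback, the second reaches errback.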

class couchbase_core.asynchronous.bucket.AsyncBucket(iops=None, *args, **kwargs)[source]

Bases: couchbase_v2.bucket.Bucket

This class contains the low-level async implementation of the Bucket interface. This module is not intended to be used directly by applications.

Warning

Using this module directly may cause odd error messages or application crashes. Use an existing subclass designated for your I/O framework (txcouchbase, gcouchbase, acouchbase) or subclass this module (continue reading) if one does not already exist.

Additionally, this module is considered internal API; as such, its interface is subject to change.

An asynchronous bucket must be wired to a so-called IOPS implementation (see IOPS). The purpose of the IOPS class is to provide the basic I/O wiring between the module and the underlying event system.

In non-asynchronous use modes (e.g. the normal synchronous Bucket), the wiring is done internally within the C library via an event loop that is “run” for each operation and is “stopped” whenever all operations complete.

In order to successfully implement an asynchronous bucket, rather than running and stopping the event loop for each operation, it is assumed the event loop is driving the entire application, and is implicitly run whenever control is returned to it.

In Python, two main styles of asynchronous programming exist:

  • Explicit callback-based asynchronous programming (such as that found in Twisted). This style explicitly makes applications aware of an event loop (or “reactor”) and requests that they register callbacks for various events.

  • Coroutine-based asynchronous programming, which involves implicitly _yielding_ to an event loop. In this style, the code appears to be synchronous, and the actual event library (for example, gevent or tulip) implicitly yields to the event loop when the current coroutine awaits I/O completion. From the library’s perspective, these event loops are identical to the classic callback-based event loops (but see below).

In both event models, the internal I/O notification system is callback-based. The main difference is in how the high-level Bucket functions (for example, get()) operate:

In callback-based models, these return objects which allow a callback to be assigned to them, whereas in coroutine-based models, these will implicitly yield to other coroutines.

In both cases, the operations (from this class itself) will return an object which allows the callback to be set. Subclasses of this module should ensure that this return value is wrapped into a suitable object appropriate to whichever event framework is actually being used.
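As an illustration of such wrapping, the sketch below bridges an object with callback and errback fields to an asyncio Future. FakeAsyncResult and wrap_in_future are hypothetical names introduced for this example, and asyncio is used only because it ships with Python; this is not the code used by any of the existing subclasses.

```python
import asyncio


class FakeAsyncResult:
    """Hypothetical stand-in for the object returned by AsyncBucket."""

    def __init__(self):
        self.callback = None
        self.errback = None


def wrap_in_future(async_result, loop):
    """Bridge the callback/errback fields to an asyncio Future."""
    future = loop.create_future()
    async_result.callback = future.set_result
    async_result.errback = future.set_exception
    return future


async def demo():
    loop = asyncio.get_running_loop()
    raw = FakeAsyncResult()
    future = wrap_in_future(raw, loop)
    # Simulate the library completing the operation on a later loop turn.
    loop.call_soon(raw.callback, "value")
    return await future


result = asyncio.run(demo())
```

The caller awaits the future and never touches the raw object, which is the kind of framework-appropriate wrapper the paragraph above asks subclasses to provide.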

Several known subclasses exist:

  • acouchbase.bucket.Bucket - this is the Python3/Tulip based implementation, and uses a hybrid callback/implicit yield functionality (by returning “future” objects).

  • gcouchbase.bucket.Bucket - this is the gevent based implementation, and uses an implicit yield model, where the bucket class will yield to the event loop and return actual “result” objects.

  • txcouchbase.bucket.RawBucket - this is a thin wrapper around this class, which returns AsyncResult objects. Since Twisted is callback-based, it is possible to return these raw objects and still remain somewhat idiomatic.

  • txcouchbase.bucket.Bucket - this wraps the RawBucket class (above) and returns Deferred objects.

Create a new Async Bucket. An async Bucket is an object which functions like a normal synchronous bucket connection, except that it returns future objects (i.e. AsyncResult objects) instead of Result objects. These objects are actually MultiResult objects which are empty upon return. As operations complete, each object becomes populated with the relevant data.

Note that the AsyncResult object must currently have valid callback and errback fields initialized after it is returned from the API methods. If this is not the case, an exception will be raised when the callbacks are about to be invoked. This behavior is the primary reason why this interface isn’t public, too :)

Parameters
  • iops – An IOPS-interface conforming object. This object must not be shared between two instances, and is owned by the connection object.

  • kwargs – Additional arguments to pass to the Bucket constructor

endure(key, *args, **kwargs)[source]

Wait until a key has been distributed to one or more nodes

By default, when items are stored to Couchbase, the operation is considered successful if the vBucket master (i.e. the “primary” node) for the key has successfully stored the item in its memory.

In most situations, this is sufficient to assume that the item has successfully been stored. However, the possibility remains that the “master” server will go offline as soon as it sends back the successful response, and the data will be lost.

The endure function allows you to provide stricter criteria for success. The criteria may be expressed in terms of number of nodes for which the item must exist in that node’s RAM and/or on that node’s disk. Ensuring that an item exists in more than one place is a safer way to guarantee against possible data loss.

We call these requirements Durability Constraints, and thus the method is called endure.

Parameters
  • key (string) – The key to endure.

  • persist_to (int) –

    The minimum number of nodes which must contain this item on their disk before this function returns. Ensure that you do not specify too many nodes; otherwise this function will fail. Use server_nodes to determine how many nodes exist in the cluster.

    The maximum number of nodes an item can reside on is currently fixed to 4 (i.e. the “master” node, and up to three “replica” nodes). This limitation is current as of Couchbase Server version 2.1.0.

    If this parameter is set to a negative value, the maximum number of possible nodes the key can reside on will be used.

  • replicate_to (int) – The minimum number of replicas which must contain this item in their memory for this method to succeed. As with persist_to, you may specify a negative value in which case the requirement will be set to the maximum number possible.

  • timeout (float) – A timeout value in seconds before this function fails with an exception. Typically it should take no longer than several milliseconds on a functioning cluster for durability requirements to be satisfied (unless something has gone wrong).

  • interval (float) – The polling interval in seconds to use for checking the key status on the respective nodes. Internally, endure is implemented by polling each server individually to see if the key exists on that server’s disk and memory. Once the status request is sent to all servers, the client will check if their replies are satisfactory; if they are then this function succeeds, otherwise the client will wait a short amount of time and try again. This parameter sets this “wait time”.

  • check_removed (bool) – This flag inverts the check. Instead of checking that a given key exists on the nodes, this changes the behavior to check that the key is removed from the nodes.

  • cas (long) – The CAS value to check against. It is possible for an item to exist on a node but have a CAS value from a prior operation. Passing the CAS ensures that only replies from servers with a CAS matching this parameter are accepted.

Returns

An OperationResult

Raise

see upsert() and get() for possible errors

See also

upsert(), endure_multi()

query(*args, **kwargs)[source]

Reimplemented from base class.

This method does not add any functionality to the base class’s query() method (all the functionality is encapsulated in the view class anyway). However, it does require one additional keyword argument.

Parameters

itercls (class) – A class used for instantiating the view object. This should be a subclass of AsyncViewBase.

Views Interface

Unlike the key-value interface, the synchronous view API returns a View object which is itself an iterator that yields results. Because this is an asynchronous API, the iterator interface must be replaced with a class interface which must be subclassed by the user.

class couchbase_core.asynchronous.view.AsyncViewBase[source]
__init__(*args, **kwargs)[source]

Initialize a new AsyncViewBase object. This is intended to be subclassed in order to implement the required methods to be invoked on error, data, and row events.

This class is not used standalone, but rather passed as the itercls parameter to the query() method of the connection object.

on_rows(rowiter)

Called when more processed view rows are available.

Parameters

rowiter (iterable) – An iterable which will yield results as defined by the RowProcessor implementation

This method must be implemented in a subclass

on_error(ex)

Called when there is a failure with the response data

Parameters

ex (Exception) – The exception caught.

This must be implemented in a subclass

on_done()

Called when this request has completed. Once this method is called, no other methods will be invoked on this object.

This method must be implemented in a subclass

start()
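A minimal sketch of the three handlers follows. In real use the class would derive from AsyncViewBase and be passed as itercls to query(); here a plain class named CollectingView (a hypothetical name) is driven by hand so that the example runs without the SDK installed. Calling on_rows more than once is an assumption made for the simulation.

```python
class CollectingView:
    """Sketch of the handlers an AsyncViewBase subclass provides."""

    def __init__(self):
        self.rows = []
        self.error = None
        self.done = False

    def on_rows(self, rowiter):
        # Collect whatever rows the iterable yields for this batch.
        self.rows.extend(rowiter)

    def on_error(self, ex):
        # Remember the failure so the application can inspect it.
        self.error = ex

    def on_done(self):
        # No further handler calls are expected after this point.
        self.done = True


# Simulate the event sequence the library would drive.
view = CollectingView()
view.on_rows(iter([{"id": "a"}, {"id": "b"}]))
view.on_rows(iter([{"id": "c"}]))
view.on_done()
```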

I/O Interface

The async API is divided into several sections. In order to have an async client which interacts with other async libraries and frameworks, it is necessary to make the Couchbase extension aware of that environment. To this end, the IOPS interface is provided. The IOPS API is entirely separate from the key-value API and should be treated as belonging to a different library. It is simply the extension’s I/O abstraction.

class couchbase_core.iops.base.Event

This class represents an Event. The concept should be familiar to the intended audience, who are expected to know event loops and their terminology. It represents a certain event in the future, which shall be triggered either by something happening or by the passage of time.

When said event takes place, the object should be signalled via the ready() method.

ready()

Called when an event is ready

class couchbase_core.iops.base.IOEvent

A subclass of Event, this represents a socket. Events applied to this socket are triggered when the socket becomes available for reading or writing.

ready_r()

Called for read events. This is the efficient form of ready(LCB_READ_EVENT)

ready_w()

Called for write events. This is equivalent to ready(LCB_WRITE_EVENT)

ready_rw()

Called for rw events. This is equivalent to ready(LCB_READ_EVENT|LCB_WRITE_EVENT)

fileno()
class couchbase_core.iops.base.TimerEvent

Subclass of Event which represents a passage of time.

class couchbase_core.iops.base.IOPS[source]
__init__()[source]

The IOPS class is intended as an efficient and multiplexing manager of one or more Event objects.

As this represents an interface with methods only, there is no required behavior in the constructor of this object

update_event(event, action, flags)[source]

This method shall perform an action modifying an event.

Parameters
  • event – An IOEvent object which shall have its watcher settings modified. The IOEvent object is an object which provides a fileno() method.

  • action (int) –

    one of:

    • PYCBC_EVACTION_WATCH: Watch this file for events

    • PYCBC_EVACTION_UNWATCH: Remove this file from all watches

    • PYCBC_EVACTION_CLEANUP: Destroy any references to this object

  • flags (int) –

    Event details. This indicates which events this file should be watched for, and is only applicable if action was PYCBC_EVACTION_WATCH. It can be a bitmask of the following:

    • LCB_READ_EVENT: Watch this file until it becomes readable

    • LCB_WRITE_EVENT: Watch this file until it becomes writeable

If the action is to watch the event for readability or writeability, the IOPS implementation shall schedule the underlying event system to call one of the ready_r, ready_w or ready_rw methods (for readability, writeability, or both, respectively) at such a time when the underlying reactor/event loop implementation has signalled it being so.

Event watchers are non-repeatable. This means that once the event has been delivered, the IOEvent object shall be removed from a watching state. The extension shall call this method again for each time an event is requested.

This method must be implemented
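The dispatch and one-shot rules above can be sketched without any real event loop. OneShotWatchers and RecordingEvent are hypothetical names, and the READ_EVENT and WRITE_EVENT values are placeholders for illustration; real code should use the constants exported by couchbase_core.iops.base.

```python
# Placeholder flag values for illustration only.
READ_EVENT = 0x02
WRITE_EVENT = 0x04


class RecordingEvent:
    """Hypothetical stand-in for IOEvent that records which method ran."""

    def __init__(self):
        self.calls = []

    def ready_r(self):
        self.calls.append("r")

    def ready_w(self):
        self.calls.append("w")

    def ready_rw(self):
        self.calls.append("rw")


class OneShotWatchers:
    """Tracks watched events and delivers each notification once."""

    def __init__(self):
        self._watched = {}  # event -> flags it is waiting for

    def watch(self, event, flags):
        self._watched[event] = flags

    def unwatch(self, event):
        self._watched.pop(event, None)

    def deliver(self, event, readable, writable):
        flags = self._watched.get(event, 0)
        r = readable and bool(flags & READ_EVENT)
        w = writable and bool(flags & WRITE_EVENT)
        if not (r or w):
            return False
        # Non-repeatable: drop the watcher before notifying.
        del self._watched[event]
        if r and w:
            event.ready_rw()
        elif r:
            event.ready_r()
        else:
            event.ready_w()
        return True


watchers = OneShotWatchers()
ev = RecordingEvent()
watchers.watch(ev, READ_EVENT | WRITE_EVENT)
first = watchers.deliver(ev, readable=True, writable=False)
second = watchers.deliver(ev, readable=True, writable=True)
```

The second delivery is ignored because the watcher was removed after the first one; the extension would have to request the watch again to receive further notifications.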

update_timer(timer, action, usecs)[source]

This method shall schedule or unschedule a timer.

Parameters
  • timer – A TimerEvent object.

  • action – See update_event() for meaning

  • usecs – A relative offset in microseconds when this timer shall be fired.

This method follows the same semantics as update_event(), except that there is no file.

When the underlying event system fires the timer, the TimerEvent's ready() method shall be called with 0 as its argument.

Like IOEvents, TimerEvents are non-repeatable.

This method must be implemented
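One way to picture the timer semantics is with asyncio's call_later, which takes a delay in seconds, so the microsecond offset has to be converted. TimerScheduler and FakeTimerEvent are hypothetical names for this sketch, and replacing a pending timer on reschedule is an assumption of the sketch, not documented behavior.

```python
import asyncio


class FakeTimerEvent:
    """Hypothetical stand-in for TimerEvent."""

    def __init__(self):
        self.fired_with = []

    def ready(self, flags):
        self.fired_with.append(flags)


class TimerScheduler:
    """Schedules one-shot timers on an asyncio loop."""

    def __init__(self, loop):
        self._loop = loop
        self._handles = {}  # timer -> asyncio.TimerHandle

    def schedule(self, timer, usecs):
        self.cancel(timer)  # assumption: rescheduling replaces a pending timer
        self._handles[timer] = self._loop.call_later(
            usecs / 1_000_000, self._fire, timer)

    def cancel(self, timer):
        handle = self._handles.pop(timer, None)
        if handle is not None:
            handle.cancel()

    def _fire(self, timer):
        # Non-repeatable: forget the handle, then notify with 0.
        self._handles.pop(timer, None)
        timer.ready(0)


async def demo():
    scheduler = TimerScheduler(asyncio.get_running_loop())
    fired, cancelled = FakeTimerEvent(), FakeTimerEvent()
    scheduler.schedule(fired, usecs=1_000)      # 1 ms
    scheduler.schedule(cancelled, usecs=1_000)
    scheduler.cancel(cancelled)                 # unscheduled before firing
    await asyncio.sleep(0.05)
    return fired.fired_with, cancelled.fired_with


fired_with, cancelled_with = asyncio.run(demo())
```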

start_watching()[source]

Called by the extension when all scheduled IO events have been submitted. Depending on the I/O model, this method can either drive the event loop until stop_watching() is called, or do nothing.

This method must be implemented

stop_watching()[source]

Called by the extension when it no longer needs to wait for events. Its function is to undo anything which was done in the start_watching() method

This method must be implemented

io_event_factory()[source]

Returns a new instance of IOEvent.

This method is optional, and is useful in case an implementation wishes to utilize its own subclass of IOEvent.

As with most Python subclasses, the user should ensure that the base implementation’s __init__ is called.

timer_event_factory()[source]

Returns a new instance of TimerEvent. Like the io_event_factory(), this is optional

Action Constants

couchbase_core.iops.base.PYCBC_EVACTION_WATCH

Action indicating that the specific event should be added to the event loop’s “watcher” list, and should have its ready() method called when the IO implementation has detected that the specific event is ready

couchbase_core.iops.base.PYCBC_EVACTION_UNWATCH

Action indicating that the specific object should not be notified when the IO state changes. This is typically done by removing it from the watcher list

couchbase_core.iops.base.PYCBC_EVACTION_CLEANUP

Action to permanently erase any references to this event

IO Event Constants

couchbase_core.iops.base.LCB_READ_EVENT

IO flag indicating that this event should be notified on file readability

couchbase_core.iops.base.LCB_WRITE_EVENT

IO flag indicating that this event should be notified on file writeability

couchbase_core.iops.base.LCB_RW_EVENT

Equivalent to LCB_READ_EVENT|LCB_WRITE_EVENT