A newer version of this documentation is available.

View Latest

Asynchronous Progamming Using the Python SDK with Couchbase Server

    +

    The Python SDK supports asynchronous programming by offering integration modules for Twisted, gevent, and Python 3.4 asyncio.

    Users of any of these frameworks should understand and know how the synchronous API functions before using any asynchronous framework, since the asynchronous functionality is an extension of the synchronous functionality.

    Twisted

    Refer to the API reference (http://pythonhosted.org/couchbase/api/txcouchbase.html) for more information about the txcouchbase module.

    Twisted is a high performance and very mature Python asynchronous framework.

    Twisted Document (KV) Operations

    To use the Python SDK with twisted, use the txcouchbase module (included with the SDK). As opposed to the synchronous SDK methods which wait for completion and return Result objects, the txcouchbase.bucket.Bucket object returns a Twisted Deferred. You may configure Deferred with callback and errback handlers. Result objects are propagated to the callback as seen in the example below.

    Twisted - KV Operation
    from twisted.internet import reactor
    from txcouchbase.bucket import Bucket
    bucket = Bucket('couchbase://localhost/default')
    
    def on_ok(result):
        print "Operation succeeded"
        if hasattr(result, 'value'):
            print "Value is", result.value
    
    def on_err(err):
        print "Operation failed", err
    
    bucket.upsert('id', {'some':['value']}).addCallback(on_ok).addErrback(on_err)
    bucket.get('id').addCallback(on_ok).addErrback(on_err)
    
    # tell reactor to stop after 3 seconds
    reactor.callLater(3, reactor.stop)
    
    # start up the Twisted reactor (event loop handler) manually
    reactor.run()

    Twisted N1QL Queries

    You may issue N1QL queries in the Twisted API using the n1qlQueryAll() or n1qlQueryEx() methods. The n1qlQueryAll() method returns an Deferred which will invoke the callback with an iterable of rows. The n1qlQueryEx accepts a user-defined subclass of couchbase.async.n1ql.AsyncN1QLRequest, with several callbacks implemented, and allows you to handle the rows as they are received from the cluster, rather than buffering the entire resultset in memory before invoking the handler.

    Twisted - n1qlQueryAll
    @inlineCallbacks
    def do_n1ql_query(cb):
        rvs = yield cb.n1qlQueryAll(
                N1QLQuery('SELECT * from `travel-sample` LIMIT 10'))
        for row in rvs:
            print row
    Twisted - n1qlQueryEx
    class RowsHandler(AsyncN1QLRequest):
        def __init__(self, *args, **kwargs):
            super(RowsHandler, self).__init__(*args, **kwargs)
            self.deferred = Deferred()
    
        def on_rows(self, rows):
            print "Got {} rows".format(len([x for x in rows]))
        def on_done(self):
    
            print "Rows complete!"
            self.deferred.callback(self)
    
        def on_error(self, ex):
            self.deferred.errback(ex)
    
    @inlineCallbacks
    def do_n1ql_query(cb):
        yield cb.n1qlQueryEx(RowsHandler, N1QLQuery('SELECT * FROM `travel-sample` LIMIT 10'))

    Twisted MapReduce (View) Queries

    Like N1QL queries, MapReduce queries are available via the queryAll() and queryEx() methods, the former returning all the rows at once, and the latter accepting a subclass of couchbase.async.view.AsyncViewBase which defines various functions.

    Twisted - queryAll
    @inlineCallbacks
    def do_view_query():
        rows = yield cb.queryAll(
                'beer', 'brewery_beers', limit=20, include_docs=True)
        for row in rows:
            print row.key, row.doc.value['description'][:10] + '...'
    Twisted - queryEx
    from couchbase.async.view import AsyncViewBase
    
    class RowsHandler(AsyncViewBase):
        def __init__(self, *args, **kwargs):
            super(RowsHandler, self).__init__(*args, **kwargs)
            self.deferred = Deferred()
    
        def on_rows(self, rows):
            print '** Got row callback!'
            for row in rows:
                print row.key, row.doc.value['description'][:10] + '...'
    
        def on_error(self, ex):
            self.deferred.errback(ex)
    
        def on_done(self):
            self.deferred.callback(self)
    
    @inlineCallbacks
    def do_view_query(cb):
        handler = cb.queryEx(
                RowsHandler, 'beer', 'brewery_beers', limit=20, include_docs=True)
        yield handler.deferred

    Gevent

    gevent is a high performance asynchronous framework. It is very powerful in that it allows for a (mostly) traditional synchronous coding style. It accomplishes this by using coroutines known as greenlets or eventlets.

    The Gevent support in the Python SDK is truly native when you use the dedicated module, gcouchbase. This means that it will not block your application while performing I/O. It should also be noted that the gevent functionality does not depend on monkey-patching functionality. Because of how gevent support is implemented.

    The above is important because the normal synchronous couchbase module will also function in gevent, however unlike gcouchbase, it will block your application while running.

    The gevent API is almost identical to the simple synchronous API, though it requires that a different module be imported to properly integrate with gevent. So instead of

    from couchbase.bucket import Bucket
    cb = Bucket(connstr)

    do

    from gcouchbase.bucket import Bucket
    cb = Bucket(connstr)

    Gevent Document (KV) Operations:

    Key-Value operations using gcouchbase is exactly the same as the synchronous couchbase counterpart

    Gevent - KV Operation
    from gcouchbase.bucket import Bucket
    bucket = Bucket('couchbase://localhost/default')
    bucket.upsert('id', {'some':['value']})
    print bucket.get('id').value

    Gevent N1QL Queries

    The API for N1QL queries with gcouchbase is exactly the same as in the synchronous module:

    Gevent - N1QL Query
    from gcouchbase.bucket import Bucket
    from couchbase.n1ql import N1QLQuery
    
    cb = Bucket('couchbase://localhost/travel-sample')
    it = cb.n1ql_query(N1QLQuery('SELECT * from `travel-sample` LIMIT 10'))
    for row in it:
        print row

    Note that if you inspect the returned iterator object, it is actually a different object than the synchronous iterator:

    print it
    # <gcouchbase.bucket.GN1QLRequest object at 0x103694d50>

    Gevent MapReduce Queries

    Issuing MapReduce queries with gcouchbase is exactly the same as in the synchronous module

    Gevent - MapReduce
    from gcouchbase.bucket import Bucket
    
    cb = Bucket('couchbase://localhost/beer-sample')
    resiter = cb.query('beer', 'brewery_beers', limit=10)
    for row in resiter:
        print row

    Asyncio (Python 3.5+)

    asyncio is another asynchronous module which ships with Python version 3.5 and later. The asyncio module is supported in Couchbase via the experimental acouchbase module. The acouchbase.bucket.Bucket object returns asyncio.Future objects rather than actual results.

    To use a bucket from acouchbase, you need to enable experimental SDK features. This is essentially an explicit disclaimer which is visible in production code to ensure that potentially unstable code does not accidentally get used in production.

    Asyncio Document (KV) Operations

    CRUD operations return Future objects which can then be used for await clauses. The future’s result will always be the relevant Result object for the operation performed.

    Asyncio - KV Operation
    import asyncio
    import couchbase.experimental; couchbase.experimental.enable()
    from acouchbase.bucket import Bucket
    
    async def do_crud_op():
        cb = Bucket('couchbase://localhost/default')
        await cb.connect()
        await cb.upsert('id', {'some': 'value'})
        return await cb.get('id')
    
    loop = asyncio.get_event_loop()
    rv = loop.run_until_complete(do_crud_op())
    print(rv.value)

    Asyncio N1QL Queries

    The API for issuing N1QL queries is almost identical to the synchronous API. The notable difference is the use of async for rather than for when iterating over the results:

    Asyncio - N1QL Query
    async def do_n1ql_query(cb):
        it = cb.n1ql_query(N1QLQuery('SELECT * from `travel-sample` LIMIT 10'))
        async for row in it:
            print(row)

    Asyncio MapReduce Queries

    The API for issuing MapReduce queries is almost identical to the synchronous API. The notable difference is the use of async for rather than for when iterating over the results:

    Asyncio - MapReduce
    async def do_view_query(cb):
        it = cb.query('beer', 'brewery_beers', limit=20, include_docs=True)
        async for row in it:
            print(row.key, row.doc.value['description'][:10] + '...')