Async APIs

    The Couchbase Python SDK offers both asyncio and Twisted APIs for async operation.

    Asyncio

    To use the Python SDK with asyncio, use the acouchbase module. Unlike the synchronous SDK methods, which block until completion and return Result objects, the acouchbase methods return a Future. You can await the Future to learn whether the operation succeeded or failed. Any function that awaits a Future must be declared with the async keyword. Awaiting the Future yields the Result object, as the examples below show.
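
The Future-and-await behaviour described above can be tried without a cluster. The following is a minimal sketch using only the standard library; fake_get is a hypothetical stand-in for an SDK call, not part of acouchbase, and it uses KeyError where the SDK would raise its own exception type.

```python
import asyncio


def fake_get(key):
    """Hypothetical stand-in for an SDK call: returns a Future immediately."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    if key == "not-a-key":
        # The Future fails on the next loop iteration.
        loop.call_soon(future.set_exception, KeyError(key))
    else:
        # The Future succeeds on the next loop iteration.
        loop.call_soon(future.set_result, {"id": key})
    return future


async def main():
    # Awaiting the Future suspends main() until a result is set.
    doc = await fake_get("hotel_10025")

    # A failed Future raises its exception at the await point.
    try:
        await fake_get("not-a-key")
        missing = False
    except KeyError:
        missing = True
    return doc, missing


doc, missing = asyncio.run(main())
print(doc, missing)
```

Because main() contains await, it has to be declared with async def, and it only runs once it is handed to an event loop, here through asyncio.run.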

    Create an acouchbase Cluster
    
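    # needed for cluster creation
    from acouchbase.cluster import Cluster
    from couchbase.auth import PasswordAuthenticator
    from couchbase.options import AnalyticsOptions, ClusterOptions, QueryOptions
    
    # needed for search and sub-document operations
    import couchbase.search as search
    import couchbase.subdocument as SD
    
    # exceptions handled in the examples
    from couchbase.exceptions import (
        CouchbaseException,
        DocumentNotFoundException,
        ParsingFailedException)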
    
    
    async def get_couchbase():
        cluster = Cluster(
            "couchbase://your-ip",
            ClusterOptions(PasswordAuthenticator("Administrator", "password")))
        bucket = cluster.bucket("travel-sample")
        await bucket.on_connect()
    
        return cluster, bucket

    Asyncio Document (KV) Operations

    Key-value and sub-document operations return Future objects, which can then be awaited. The Future’s result is always the Result object for the operation performed.

    KV - Operations
    async def kv_operations(collection):
        key = "hotel_10025"
        res = await collection.get(key)
        hotel = res.content_as[dict]
        print("Hotel: {}".format(hotel))
    
        hotel["alias"] = "Hostel"
        res = await collection.upsert(key, hotel)
        print("CAS: {}".format(res.cas))
    
        # handle exception
        try:
            res = await collection.get("not-a-key")
        except DocumentNotFoundException as ex:
            print("Document not found exception caught!")
    Sub-document Operations
    async def sub_doc_operations(collection):
        key = "hotel_10025"
        res = await collection.lookup_in(key,
                                         [SD.get("reviews[0].ratings")])
    
        print("Review ratings: {}".format(res.content_as[dict](0)))
    
        res = await collection.mutate_in(key,
                                         [SD.replace("reviews[0].ratings.Rooms", 3.5)])
        print("CAS: {}".format(res.cas))
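
Because each operation hands back something awaitable, several operations can be started and awaited together. The sketch below shows this with asyncio.gather from the standard library. FakeCollection is a hypothetical stand-in so the example runs without a server; whether the same pattern suits a real collection depends on your workload and is an assumption here.

```python
import asyncio


class FakeCollection:
    """Hypothetical stand-in for an acouchbase collection (no server needed)."""

    def __init__(self, docs):
        self._docs = docs

    async def get(self, key):
        await asyncio.sleep(0.01)  # simulate network latency
        return self._docs[key]


async def fetch_many(collection, keys):
    # Start every get(), then await them together.
    # gather returns the results in the same order as keys.
    return await asyncio.gather(*(collection.get(k) for k in keys))


collection = FakeCollection({"hotel_1": {"name": "A"}, "hotel_2": {"name": "B"}})
hotels = asyncio.run(fetch_many(collection, ["hotel_1", "hotel_2"]))
print(hotels)
```

The two simulated lookups overlap instead of running back to back, which is the main reason to prefer gather over awaiting each call in turn.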

    Asyncio SQL++ Operations

    The API for issuing SQL++ (formerly N1QL) queries is almost identical to the synchronous API. The notable difference is the use of async for rather than for when iterating over the results.

    SQL++ Query
    async def n1ql_query(cluster):
        try:
            result = cluster.query(
                "SELECT h.* FROM `travel-sample`.inventory.hotel h WHERE h.country=$country LIMIT 10",
                QueryOptions(named_parameters={"country": "United Kingdom"}))
    
            async for row in result:
                print("Found row: {}".format(row))
        except ParsingFailedException as ex:
            print(ex)
        except CouchbaseException as ex:
            print(ex)
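
The async for behaviour can be illustrated without a cluster. This is a minimal sketch in which FakeQueryResult is a hypothetical stand-in for a streaming query result; it is not the SDK's result class and only mimics the fact that rows arrive asynchronously.

```python
import asyncio


class FakeQueryResult:
    """Hypothetical stand-in for a streaming result; rows arrive one at a time."""

    def __init__(self, rows):
        self._rows = rows

    async def __aiter__(self):
        for row in self._rows:
            await asyncio.sleep(0)  # yield control, as a network read would
            yield row


async def collect(result):
    rows = []
    # async for awaits each row, letting other tasks run in between.
    async for row in result:
        rows.append(row)
    return rows


rows = asyncio.run(collect(FakeQueryResult([{"name": "A"}, {"name": "B"}])))
print(rows)
```

A plain for loop over this stand-in would raise a TypeError, because the object defines only the asynchronous iteration protocol.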

    Asyncio Search Operations

    The API for issuing full text search queries is almost identical to the synchronous API. The notable difference is the use of async for rather than for when iterating over the results.

    Search Query
    async def search_query(cluster):
        try:
            result = cluster.search_query(
                "travel-sample-index", search.QueryStringQuery("swanky"))
    
            async for row in result:
                print(f"Found row: {row}")
            print(f"Reported total rows: {result.metadata().metrics().total_rows()}")
    
        except CouchbaseException as ex:
            print(ex)

    Asyncio Analytics Operations

    The API for issuing analytics queries is almost identical to the synchronous API. The notable difference is the use of async for rather than for when iterating over the results.

    Analytics Query
    async def analytics_query(cluster):
        try:
            result = cluster.analytics_query(
                "SELECT a.* FROM `travel-sample`.inventory.airports a WHERE a.country = $country LIMIT 10",
                AnalyticsOptions(named_parameters={"country": "France"}))
            async for row in result:
                print("Found row: {}".format(row))
        except CouchbaseException as ex:
            print(ex)

    Twisted

    To use the Python SDK with the Twisted framework, use the txcouchbase module. Unlike the synchronous SDK methods, which block until completion and return Result objects, the txcouchbase methods return a Twisted Deferred. You can attach callback and errback handlers to the Deferred. Result objects are passed to the callback, as the examples below show.

    The txcouchbase package must be imported prior to importing the Twisted reactor.
    txcouchbase imports
    # **IMPORTANT** txcouchbase must be imported before the reactor (new in the Python 4.x SDK)
    import txcouchbase
    from twisted.internet import reactor
    
    # needed for FTS support
    import couchbase.search as search
    
    # needed for sub-document operations
    import couchbase.subdocument as SD
    
    # used for analytics operations
    from couchbase.analytics import AnalyticsOptions
    
    # used to support SQL++ (N1QL) query
    from couchbase.options import QueryOptions
    
    # needed for cluster creation
    from couchbase.auth import PasswordAuthenticator
    from txcouchbase.cluster import TxCluster
    
    # used for handling result objects
    from couchbase.result import GetResult, LookupInResult
    Create a txcouchbase Cluster
    def on_connect_ok(result, cluster):
        # create a bucket object
        bucket = cluster.bucket('travel-sample')
        # get a reference to the default collection, required for older Couchbase server versions
        cb_coll_default = bucket.default_collection()
        # get a reference to a named collection
        cb_coll = bucket.scope("inventory").collection("hotel")
        # do_stuff is a placeholder for your application logic
        do_stuff(cluster, bucket, cb_coll, cb_coll_default)
    
    
    def on_connect_err(error):
        print("Unable to connect.\n{}".format(error))
    
    
    # create a cluster object
    cluster = TxCluster('couchbase://your-ip',
                        authenticator=PasswordAuthenticator(
                            'Administrator',
                            'password'))
    
    # wait for connect
    cluster.on_connect().addCallback(on_connect_ok, cluster).addErrback(on_connect_err)

    Twisted Document (KV) Operations

    Key-value and sub-document operations return Deferred objects which can then be configured with callback and errback handlers. The Deferred’s result will always be the relevant Result object for the operation performed.

    KV - Operations
    def on_kv_ok(result):
        if isinstance(result, GetResult):
            print("Document: {}".format(result.content_as[dict]))
        else:
            print("CAS: {}".format(result.cas))
    
    
    def on_kv_error(error):
        print("Operation had an error.\nError: {}".format(error))
    
    
    def kv_operations(collection):
        key = "hotel_10025"
        collection.get(key).addCallback(on_kv_ok).addErrback(on_kv_error)
    
        new_hotel = {
            "title": "Couchbase",
            "name": "The Couchbase Hotel",
            "address": "Pennington Street",
            "directions": None,
            "phone": "+44 1457 855449",
        }
    
        collection.upsert("hotel_98765", new_hotel).addCallback(
            on_kv_ok).addErrback(on_kv_error)
    
        collection.get("not-a-key").addCallback(on_kv_ok).addErrback(on_kv_error)
    Sub-document Operations
    def on_sd_ok(result, idx=0):
        if isinstance(result, LookupInResult):
            print("Sub-document: {}".format(result.content_as[dict](idx)))
        else:
            print("CAS: {}".format(result.cas))
    
    
    def on_sd_error(error):
        print("Operation had an error.\nError: {}".format(error))
    
    
    def sub_doc_operations(collection):
        key = "hotel_10025"
    
        collection.lookup_in(key, [SD.get("reviews[0].ratings")]).addCallback(
            on_sd_ok).addErrback(on_sd_error)
    
        collection.mutate_in(key, [SD.replace("reviews[0].ratings.Rooms", 3.5)]).addCallback(
            on_sd_ok).addErrback(on_sd_error)

    Twisted SQL++ Operations

    The API for issuing SQL++ queries is almost identical to the synchronous API. The notable difference is that the results are iterated inside a callback.

    SQL++ Query
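    def on_streaming_error(error):
        # errback shared by the streaming examples (SQL++, search, analytics)
        print("Streaming operation had an error.\nError: {}".format(error))
    
    
    def handle_n1ql_results(result):
        for r in result.rows():
            print("Query row: {}".format(r))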
    
    
    def n1ql_query(cluster):
        d = cluster.query(
            "SELECT h.* FROM `travel-sample`.inventory.hotel h WHERE h.country=$country LIMIT 2",
            QueryOptions(named_parameters={"country": "United Kingdom"}))
        d.addCallback(handle_n1ql_results).addErrback(on_streaming_error)

    Twisted Search Operations

    The API for issuing full text search queries is almost identical to the synchronous API. The notable difference is that the results are iterated inside a callback.

    Search Query
    def handle_search_results(result):
        for r in result.rows():
            print("Search row: {}".format(r))
    
    
    def search_query(cluster):
        d = cluster.search_query(
            "travel-sample-index", search.QueryStringQuery("swanky"))
        d.addCallback(handle_search_results).addErrback(on_streaming_error)

    Twisted Analytics Operations

    The API for issuing analytics queries is almost identical to the synchronous API. The notable difference is that the results are iterated inside a callback.

    Analytics Query
    def handle_analytics_results(result):
        for r in result.rows():
            print("Analytics row: {}".format(r))
    
    def analytics_query(cluster):
        d = cluster.analytics_query(
            "SELECT id, country FROM `travel-sample`.inventory.airports a WHERE a.country = $country LIMIT 10",
            AnalyticsOptions(named_parameters={"country": "France"}))
        d.addCallback(handle_analytics_results).addErrback(on_streaming_error)