Python Analytics SDK Quickstart Guide

    Install, connect, try. A quick start guide to get you up and running with Enterprise Analytics and the Python Analytics SDK.

    Enterprise Analytics is a real-time analytical database (RT-OLAP) for real-time apps and operational intelligence. Although it maintains some syntactic similarities with the operational SDKs, the Python Analytics SDK is developed from the ground up for column-based analytical use cases, and supports streaming APIs to handle large datasets.

    Before You Start

    Install and configure an Enterprise Analytics Cluster.

    Prerequisites

    Python 3.9 through Python 3.12 are currently supported. See the compatibility page for more information about platform support.
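    Before installing, you can confirm that the running interpreter falls within the supported range. This is only an illustrative sketch (the bounds are taken from the note above, and the helper name is ours, not part of the SDK):

    ```python
    import sys

    # Supported range per this guide: Python 3.9 through 3.12.
    MIN_SUPPORTED = (3, 9)
    MAX_SUPPORTED = (3, 12)


    def is_supported(version_info=sys.version_info) -> bool:
        """Return True if the interpreter's major.minor version is in range."""
        return MIN_SUPPORTED <= tuple(version_info[:2]) <= MAX_SUPPORTED
    ```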

    Getting the SDK

    The SDK can be installed via pip:

    python -m pip install couchbase-analytics

    For other installation methods, see the installation page.

    Connecting and Executing a Query

    Synchronous API

    from couchbase_analytics.cluster import Cluster
    from couchbase_analytics.credential import Credential
    from couchbase_analytics.options import QueryOptions
    
    
    def main() -> None:
        # Update this to your cluster
        endpoint = 'https://--your-instance--'
        username = 'username'
        pw = 'Password!123'
        # User Input ends here.
    
        cred = Credential.from_username_and_password(username, pw)
        cluster = Cluster.create_instance(endpoint, cred)
    
        # Execute a query and buffer all result rows in client memory.
        statement = 'SELECT * FROM `travel-sample`.inventory.airline LIMIT 10;'
        res = cluster.execute_query(statement)
        all_rows = res.get_all_rows()
        for row in all_rows:
            print(f'Found row: {row}')
        print(f'metadata={res.metadata()}')
    
        # Execute a query and process rows as they arrive from server.
        statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country="United States" LIMIT 10;'
        res = cluster.execute_query(statement)
        for row in res.rows():
            print(f'Found row: {row}')
        print(f'metadata={res.metadata()}')
    
        # Execute a streaming query with positional arguments.
        statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$1 LIMIT $2;'
        res = cluster.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
        for row in res:
            print(f'Found row: {row}')
        print(f'metadata={res.metadata()}')
    
        # Execute a streaming query with named arguments.
        statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$country LIMIT $limit;'
        res = cluster.execute_query(statement, QueryOptions(named_parameters={'country': 'United States',
                                                                              'limit': 10}))
        for row in res.rows():
            print(f'Found row: {row}')
        print(f'metadata={res.metadata()}')
    
    
    if __name__ == '__main__':
        main()

    Asynchronous (asyncio) API

    import asyncio
    
    from acouchbase_analytics.cluster import AsyncCluster
    from acouchbase_analytics.credential import Credential
    from acouchbase_analytics.options import QueryOptions
    
    
    async def main() -> None:
        # Update this to your cluster
        endpoint = 'https://--your-instance--'
        username = 'username'
        pw = 'Password!123'
        # User Input ends here.
    
        cred = Credential.from_username_and_password(username, pw)
        cluster = AsyncCluster.create_instance(endpoint, cred)
    
        # Execute a query and buffer all result rows in client memory.
        statement = 'SELECT * FROM `travel-sample`.inventory.airline LIMIT 10;'
        res = await cluster.execute_query(statement)
        all_rows = await res.get_all_rows()
        # NOTE: all_rows is a list, _do not_ use `async for`
        for row in all_rows:
            print(f'Found row: {row}')
        print(f'metadata={res.metadata()}')
    
        # Execute a query and process rows as they arrive from server.
        statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country="United States" LIMIT 10;'
        res = await cluster.execute_query(statement)
        async for row in res.rows():
            print(f'Found row: {row}')
        print(f'metadata={res.metadata()}')
    
        # Execute a streaming query with positional arguments.
        statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$1 LIMIT $2;'
        res = await cluster.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
        async for row in res:
            print(f'Found row: {row}')
        print(f'metadata={res.metadata()}')
    
        # Execute a streaming query with named arguments.
        statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$country LIMIT $limit;'
        res = await cluster.execute_query(statement, QueryOptions(named_parameters={'country': 'United States',
                                                                                    'limit': 10}))
        async for row in res.rows():
            print(f'Found row: {row}')
        print(f'metadata={res.metadata()}')
    
    if __name__ == '__main__':
        asyncio.run(main())

    Connection String

    The endpoint in the examples above takes the form "https://<your_hostname>:<port>". The default port is 443, for TLS connections. You do not need to give a port number if you are using port 443: endpoint = "https://<your_hostname>" is effectively the same as endpoint = "https://<your_hostname>:443".

    If you are using a different port (for example, connecting to a cluster without a load balancer, directly to the Analytics port, 18095) or not using TLS, see the Connecting to Enterprise Analytics page.
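    The port rules above can be sketched as plain string construction (the hostname here is a placeholder, not a real instance):

    ```python
    # Placeholder hostname; substitute your own instance.
    host = 'analytics.example.com'

    # 443 is the default TLS port, so these two forms name the same endpoint.
    endpoint_default = f'https://{host}'
    endpoint_explicit = f'https://{host}:443'

    # A non-default port (for example, the direct Analytics port, 18095)
    # must always be stated explicitly.
    endpoint_direct = f'https://{host}:18095'
    ```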

    Migration from Row-Based Analytics

    If you are migrating a project from CBAS — our Analytics service on Capella Operational and Couchbase Server, using our operational SDKs — then information on migration can be found in the Enterprise Analytics docs.

    In particular, refer to the SDK section of the Enterprise Analytics migration pages.