Hello Columnar — Python SDK Quickstart Guide

      Install, connect, try. A quick start guide to get you up and running with Columnar and the Python Columnar SDK.

      Capella Columnar is a real-time analytical database (RT-OLAP) for real-time apps and operational intelligence. Although it keeps some syntactic similarities with the operational SDKs, the Python Columnar SDK is developed from the ground up for Columnar’s analytical use cases, and supports streaming APIs for handling large datasets.

      Don’t Mix Columnar & Operational SDKs.

      Do not combine the Python Columnar SDK with the Python Operational SDK on the same app server (or development machine). This combination is neither tested nor supported, and the two SDKs may require conflicting versions of shared dependencies. This restriction applies only to the Node.js and Python Columnar SDKs.

      Note that this restriction does not apply to combining our Enterprise Analytics SDKs with our Operational SDKs. See the Analytics SDK page for a reminder of which Analytics SDK to use with which Analytics service.

      Before You Start

      Sign up for a Capella account, and choose a Columnar cluster.

      You’ll need to add your IP address to the allowlist during the sign-up and cluster creation process. You can also do this at any time through the UI, for example if your address changes or you need to add a new one.

      Prerequisites

      Python 3.9 through Python 3.12 are currently supported. See the compatibility page for more information about platform support.
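
Before installing, it can help to confirm that the interpreter falls inside the supported range stated above. The following is a minimal sketch using only the standard library; the helper name `is_supported` and the two constants are illustrative and not part of the SDK.

```python
import sys
from typing import Sequence

# Supported range as stated on this page (inclusive at both ends).
MIN_SUPPORTED = (3, 9)
MAX_SUPPORTED = (3, 12)


def is_supported(version: Sequence[int] = sys.version_info) -> bool:
    """Return True if the (major, minor) part of `version` is in range."""
    return MIN_SUPPORTED <= tuple(version[:2]) <= MAX_SUPPORTED


if __name__ == '__main__':
    current = '.'.join(str(part) for part in sys.version_info[:3])
    status = 'supported' if is_supported() else 'outside the supported range'
    print(f'Python {current} is {status}')
```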

      Getting the SDK

      The SDK can be installed via pip:

      python -m pip install couchbase-columnar

      For other installation methods, see the installation page.
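
To check that the install succeeded without connecting to a cluster, one option is to ask Python's packaging metadata for the installed version. This sketch relies only on the standard library's `importlib.metadata` and the distribution name used in the pip command above; the helper name `installed_version` is illustrative.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str = 'couchbase-columnar') -> Optional[str]:
    """Return the installed version of `dist_name`, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == '__main__':
    version = installed_version()
    if version is None:
        print('couchbase-columnar is not installed in this environment')
    else:
        print(f'couchbase-columnar {version} is installed')
```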

      Connecting and Executing a Query

      Synchronous API

      from couchbase_columnar.cluster import Cluster
      from couchbase_columnar.credential import Credential
      from couchbase_columnar.options import QueryOptions
      
      
      def main() -> None:
          # Update this to your cluster
          connstr = 'couchbases://--your-instance--'
          username = 'username'
          pw = 'Password!123'
          # User Input ends here.
      
          cred = Credential.from_username_and_password(username, pw)
          cluster = Cluster.create_instance(connstr, cred)
      
          # Execute a query and buffer all result rows in client memory.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline LIMIT 10;'
          res = cluster.execute_query(statement)
          all_rows = res.get_all_rows()
          for row in all_rows:
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
          # Execute a query and process rows as they arrive from server.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country="United States" LIMIT 10;'
          res = cluster.execute_query(statement)
          for row in res.rows():
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
          # Execute a streaming query with positional arguments.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$1 LIMIT $2;'
          res = cluster.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
          for row in res:
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
          # Execute a streaming query with named arguments.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$country LIMIT $limit;'
          res = cluster.execute_query(statement, QueryOptions(named_parameters={'country': 'United States',
                                                                                'limit': 10}))
          for row in res.rows():
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
      
      if __name__ == '__main__':
          main()
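
The example above hard-codes the connection string and credentials for brevity. A minimal sketch of reading them from environment variables instead is shown below. The variable names `COLUMNAR_CONNSTR`, `COLUMNAR_USERNAME`, and `COLUMNAR_PASSWORD` are hypothetical choices for this illustration and are not defined by the SDK.

```python
import os
from typing import Mapping, NamedTuple

# Hypothetical variable names, chosen for this sketch only.
ENV_NAMES = ('COLUMNAR_CONNSTR', 'COLUMNAR_USERNAME', 'COLUMNAR_PASSWORD')


class ConnectionSettings(NamedTuple):
    connstr: str
    username: str
    password: str


def load_settings(env: Mapping[str, str] = os.environ) -> ConnectionSettings:
    """Read connection settings from `env`, failing loudly if any is unset."""
    missing = [name for name in ENV_NAMES if not env.get(name)]
    if missing:
        raise RuntimeError('Missing environment variables: ' + ', '.join(missing))
    return ConnectionSettings(*(env[name] for name in ENV_NAMES))
```

The returned values can then be passed to `Credential.from_username_and_password` and `Cluster.create_instance` in place of the literals in `main()`.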

      Asynchronous (asyncio) API

      from acouchbase_columnar import get_event_loop
      from acouchbase_columnar.cluster import AsyncCluster
      from couchbase_columnar.credential import Credential
      from couchbase_columnar.options import QueryOptions
      
      
      async def main() -> None:
          # Update this to your cluster
          connstr = 'couchbases://--your-instance--'
          username = 'username'
          pw = 'Password!123'
          # User Input ends here.
      
          cred = Credential.from_username_and_password(username, pw)
          cluster = AsyncCluster.create_instance(connstr, cred)
      
          # Execute a query and buffer all result rows in client memory.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline LIMIT 10;'
          res = await cluster.execute_query(statement)
          all_rows = await res.get_all_rows()
          # NOTE: all_rows is a list, _do not_ use `async for`
          for row in all_rows:
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
          # Execute a query and process rows as they arrive from server.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country="United States" LIMIT 10;'
          res = await cluster.execute_query(statement)
          async for row in res.rows():
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
          # Execute a streaming query with positional arguments.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$1 LIMIT $2;'
          res = await cluster.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
          async for row in res:
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
          # Execute a streaming query with named arguments.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$country LIMIT $limit;'
          res = await cluster.execute_query(statement, QueryOptions(named_parameters={'country': 'United States',
                                                                                      'limit': 10}))
          async for row in res.rows():
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
      if __name__ == '__main__':
          loop = get_event_loop()
          loop.run_until_complete(main())
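
The streaming loops above handle one row at a time. For large results it can be convenient to process rows in fixed-size batches while still never holding the whole result in memory. The sketch below is plain asyncio code, not part of the SDK: `in_batches` assumes only that its input supports `async for`, as `res.rows()` does in the example above, and `fake_rows` is a stand-in source so the snippet runs without a cluster.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar('T')


async def in_batches(rows: AsyncIterator[T], size: int) -> AsyncIterator[List[T]]:
    """Group an async stream of rows into lists of at most `size` items."""
    if size < 1:
        raise ValueError('size must be at least 1')
    batch: List[T] = []
    async for row in rows:
        batch.append(row)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        # Emit the final, possibly shorter, batch.
        yield batch


async def fake_rows(n: int) -> AsyncIterator[dict]:
    """Stand-in for a streaming query result; yields n small dicts."""
    for i in range(n):
        await asyncio.sleep(0)  # Give control back to the event loop.
        yield {'id': i}


async def collect(n: int, size: int) -> List[List[dict]]:
    """Gather every batch into a list, for demonstration only."""
    return [batch async for batch in in_batches(fake_rows(n), size)]


if __name__ == '__main__':
    for batch in asyncio.run(collect(5, 2)):
        print(f'Batch of {len(batch)}: {batch}')
```

With a real query, `fake_rows(n)` would be replaced by the result's row stream, for example `in_batches(res.rows(), 1000)`.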