February 16, 2025
Install, connect, try. A quick start guide to get you up and running with Columnar and the Python Columnar SDK.

Capella Columnar is a real-time analytical database (RT-OLAP) for real-time apps and operational intelligence. Although it keeps some syntactic similarities with the operational SDKs, the Python Columnar SDK is developed from the ground up for Columnar’s analytical use cases, and supports streaming APIs to handle large datasets.

Before You Start

Sign up for a Capella account, and choose a Columnar cluster.

You’ll need to add your IP address to the allowlist during the sign-up and cluster creation process. You can also do this at any time through the UI, for example if your address changes or you need to add a new one.

Prerequisites

Python 3.9 through Python 3.12 are currently supported. See the compatibility page for more information about platform support.
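As a quick sanity check before installing, the interpreter version can be compared against the range stated above. This is a minimal sketch that uses only the standard library; the helper name `is_supported` and the two constants are illustrative and not part of the SDK.

```python
import sys

# Range taken from the prerequisites above: Python 3.9 through 3.12.
MIN_SUPPORTED = (3, 9)
MAX_SUPPORTED = (3, 12)


def is_supported(version: tuple = sys.version_info[:2]) -> bool:
    """Return True if the (major, minor) version is inside the supported range."""
    return MIN_SUPPORTED <= tuple(version[:2]) <= MAX_SUPPORTED


if __name__ == "__main__":
    major, minor = sys.version_info[:2]
    status = "supported" if is_supported() else "outside the supported range"
    print(f"Python {major}.{minor} is {status}")
```

Only the major and minor components are compared, so any patch release within 3.9 to 3.12 passes the check.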

Getting the SDK

The SDK can be installed via pip:

console
python -m pip install couchbase-columnar

For other installation methods, see the installation page.
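To confirm that the install succeeded, one option is to ask the standard library for the installed distribution's version. The sketch below assumes the distribution name matches the pip command above (`couchbase-columnar`); `installed_version` is a hypothetical helper, not an SDK function.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Distribution name assumed from the pip command in this guide.
    found = installed_version("couchbase-columnar")
    if found is None:
        print("couchbase-columnar is not installed in this environment")
    else:
        print(f"couchbase-columnar {found} is installed")
```

Running this with the same interpreter that ran `python -m pip install` avoids false negatives caused by multiple Python environments.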

Connecting and Executing a Query

Synchronous API

python
from couchbase_columnar.cluster import Cluster
from couchbase_columnar.credential import Credential
from couchbase_columnar.options import (ClusterOptions,
                                        QueryOptions,
                                        SecurityOptions)


def main() -> None:
    # Update this to your cluster
    connstr = 'couchbases://--your-instance--'
    username = 'username'
    pw = 'Password!123'
    # User Input ends here.

    cred = Credential.from_username_and_password(username, pw)
    cluster = Cluster.create_instance(connstr, cred)

    # Execute a query and buffer all result rows in client memory.
    statement = 'SELECT * FROM `travel-sample`.inventory.airline LIMIT 10;'
    res = cluster.execute_query(statement)
    all_rows = res.get_all_rows()
    for row in all_rows:
        print(f'Found row: {row}')
    print(f'metadata={res.metadata()}')

    # Execute a query and process rows as they arrive from server.
    statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country="United States" LIMIT 10;'
    res = cluster.execute_query(statement)
    for row in res.rows():
        print(f'Found row: {row}')
    print(f'metadata={res.metadata()}')

    # Execute a streaming query with positional arguments.
    statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$1 LIMIT $2;'
    res = cluster.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
    for row in res:
        print(f'Found row: {row}')
    print(f'metadata={res.metadata()}')

    # Execute a streaming query with named arguments.
    statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$country LIMIT $limit;'
    res = cluster.execute_query(statement, QueryOptions(named_parameters={'country': 'United States',
                                                                          'limit': 10}))
    for row in res.rows():
        print(f'Found row: {row}')
    print(f'metadata={res.metadata()}')


if __name__ == '__main__':
    main()
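The example's comments distinguish buffering all rows in client memory from processing rows as they arrive. The general trade-off can be illustrated without a cluster, using a plain Python generator as a stand-in for a result stream. Everything in this sketch (`fake_rows`, `buffered`, `first_match`) is hypothetical and does not use or describe the SDK's internals.

```python
from typing import Dict, Iterator, List


def fake_rows(count: int) -> Iterator[Dict[str, int]]:
    """Hypothetical stand-in for a result stream; yields one row at a time."""
    for i in range(count):
        yield {"id": i}


def buffered(count: int) -> List[Dict[str, int]]:
    """Collect every row before returning, so memory grows with the result size."""
    return list(fake_rows(count))


def first_match(count: int, wanted_id: int) -> Dict[str, int]:
    """Consume rows lazily and stop as soon as the wanted row is seen."""
    for row in fake_rows(count):
        if row["id"] == wanted_id:
            return row
    raise LookupError(f"no row with id {wanted_id}")


if __name__ == "__main__":
    print(len(buffered(5)))            # all five rows are held in a list
    print(first_match(1_000_000, 3))   # only the first four rows are produced
```

Buffering is convenient for small results such as the `LIMIT 10` queries above, while lazy iteration keeps memory flat for large analytical result sets.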

Asynchronous (asyncio) API

python
from acouchbase_columnar import get_event_loop
from acouchbase_columnar.cluster import AsyncCluster
from couchbase_columnar.credential import Credential
from couchbase_columnar.options import (ClusterOptions,
                                        QueryOptions,
                                        SecurityOptions)


async def main() -> None:
    # Update this to your cluster
    connstr = 'couchbases://--your-instance--'
    username = 'username'
    pw = 'Password!123'
    # User Input ends here.

    cred = Credential.from_username_and_password(username, pw)
    cluster = AsyncCluster.create_instance(connstr, cred)

    # Execute a query and buffer all result rows in client memory.
    statement = 'SELECT * FROM `travel-sample`.inventory.airline LIMIT 10;'
    res = await cluster.execute_query(statement)
    all_rows = await res.get_all_rows()
    # NOTE: all_rows is a list, _do not_ use `async for`
    for row in all_rows:
        print(f'Found row: {row}')
    print(f'metadata={res.metadata()}')

    # Execute a query and process rows as they arrive from server.
    statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country="United States" LIMIT 10;'
    res = await cluster.execute_query(statement)
    async for row in res.rows():
        print(f'Found row: {row}')
    print(f'metadata={res.metadata()}')

    # Execute a streaming query with positional arguments.
    statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$1 LIMIT $2;'
    res = await cluster.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
    async for row in res:
        print(f'Found row: {row}')
    print(f'metadata={res.metadata()}')

    # Execute a streaming query with named arguments.
    statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$country LIMIT $limit;'
    res = await cluster.execute_query(statement, QueryOptions(named_parameters={'country': 'United States',
                                                                                'limit': 10}))
    async for row in res.rows():
        print(f'Found row: {row}')
    print(f'metadata={res.metadata()}')


if __name__ == '__main__':
    loop = get_event_loop()
    loop.run_until_complete(main())
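The note in the example about not using `async for` on `all_rows` reflects a general asyncio rule: a list is iterated with a plain `for`, while an async iterator needs `async for`. The sketch below shows both cases using only the standard library. `fake_async_rows`, `collect`, and `demo` are hypothetical stand-ins and are not SDK functions.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_async_rows(count: int) -> AsyncIterator[int]:
    """Hypothetical stand-in for rows arriving from a server."""
    for i in range(count):
        await asyncio.sleep(0)  # yield control, as a network wait would
        yield i


async def collect(count: int) -> List[int]:
    """Drain the async iterator into a plain list."""
    return [row async for row in fake_async_rows(count)]


async def demo() -> int:
    total = 0
    rows = await collect(3)
    for row in rows:                      # a list: plain `for`
        total += row
    async for row in fake_async_rows(3):  # an async iterator: `async for`
        total += row
    return total


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The standalone sketch uses `asyncio.run` for brevity. The SDK example above obtains its loop from `get_event_loop` in `acouchbase_columnar`, and that pattern should be kept when working with the SDK itself.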