Different methods for inserting data into the Cosmos DB NoSQL API using Python

In this blogpost, I will explain the different methods to insert data into the Azure Cosmos DB NoSQL API using Python code.

Azure Cosmos DB is a globally distributed database service from Microsoft that supports multiple APIs, including NoSQL, MongoDB, Cassandra, Gremlin, Table and PostgreSQL, hosted on the Azure cloud.

To insert data into the Azure Cosmos DB NoSQL API, we can follow one of the three methods below:

Table of Contents

Methods to insert data into Cosmos DB

  1. Synchronously creating items in a Cosmos DB container.
  2. Asynchronously creating items in a Cosmos DB container with the async Cosmos client.
  3. Using the Transactional Batch API to ingest data grouped by partition key.

The synchronous and asynchronous method samples are available at azure-sdk-for-python/sdk/cosmos/azure-cosmos/samples in the Azure/azure-sdk-for-python repository on GitHub.

Note: The above methods are also available in other programming languages such as Java and .NET. The Python SDK does not support bulk ingestion at the time of writing.

Let us discuss the three methods and their pros and cons.

Synchronous method insertion

In the synchronous method of insertion, once authentication with the Cosmos DB account succeeds, you insert the JSON records into the container one at a time using a for loop.

Below is a Python reference:

from azure.cosmos import CosmosClient

# Initialize Cosmos client
url = "https://<your-cosmos-account>.documents.azure.com:443/"
key = "<your-primary-key>"
client = CosmosClient(url, credential=key)

# Get database and container
database_name = "SampleDB"
container_name = "SampleContainer"
database = client.get_database_client(database_name)
container = database.get_container_client(container_name)

# Sample records to insert
items_to_insert = [
    {"id": "1", "name": "Alice", "city": "Hyderabad"},
    {"id": "2", "name": "Bob", "city": "Bangalore"},
    {"id": "3", "name": "Charlie", "city": "Chennai"}
]

# Insert records using create_item in a for loop
for item in items_to_insert:
    response = container.create_item(body=item)
    print(f"Inserted item with id: {response['id']}")

The loop iterates over the records one after another until the end of the list, so inserting n records requires n sequential round trips to the container.

Pros:

The Python code is simple to set up.

Good for small datasets.

Cons:

Takes more time when inserting many JSON records.

Not ideal for high-throughput scenarios.

Asynchronous method insertion

The asynchronous Cosmos DB client allows users to insert data into the container concurrently. Operations against the same container run in parallel, so a batch of inserts (or reads) completes much faster than the equivalent synchronous loop.

import asyncio
from azure.cosmos.aio import CosmosClient

# Replace with your Cosmos DB account details
COSMOS_ENDPOINT = "https://<your-cosmos-account>.documents.azure.com:443/"
COSMOS_KEY = "<your-primary-key>"
DATABASE_NAME = "SampleDB"
CONTAINER_NAME = "SampleContainer"

# Sample records to insert
items_to_insert = [
    {"id": "1", "name": "Alice", "city": "Hyderabad"},
    {"id": "2", "name": "Bob", "city": "Bangalore"},
    {"id": "3", "name": "Charlie", "city": "Chennai"}
]

async def insert_items():
    # Initialize async Cosmos client
    async with CosmosClient(COSMOS_ENDPOINT, credential=COSMOS_KEY) as client:
        # Get database and container
        database = client.get_database_client(DATABASE_NAME)
        container = database.get_container_client(CONTAINER_NAME)

        # Insert items asynchronously
        tasks = []
        for item in items_to_insert:
            tasks.append(container.create_item(body=item))

        # Run all insertions concurrently
        results = await asyncio.gather(*tasks)

        # Print inserted IDs
        for res in results:
            print(f"Inserted item with id: {res['id']}")

# Run the async function
if __name__ == "__main__":
    asyncio.run(insert_items())

Pros:

Inserts and reads complete faster because operations run concurrently.

Scales better than the synchronous loop, though it is still best suited to modest payloads (a few MB).

Takes less time to upload or download data compared to the synchronous method.

Cons:

You need to understand asynchronous programming in Python.

More complex code structure.

Higher RU consumption in short bursts, which can trigger throttling.
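One way to soften those cons is to cap how many inserts run at once. The sketch below is a hypothetical helper (not part of the SDK) that uses `asyncio.Semaphore` to bound concurrency; `container` is assumed to be an async container client like the one created in the sample above, or any object with an async `create_item(body=...)` method.

```python
import asyncio

async def insert_with_limit(container, items, max_concurrency=5):
    """Insert items concurrently, but never more than max_concurrency at once.

    `container` is assumed to be an azure.cosmos.aio container client;
    any object exposing an async create_item(body=...) method works.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def insert_one(item):
        async with semaphore:  # wait for a free slot before calling the service
            return await container.create_item(body=item)

    # gather preserves the input order of the items
    results = await asyncio.gather(*(insert_one(i) for i in items))
    return [r["id"] for r in results]
```

Bounding concurrency like this smooths RU consumption, which helps avoid throttled (429) responses on containers with limited provisioned throughput.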


Transactional Batch API method insertion

The Cosmos DB Python SDK provides a batch API that takes a set of JSON records and inserts them into the container as a single atomic operation. A single batch can mix operations, for example 10 creates, 10 replaces and 20 deletes, as long as the overall batch limits below are respected.

You can read more about the Transactional Batch API at Transactional batch – Azure Cosmos DB REST API | Microsoft Learn.

There are certain limitations to using the API:

A transactional batch cannot exceed 100 operations.

The batch payload cannot exceed 2 MB.

All records in a batch must share the same partition key.
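Given those limits, a batch is just a list of (operation, args) tuples executed against one partition key. The helper below is a small sketch (not an SDK function): it builds "create" operations for a list of records and rejects batches over the 100-operation cap; the actual `execute_item_batch` call, shown commented, needs a real container client like the ones created in the earlier samples.

```python
def build_create_batch(records, max_ops=100):
    """Build a transactional-batch operation list of 'create' operations.

    Raises if the batch would exceed the 100-operation limit; the 2 MB size
    limit and the same-partition-key requirement are left to the caller.
    """
    if len(records) > max_ops:
        raise ValueError(f"Batch exceeds {max_ops} operations: {len(records)}")
    return [("create", (record,)) for record in records]

# With a real container client (see the earlier samples), you would then run:
# container.execute_item_batch(
#     batch_operations=build_create_batch(records),
#     partition_key="Hyderabad",  # every record must share this value
# )
```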

Pros:

Best when many records share one partition-key value. You can group the records into a dictionary keyed by partition key and insert each group as a batch.

Faster than the methods above.

Cons:

If every record has a unique partition-key value, each batch holds a single record and you are effectively back to the synchronous method.

More complex code setup.

RUs (Request Units) are consumed much faster with this method.

Below is a sample code reference taken from the Microsoft Learn page linked above:

batch_operations = [
    ("create", (item_body,), kwargs),
    ("replace", (item_id, item_body), kwargs),
    ("read", (item_id,), kwargs),
    ("upsert", (item_body,), kwargs),
    ("patch", (item_id, operations), kwargs),
    ("delete", (item_id,), kwargs),
]
batch_results = container.execute_item_batch(
    batch_operations=batch_operations, partition_key=partition_key
)
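The "group by partition key" idea from the pros above can be sketched in plain Python: bucket the incoming records by their partition-key field, then split each bucket into chunks of at most 100 so that every chunk is a valid batch. The field name `city` is an assumption matching the sample records earlier in this post.

```python
from collections import defaultdict

def chunk_by_partition_key(records, pk_field="city", chunk_size=100):
    """Group records by partition key, then split each group into
    batch-sized chunks (Cosmos DB allows at most 100 operations per batch)."""
    groups = defaultdict(list)
    for record in records:
        groups[record[pk_field]].append(record)

    for pk_value, group in groups.items():
        for start in range(0, len(group), chunk_size):
            yield pk_value, group[start:start + chunk_size]

# Each (pk_value, chunk) pair can then be sent as one transactional batch:
# for pk_value, chunk in chunk_by_partition_key(items_to_insert):
#     container.execute_item_batch(
#         batch_operations=[("create", (r,)) for r in chunk],
#         partition_key=pk_value,
#     )
```

Because all operations in a chunk share one partition-key value, each chunk satisfies the same-partition-key requirement by construction.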

To know more about the Python SDK for the Cosmos DB NoSQL API, refer to Azure Cosmos DB SQL API client library for Python | Microsoft Learn.

Takeaways

As discussed above, choose the method that best fits your data ingestion size, then insert the data into the container.

While working with Azure Cosmos DB, check out the monitoring page in Azure portal to track how many requests are being processed and whether any throttling has occurred at the container level.

Depending on the data ingestion, you can adjust the throughput up or down to optimize performance while keeping costs under control.

Disclaimer: This content is human-written and reflects hours of manual effort. The included code was AI-generated for reference purpose. Please adapt it to your project requirements, as it may not function exactly as expected.
