
Async Usage Guide

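dbsync-py provides full async support for high-performance applications. This guide covers async patterns, session management, and best practices.

The snippets build on one another. Later examples reuse imports and helpers introduced earlier, such as `select`, the model classes, and `get_async_session`.

The short "Usage" snippets use top-level `await` and `async with`. These only work in an asyncio-aware REPL (`python -m asyncio`, IPython, Jupyter). In a regular script, put them inside an `async def main()` and run it with `asyncio.run(main())`.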

Basic Async Setup

Creating Async Sessions

import asyncio

from sqlalchemy import select

from dbsync.session import create_async_session
from dbsync.models import Block, Transaction


async def main():
    """Print the number of the most recent block in the database."""
    # Create async session; it is closed explicitly in the finally clause.
    async_session = create_async_session()

    try:
        # Newest block first, and only one row is needed.
        result = await async_session.execute(
            select(Block).order_by(Block.block_no.desc()).limit(1)
        )
        latest_block = result.scalar_one_or_none()

        if latest_block is not None:
            print(f"Latest block: {latest_block.block_no}")

    finally:
        # Release the session even if the query raised.
        await async_session.close()

# Run the async function
asyncio.run(main())

Using Async Context Managers

from contextlib import asynccontextmanager
from dbsync.session import create_async_session

@asynccontextmanager
async def get_async_session():
    """Context manager for async database sessions."""
    session = create_async_session()
    try:
        yield session
    finally:
        await session.close()

async def query_latest_blocks():
    async with get_async_session() as session:
        result = await session.execute(
            select(Block).order_by(Block.block_no.desc()).limit(10)
        )
        blocks = result.scalars().all()
        return blocks

# Usage
blocks = await query_latest_blocks()
for block in blocks:
    print(f"Block {block.block_no}: {block.tx_count} transactions")

Common Async Query Patterns

Basic Entity Retrieval

from sqlalchemy import select
from dbsync.models import Transaction, Block

async def get_transaction_by_hash(session, tx_hash: str):
    """Get a transaction by its hash."""
    stmt = select(Transaction).where(
        Transaction.hash == bytes.fromhex(tx_hash)
    )
    result = await session.execute(stmt)
    return result.scalar_one_or_none()

async def get_block_transactions(session, block_id: int):
    """Get all transactions in a block."""
    stmt = select(Transaction).where(Transaction.block_id == block_id)
    result = await session.execute(stmt)
    return result.scalars().all()

# Usage
async with get_async_session() as session:
    tx = await get_transaction_by_hash(session, "a1b2c3d4...")
    if tx:
        transactions = await get_block_transactions(session, tx.block_id)
        print(f"Block has {len(transactions)} transactions")

Relationship Loading

from sqlalchemy.orm import selectinload, joinedload

async def get_block_with_transactions(session, block_no: int):
    """Get a block with all its transactions loaded."""
    stmt = select(Block).options(
        selectinload(Block.transactions)
    ).where(Block.block_no == block_no)

    result = await session.execute(stmt)
    block = result.scalar_one_or_none()

    if block:
        # Transactions are already loaded, no additional DB calls
        print(f"Block {block.block_no} has {len(block.transactions)} transactions")
        for tx in block.transactions:
            print(f"  Transaction: {tx.hash.hex()[:16]}...")

    return block

# Usage
async with get_async_session() as session:
    block = await get_block_with_transactions(session, 1000000)

Aggregation Queries

from sqlalchemy import func, and_

async def get_epoch_statistics(session, epoch_no: int):
    """Get comprehensive statistics for an epoch."""

    # Transaction statistics
    tx_stats_stmt = select(
        func.count(Transaction.id_).label('tx_count'),
        func.sum(Transaction.fee).label('total_fees'),
        func.avg(Transaction.fee).label('avg_fee'),
        func.sum(Transaction.size).label('total_size')
    ).select_from(
        Transaction.join(Block)
    ).where(Block.epoch_no == epoch_no)

    result = await session.execute(tx_stats_stmt)
    tx_stats = result.first()

    # Block statistics
    block_stats_stmt = select(
        func.count(Block.id_).label('block_count'),
        func.min(Block.time).label('epoch_start'),
        func.max(Block.time).label('epoch_end')
    ).where(Block.epoch_no == epoch_no)

    result = await session.execute(block_stats_stmt)
    block_stats = result.first()

    return {
        'epoch': epoch_no,
        'transactions': tx_stats.tx_count,
        'total_fees': tx_stats.total_fees,
        'average_fee': float(tx_stats.avg_fee) if tx_stats.avg_fee else 0,
        'total_size': tx_stats.total_size,
        'blocks': block_stats.block_count,
        'epoch_start': block_stats.epoch_start,
        'epoch_end': block_stats.epoch_end
    }

# Usage
async with get_async_session() as session:
    stats = await get_epoch_statistics(session, 123)
    print(f"Epoch {stats['epoch']}: {stats['transactions']} transactions in {stats['blocks']} blocks")

Concurrent Operations

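Parallel Queries

A single `AsyncSession` must not be used by several tasks at the same time, so do not `asyncio.gather` multiple queries on one session. To fetch many rows, issue one query (for example with `IN`). For truly parallel work, give each task its own session, as shown under "Concurrent Session Operations".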

import asyncio
from sqlalchemy import select

async def get_multiple_blocks(session, block_numbers: list[int]):
    """Get multiple blocks concurrently."""

    async def get_single_block(block_no: int):
        stmt = select(Block).where(Block.block_no == block_no)
        result = await session.execute(stmt)
        return result.scalar_one_or_none()

    # Execute all queries concurrently
    tasks = [get_single_block(block_no) for block_no in block_numbers]
    blocks = await asyncio.gather(*tasks)

    return [block for block in blocks if block is not None]

# Usage
async with get_async_session() as session:
    block_numbers = [1000000, 1000001, 1000002, 1000003, 1000004]
    blocks = await get_multiple_blocks(session, block_numbers)
    print(f"Retrieved {len(blocks)} blocks")

Concurrent Session Operations

async def process_epoch_range(start_epoch: int, end_epoch: int):
    """Process multiple epochs concurrently with separate sessions."""

    async def process_single_epoch(epoch_no: int):
        async with get_async_session() as session:
            stats = await get_epoch_statistics(session, epoch_no)
            return stats

    # Process epochs concurrently
    tasks = [
        process_single_epoch(epoch)
        for epoch in range(start_epoch, end_epoch + 1)
    ]

    results = await asyncio.gather(*tasks)
    return results

# Usage
epoch_stats = await process_epoch_range(100, 110)
for stats in epoch_stats:
    print(f"Epoch {stats['epoch']}: {stats['transactions']} transactions")

Streaming Large Datasets

Async Iteration

from sqlalchemy.ext.asyncio import AsyncSession

async def stream_transactions_in_epoch(session: AsyncSession, epoch_no: int, batch_size: int = 1000):
    """Stream transactions from an epoch in batches."""

    offset = 0
    while True:
        stmt = select(Transaction).join(Block).where(
            Block.epoch_no == epoch_no
        ).offset(offset).limit(batch_size)

        result = await session.execute(stmt)
        transactions = result.scalars().all()

        if not transactions:
            break

        # Yield this batch
        for tx in transactions:
            yield tx

        offset += batch_size

# Usage
async with get_async_session() as session:
    transaction_count = 0
    total_fees = 0

    async for tx in stream_transactions_in_epoch(session, 123):
        transaction_count += 1
        total_fees += tx.fee

        if transaction_count % 1000 == 0:
            print(f"Processed {transaction_count} transactions...")

    print(f"Total: {transaction_count} transactions, {total_fees} Lovelace in fees")

Async Generators for Complex Processing

async def analyze_address_activity(session: AsyncSession, address_bech32: str):
    """Analyze all activity for an address using async generator."""

    # Get address
    addr_stmt = select(Address).where(Address.view == address_bech32)
    result = await session.execute(addr_stmt)
    addr = result.scalar_one_or_none()

    if not addr:
        return

    # Stream all outputs to this address
    offset = 0
    batch_size = 1000

    while True:
        outputs_stmt = select(TransactionOutput).where(
            TransactionOutput.address_id == addr.id_
        ).offset(offset).limit(batch_size)

        result = await session.execute(outputs_stmt)
        outputs = result.scalars().all()

        if not outputs:
            break

        for output in outputs:
            # Check if this output is spent
            spent_stmt = select(TransactionInput).where(
                TransactionInput.tx_out_id == output.id_
            )
            spent_result = await session.execute(spent_stmt)
            spent = spent_result.scalar_one_or_none()

            yield {
                'output': output,
                'is_spent': spent is not None,
                'spending_tx': spent.tx_id if spent else None
            }

        offset += batch_size

# Usage
async with get_async_session() as session:
    utxo_count = 0
    total_value = 0

    async for activity in analyze_address_activity(session, "addr1..."):
        if not activity['is_spent']:
            utxo_count += 1
            total_value += activity['output'].value

    print(f"Address has {utxo_count} UTXOs worth {total_value} Lovelace")

Error Handling and Resilience

Async Retry Logic

import asyncio
from sqlalchemy.exc import OperationalError

async def execute_with_retry(async_func, max_retries: int = 3, delay: float = 1.0):
    """Await ``async_func()`` and retry it when it raises ``OperationalError``.

    Args:
        async_func: Zero-argument callable returning an awaitable. It is
            called again on every attempt.
        max_retries: Total number of attempts; must be at least 1.
        delay: Seconds to wait before the first retry; doubled after each
            failure (exponential backoff).

    Returns:
        The result of the first successful call.

    Raises:
        ValueError: if ``max_retries`` is less than 1.
        OperationalError: the last error, once every attempt has failed.
        Any other exception from ``async_func`` propagates immediately.
    """
    if max_retries < 1:
        # Otherwise the loop never runs and the caller silently gets None.
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return await async_func()
        except OperationalError:
            if attempt == max_retries - 1:
                raise  # re-raise the original error unchanged

            print(f"Database error on attempt {attempt + 1}, retrying in {delay}s...")
            await asyncio.sleep(delay)
            delay *= 2  # Exponential backoff

async def robust_query_example():
    """Return the latest block, retrying transient errors; ``None`` on failure."""

    async def query_function():
        # A fresh session per attempt, so a broken connection is not reused.
        async with get_async_session() as session:
            stmt = select(Block).order_by(Block.block_no.desc()).limit(1)
            result = await session.execute(stmt)
            return result.scalar_one()

    try:
        latest_block = await execute_with_retry(query_function)
        return latest_block
    except Exception as e:
        # Top-level boundary of this example: report and degrade to None.
        print(f"Failed to get latest block after retries: {e}")
        return None

Connection Pool Management

import asyncio

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.pool import AsyncAdaptedQueuePool

# Configure async engine with connection pooling.
# The synchronous QueuePool cannot be used with an asyncio engine; SQLAlchemy
# rejects it. AsyncAdaptedQueuePool is the asyncio-compatible queue pool.
async_engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost/dbsync",
    poolclass=AsyncAdaptedQueuePool,
    pool_size=20,
    max_overflow=30,
    pool_pre_ping=True,  # Validate connections
    pool_recycle=3600,   # Recycle connections after 1 hour
    echo=False  # Set to True for SQL debugging
)

async def get_pooled_session():
    """Return a new ``AsyncSession`` bound to the pooled ``async_engine``.

    This is a coroutine function, so callers must ``await`` it. The caller
    owns the returned session and must close it.
    """
    return AsyncSession(async_engine)

# Usage with proper cleanup
async def main():
    try:
        session = await get_pooled_session()  # coroutine: must be awaited
        try:
            # Your database operations
            stmt = select(Block).limit(10)
            result = await session.execute(stmt)
            blocks = result.scalars().all()

            for block in blocks:
                print(f"Block {block.block_no}")

        finally:
            await session.close()
    finally:
        # Clean up engine when done (once, at application shutdown)
        await async_engine.dispose()

asyncio.run(main())

FastAPI Integration

Async Dependency Injection

from fastapi import FastAPI, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()

async def get_async_db():
    """FastAPI dependency: yield one async session per request, then close it."""
    session = create_async_session()
    try:
        yield session
    finally:
        await session.close()

@app.get("/blocks/latest")
async def get_latest_blocks(
    # Client-controlled: bound it so a request cannot ask for a negative
    # (SQL error) or unbounded number of rows.
    limit: int = Query(10, ge=1, le=100),
    db: AsyncSession = Depends(get_async_db)
):
    """Return the ``limit`` most recent blocks, newest first."""
    stmt = select(Block).order_by(Block.block_no.desc()).limit(limit)
    result = await db.execute(stmt)
    blocks = result.scalars().all()

    return [
        {
            "block_no": block.block_no,
            "hash": block.hash.hex(),
            "time": block.time,
            "tx_count": block.tx_count
        }
        for block in blocks
    ]

@app.get("/transactions/{tx_hash}")
async def get_transaction(
    tx_hash: str,
    db: AsyncSession = Depends(get_async_db)
):
    """Return a transaction by its hex hash.

    Responds 400 if ``tx_hash`` is not valid hex and 404 if no transaction matches.
    """
    # Only the hex parse can signal bad client input; keep the try that narrow
    # so ValueErrors from elsewhere are not misreported as a 400.
    try:
        tx_hash_bytes = bytes.fromhex(tx_hash)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid transaction hash format") from None

    stmt = select(Transaction).where(Transaction.hash == tx_hash_bytes)
    result = await db.execute(stmt)
    tx = result.scalar_one_or_none()

    if tx is None:
        raise HTTPException(status_code=404, detail="Transaction not found")

    return {
        "hash": tx.hash.hex(),
        "fee": tx.fee,
        "size": tx.size,
        "block_id": tx.block_id
    }

Performance Optimization

Async Batch Operations

async def batch_process_addresses(session: AsyncSession, addresses: list[str], batch_size: int = 100):
    """Process multiple addresses in batches."""

    results = []

    for i in range(0, len(addresses), batch_size):
        batch = addresses[i:i + batch_size]

        # Create queries for this batch
        tasks = []
        for address in batch:
            stmt = select(Address).where(Address.view == address)
            task = session.execute(stmt)
            tasks.append(task)

        # Execute batch concurrently
        batch_results = await asyncio.gather(*tasks)

        # Process results
        for result in batch_results:
            addr = result.scalar_one_or_none()
            if addr:
                results.append(addr)

    return results

# Usage
async with get_async_session() as session:
    addresses = ["addr1...", "addr2...", "addr3..."]  # Many addresses
    found_addresses = await batch_process_addresses(session, addresses)
    print(f"Found {len(found_addresses)} valid addresses")

Memory-Efficient Streaming

async def export_epoch_data(session: AsyncSession, epoch_no: int, output_file: str):
    """Export epoch data to file efficiently."""

    import json

    with open(output_file, 'w') as f:
        f.write('[\n')

        first_record = True
        offset = 0
        batch_size = 1000

        while True:
            stmt = select(Transaction).join(Block).where(
                Block.epoch_no == epoch_no
            ).offset(offset).limit(batch_size)

            result = await session.execute(stmt)
            transactions = result.scalars().all()

            if not transactions:
                break

            for tx in transactions:
                if not first_record:
                    f.write(',\n')

                tx_data = {
                    'hash': tx.hash.hex(),
                    'fee': tx.fee,
                    'size': tx.size
                }

                json.dump(tx_data, f)
                first_record = False

            offset += batch_size

            # Allow other tasks to run
            await asyncio.sleep(0)

        f.write('\n]')

# Usage
async with get_async_session() as session:
    await export_epoch_data(session, 123, 'epoch_123_transactions.json')
    print("Export completed")

This comprehensive async guide covers all major patterns for high-performance async operations with dbsync-py, from basic queries to complex concurrent processing scenarios.