Skip to content

Advanced Examples

This section provides comprehensive examples for advanced usage patterns and complex queries with dbsync-py.

Complex Multi-Table Queries

Stake Pool Analysis

from dbsync.session import create_session
from dbsync.models import (
    PoolHash, PoolRegistration, PoolRetirement, Delegation,
    Block, SlotLeader, Reward, StakeAddress
)
from sqlalchemy import func, and_, desc
from datetime import datetime, timedelta

def analyze_stake_pool(session, pool_bech32: str):
    """Summarise registration, delegation, block and reward data for one pool.

    Args:
        session: Open database session used for every query.
        pool_bech32: Pool identifier matched against ``PoolHash.view``.

    Returns:
        ``{"error": "Pool not found"}`` when no ``PoolHash`` row matches,
        otherwise a dict with the keys ``pool_id``, ``pool_name``,
        ``active_epoch``, ``retirement_epoch``, ``current_delegators``,
        ``blocks_last_10_epochs``, ``total_rewards_distributed`` and
        ``is_active``.
    """
    pool = session.query(PoolHash).filter(
        PoolHash.view == pool_bech32
    ).first()

    if not pool:
        return {"error": "Pool not found"}

    # Registration with the highest activation epoch.
    latest_registration = session.query(PoolRegistration).filter(
        PoolRegistration.hash_id == pool.id_
    ).order_by(desc(PoolRegistration.active_epoch_no)).first()

    # Order explicitly so the row used below is the one with the highest
    # retiring epoch rather than whichever row the database returns first.
    retirement = session.query(PoolRetirement).filter(
        PoolRetirement.hash_id == pool.id_
    ).order_by(desc(PoolRetirement.retiring_epoch)).first()

    # NOTE: this counts every delegation row pointing at the pool; rows that
    # were superseded by a later delegation are not excluded yet.
    active_delegations = session.query(Delegation).filter(
        Delegation.pool_hash_id == pool.id_
    ).count()

    # MAX() over an empty block table yields None; guard before doing
    # arithmetic or comparisons with it.
    current_epoch = session.query(func.max(Block.epoch_no)).scalar()
    if current_epoch is None:
        recent_blocks = 0
    else:
        # Strictly greater than (current - 10) selects exactly ten epochs:
        # the current one and the nine before it.
        recent_blocks = session.query(func.count(Block.id_)).join(
            SlotLeader
        ).filter(
            and_(
                SlotLeader.pool_hash_id == pool.id_,
                Block.epoch_no > current_epoch - 10
            )
        ).scalar()

    # Select the delegating addresses through a subquery instead of joining
    # Delegation: a join repeats each reward once per delegation row of the
    # same address and inflates the sum.
    # NOTE(review): this still sums all rewards of addresses that ever
    # delegated here, not only rewards earned from this pool - confirm intent.
    delegator_ids = session.query(Delegation.addr_id).filter(
        Delegation.pool_hash_id == pool.id_
    )
    total_rewards = session.query(func.sum(Reward.amount)).join(
        StakeAddress
    ).filter(StakeAddress.id_.in_(delegator_ids)).scalar() or 0

    pool_name = None
    if latest_registration is not None and latest_registration.meta_json:
        pool_name = latest_registration.meta_json.get("name")

    is_active = (
        retirement is None
        or current_epoch is None
        or retirement.retiring_epoch > current_epoch
    )

    return {
        "pool_id": pool.view,
        "pool_name": pool_name,
        "active_epoch": latest_registration.active_epoch_no if latest_registration else None,
        "retirement_epoch": retirement.retiring_epoch if retirement else None,
        "current_delegators": active_delegations,
        "blocks_last_10_epochs": recent_blocks,
        "total_rewards_distributed": total_rewards,
        "is_active": is_active
    }

# Usage
session = create_session()
pool_analysis = analyze_stake_pool(session, "pool1...")
# analyze_stake_pool returns {"error": ...} for an unknown pool, so the
# result keys must not be read before checking for it.
if "error" in pool_analysis:
    print(f"Pool analysis failed: {pool_analysis['error']}")
else:
    print(f"Pool {pool_analysis['pool_name']} has {pool_analysis['current_delegators']} delegators")

Multi-Asset Transaction Analysis

from dbsync.models import (
    Transaction, TransactionInput, TransactionOutput, MaTxOut, MaTxMint,
    MultiAsset, Block, Address
)

def analyze_native_token_transaction(session, tx_hash: str):
    """Describe the native-token movements of a single transaction.

    Args:
        session: Open database session used for every query.
        tx_hash: Hex-encoded transaction hash.

    Returns:
        ``{"error": ...}`` when ``tx_hash`` is not valid hex or no matching
        transaction exists; otherwise a dict with ``transaction_hash``,
        ``block_height``, ``assets_transferred``, ``minting_burning``,
        ``total_fee`` and ``ada_outputs``.
    """
    # Malformed hex is reported through the same error-dict convention as a
    # missing transaction instead of escaping as a bare ValueError.
    try:
        tx_hash_bytes = bytes.fromhex(tx_hash)
    except ValueError:
        return {"error": "Invalid transaction hash"}

    tx = session.query(Transaction).filter(
        Transaction.hash == tx_hash_bytes
    ).first()

    if not tx:
        return {"error": "Transaction not found"}

    # One row per (output, asset) pair of this transaction.
    asset_outputs = session.query(
        TransactionOutput, MaTxOut, MultiAsset
    ).join(MaTxOut).join(MultiAsset).filter(
        TransactionOutput.tx_id == tx.id_
    ).all()

    # Mint/burn rows of this transaction together with their asset.
    minting_burning = session.query(MaTxMint, MultiAsset).join(
        MultiAsset
    ).filter(MaTxMint.tx_id == tx.id_).all()

    # Group outputs by asset, keyed "<policy hex>.<name hex>".
    assets_transferred = {}
    for tx_out, ma_out, asset in asset_outputs:
        policy_hex = asset.policy.hex()
        name_hex = asset.name.hex()
        entry = assets_transferred.setdefault(
            f"{policy_hex}.{name_hex}",
            {
                "policy_id": policy_hex,
                "asset_name": name_hex,
                "fingerprint": asset.fingerprint,
                "total_transferred": 0,
                "recipients": []
            }
        )
        entry["total_transferred"] += ma_out.quantity
        entry["recipients"].append({
            "address_id": tx_out.address_id,
            "quantity": ma_out.quantity
        })

    minting_burning_summary = {}
    for mint, asset in minting_burning:
        policy_hex = asset.policy.hex()
        name_hex = asset.name.hex()
        minting_burning_summary[f"{policy_hex}.{name_hex}"] = {
            "policy_id": policy_hex,
            "asset_name": name_hex,
            "fingerprint": asset.fingerprint,
            "quantity": mint.quantity,  # Positive = mint, Negative = burn
            "action": "mint" if mint.quantity > 0 else "burn"
        }

    block_height = session.query(Block.block_no).filter(
        Block.id_ == tx.block_id
    ).scalar()
    ada_outputs = session.query(func.sum(TransactionOutput.value)).filter(
        TransactionOutput.tx_id == tx.id_
    ).scalar()

    return {
        "transaction_hash": tx_hash,
        "block_height": block_height,
        "assets_transferred": assets_transferred,
        "minting_burning": minting_burning_summary,
        "total_fee": tx.fee,
        "ada_outputs": ada_outputs
    }

Governance Activity Analysis

from dbsync.models import (
    GovActionProposal, VotingProcedure, DrepRegistration,
    CommitteeRegistration, Transaction, Block
)

def analyze_governance_activity(session, epoch_range: tuple):
    """Summarise proposals, votes and registrations within an epoch range.

    Args:
        session: Open database session used for every query.
        epoch_range: ``(start_epoch, end_epoch)``; both ends are inclusive
            because the filter uses SQL ``BETWEEN``.

    Returns:
        A dict with ``epoch_range``, ``governance_proposals`` (total count
        and per-type counters), ``voting_activity`` (total, per role, per
        outcome) and ``new_registrations`` (DReps and committee members).
    """
    start_epoch, end_epoch = epoch_range
    # Every query below is restricted to transactions in these epochs.
    in_range = Block.epoch_no.between(start_epoch, end_epoch)

    proposals = session.query(GovActionProposal).join(
        Transaction
    ).join(Block).filter(in_range).all()

    proposal_types = {}
    for proposal in proposals:
        counters = proposal_types.setdefault(proposal.type_, {
            "count": 0,
            "total_deposit": 0,
            "ratified": 0,
            "expired": 0,
            "dropped": 0
        })
        counters["count"] += 1
        counters["total_deposit"] += proposal.deposit or 0

        # Compare against None: an epoch value of 0 is falsy but still set.
        if proposal.ratified_epoch is not None:
            counters["ratified"] += 1
        elif proposal.expired_epoch is not None:
            counters["expired"] += 1
        elif proposal.dropped_epoch is not None:
            counters["dropped"] += 1

    votes = session.query(VotingProcedure).join(
        Transaction
    ).join(Block).filter(in_range).all()

    by_role = {}
    by_outcome = {"Yes": 0, "No": 0, "Abstain": 0}
    for vote in votes:
        if vote.committee_voter is not None:
            role = "Committee"
        elif vote.drep_voter is not None:
            role = "DRep"
        elif vote.pool_voter is not None:
            role = "Pool"
        else:
            role = "Unknown"
        by_role[role] = by_role.get(role, 0) + 1

        # Tolerate outcome values other than the three preset keys instead
        # of failing the whole analysis with a KeyError.
        by_outcome[vote.vote] = by_outcome.get(vote.vote, 0) + 1

    new_dreps = session.query(DrepRegistration).join(
        Transaction
    ).join(Block).filter(in_range).count()

    new_committee = session.query(CommitteeRegistration).join(
        Transaction
    ).join(Block).filter(in_range).count()

    return {
        "epoch_range": epoch_range,
        "governance_proposals": {
            "total_count": len(proposals),
            "by_type": proposal_types
        },
        "voting_activity": {
            "total_votes": len(votes),
            "by_role": by_role,
            "by_outcome": by_outcome
        },
        "new_registrations": {
            "dreps": new_dreps,
            "committee_members": new_committee
        }
    }

Performance Optimization Examples

Efficient Large Dataset Processing

from sqlalchemy.orm import joinedload, selectinload
from sqlalchemy import text

def process_large_transaction_dataset(session, start_epoch: int, end_epoch: int):
    """Process all transactions of an epoch range in fixed-size batches.

    Transaction ids are fetched page by page with raw SQL, each page is
    loaded as ORM objects with their related rows, passed one by one to
    ``process_transaction`` and then expunged from the session.

    Args:
        session: Open database session.
        start_epoch: First epoch to include (inclusive).
        end_epoch: Last epoch to include (inclusive).
    """
    # text() returns a TextClause, which has no .limit()/.offset() methods,
    # so paging is expressed in the SQL itself through bound parameters.
    tx_ids_query = text("""
        SELECT t.id
        FROM tx t
        JOIN block b ON t.block_id = b.id
        WHERE b.epoch_no BETWEEN :start_epoch AND :end_epoch
        ORDER BY b.block_no, t.block_index
        LIMIT :batch_size OFFSET :offset
    """)

    batch_size = 1000
    offset = 0

    while True:
        tx_ids_result = session.execute(
            tx_ids_query,
            {
                "start_epoch": start_epoch,
                "end_epoch": end_epoch,
                "batch_size": batch_size,
                "offset": offset,
            },
        ).fetchall()

        if not tx_ids_result:
            break

        tx_ids = [row[0] for row in tx_ids_result]

        # Eager-load related rows so processing does not trigger one lazy
        # query per transaction.
        # NOTE(review): "metadata" is a reserved attribute on SQLAlchemy
        # declarative classes; confirm the relationship name on Transaction.
        transactions = session.query(Transaction).options(
            joinedload(Transaction.block),
            selectinload(Transaction.inputs),
            selectinload(Transaction.outputs),
            selectinload(Transaction.metadata)
        ).filter(Transaction.id_.in_(tx_ids)).all()

        # An IN (...) filter returns rows in arbitrary order; replay them in
        # the block/index order established by the id query.
        loaded = {tx.id_: tx for tx in transactions}
        for tx_id in tx_ids:
            tx = loaded.get(tx_id)
            if tx is not None:
                process_transaction(tx)

        offset += batch_size

        # Detach processed objects so the identity map does not keep growing.
        session.expunge_all()

def process_transaction(tx):
    """Print a one-line progress message for a single transaction.

    Reads ``tx.hash`` (bytes) and ``tx.block.block_no``.
    """
    # Placeholder: replace with real per-transaction logic.
    hash_prefix = tx.hash.hex()[:16]
    block_number = tx.block.block_no
    print(f"Processing tx {hash_prefix}... in block {block_number}")

Optimized Aggregation Queries

def calculate_epoch_statistics(session, epoch_no: int):
    """Compute transaction, block, UTXO and delegation figures for one epoch.

    Args:
        session: Open database session used for every query.
        epoch_no: Epoch to summarise.

    Returns:
        A dict with the keys ``epoch``, ``transactions``,
        ``total_fees_lovelace``, ``average_fee_lovelace``,
        ``total_size_bytes``, ``blocks``, ``epoch_start``, ``epoch_end``,
        ``utxo_created``, ``utxo_consumed``, ``net_utxo_change`` and
        ``new_delegations``. Sums are 0 for an epoch without transactions.
    """
    # Transaction-level aggregates in a single round trip.
    tx_stats = session.query(
        func.count(Transaction.id_).label('tx_count'),
        func.sum(Transaction.fee).label('total_fees'),
        func.avg(Transaction.fee).label('avg_fee'),
        func.sum(Transaction.size).label('total_size')
    ).join(Block).filter(Block.epoch_no == epoch_no).first()

    # Block-level figures are taken from Block directly: deriving them from
    # the tx join would ignore every block that carries no transaction and
    # skew the block count and the first/last block times.
    block_stats = session.query(
        func.count(Block.id_).label('block_count'),
        func.min(Block.time).label('epoch_start'),
        func.max(Block.time).label('epoch_end')
    ).filter(Block.epoch_no == epoch_no).first()

    utxo_created = session.query(func.count(TransactionOutput.id_)).join(
        Transaction
    ).join(Block).filter(Block.epoch_no == epoch_no).scalar()

    # NOTE(review): if TransactionInput has more than one foreign key to
    # Transaction this join needs an explicit ON clause - confirm.
    utxo_consumed = session.query(func.count(TransactionInput.id_)).join(
        Transaction
    ).join(Block).filter(Block.epoch_no == epoch_no).scalar()

    new_delegations = session.query(func.count(Delegation.id_)).filter(
        Delegation.active_epoch_no == epoch_no
    ).scalar()

    return {
        "epoch": epoch_no,
        "transactions": tx_stats.tx_count,
        # SUM()/AVG() over zero rows yield NULL; report 0 consistently.
        "total_fees_lovelace": tx_stats.total_fees or 0,
        "average_fee_lovelace": float(tx_stats.avg_fee) if tx_stats.avg_fee else 0,
        "total_size_bytes": tx_stats.total_size or 0,
        "blocks": block_stats.block_count,
        "epoch_start": block_stats.epoch_start,
        "epoch_end": block_stats.epoch_end,
        "utxo_created": utxo_created,
        "utxo_consumed": utxo_consumed,
        "net_utxo_change": utxo_created - utxo_consumed,
        "new_delegations": new_delegations
    }

Integration Examples

PyCardano Integration

from pycardano import (
    TransactionBuilder, TransactionOutput, Address,
    PlutusScript, ScriptHash, Redeemer
)
from dbsync.models import Script, Datum, RedeemerData

def build_transaction_from_db_script(session, script_hash: str, context=None):
    """Collect the PyCardano objects needed to spend from a stored script.

    Args:
        session: Open database session.
        script_hash: Hex-encoded script hash, matched against ``Script.hash_``.
        context: Optional PyCardano chain context. A ``TransactionBuilder``
            is only created when one is supplied, because the builder cannot
            be constructed without it.

    Returns:
        A dict with ``script`` (PyCardano Plutus script), ``script_hash``
        (PyCardano ``ScriptHash``), ``recent_redeemer_data`` (data of the
        redeemer with the highest id for this script, or ``None``) and
        ``builder`` (``TransactionBuilder`` or ``None``).

    Raises:
        ValueError: If ``script_hash`` is not valid hex, the script is not
            in the database, or its type is not Plutus V1/V2.
    """
    # The module-level name ``Redeemer`` is PyCardano's redeemer class, not
    # an ORM model, so the database model is imported under its own name.
    # NOTE(review): assumes dbsync.models exports ``Redeemer`` - confirm.
    from dbsync.models import Redeemer as DbRedeemer
    # PyCardano models each Plutus version as its own bytes subclass; there
    # is no ``language_version`` constructor argument.
    from pycardano import PlutusV1Script, PlutusV2Script

    script = session.query(Script).filter(
        Script.hash_ == bytes.fromhex(script_hash)
    ).first()

    if not script:
        raise ValueError(f"Script not found: {script_hash}")

    # Compare case-insensitively and accept enum members as well as plain
    # strings. NOTE(review): cardano-db-sync spells these "plutusV1" and
    # "plutusV2"; verify how dbsync-py exposes them.
    script_classes = {"plutusv1": PlutusV1Script, "plutusv2": PlutusV2Script}
    type_name = str(getattr(script.type_, "value", script.type_)).lower()
    script_class = script_classes.get(type_name)
    if script_class is None:
        raise ValueError(f"Unsupported script type: {script.type_}")
    plutus_script = script_class(script.bytes_)

    # Redeemer with the highest id that used this script, kept as a
    # reference for how the script is typically invoked.
    recent_redeemer = session.query(RedeemerData).join(
        DbRedeemer
    ).filter(DbRedeemer.script_hash == script.hash_).order_by(
        DbRedeemer.id_.desc()
    ).first()

    builder = TransactionBuilder(context) if context is not None else None

    # Kept in a separate local so the ``script_hash`` argument is not rebound.
    script_hash_obj = ScriptHash.from_primitive(script.hash_)

    return {
        "script": plutus_script,
        "script_hash": script_hash_obj,
        "recent_redeemer_data": recent_redeemer.data if recent_redeemer else None,
        "builder": builder
    }

Custom API Integration

from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy.orm import Session
from dbsync.session import create_session

# FastAPI application object; the route decorators below register on it.
app = FastAPI(title="Cardano Analytics API")

def get_db():
    """Yield one database session and close it once the consumer is done."""
    db_session = create_session()
    try:
        yield db_session
    finally:
        # Reached on normal completion and when the consumer raised.
        db_session.close()

@app.get("/api/v1/address/{address}/balance")
def get_address_balance(address: str, db: Session = Depends(get_db)):
    """Return the ADA balance, UTXO count and native assets of an address.

    Args:
        address: Address string, matched against ``Address.view``.
        db: Request-scoped database session.

    Raises:
        HTTPException: 404 when the address is unknown, 500 when a query
            fails.
    """
    try:
        addr_record = db.query(Address).filter(
            Address.view == address
        ).first()

        if not addr_record:
            raise HTTPException(status_code=404, detail="Address not found")

        # Ids referenced by any transaction input; outputs outside this set
        # are treated as unspent.
        # NOTE(review): in the cardano-db-sync schema tx_in.tx_out_id refers
        # to the producing transaction and pairs with tx_out_index, not to
        # tx_out.id - verify that this comparison selects the right rows.
        spent_output_ids = db.query(TransactionInput.tx_out_id).distinct()
        unspent_at_address = and_(
            TransactionOutput.address_id == addr_record.id_,
            ~TransactionOutput.id_.in_(spent_output_ids)
        )

        unspent_outputs = db.query(
            func.sum(TransactionOutput.value).label('total_ada'),
            func.count(TransactionOutput.id_).label('utxo_count')
        ).filter(unspent_at_address).first()

        # Native-asset quantities summed per asset over the same outputs.
        native_assets = db.query(
            MultiAsset.policy,
            MultiAsset.name,
            MultiAsset.fingerprint,
            func.sum(MaTxOut.quantity).label('total_quantity')
        ).join(MaTxOut).join(TransactionOutput).filter(
            unspent_at_address
        ).group_by(
            MultiAsset.policy, MultiAsset.name, MultiAsset.fingerprint
        ).all()

        return {
            "address": address,
            "ada_balance": unspent_outputs.total_ada or 0,
            "utxo_count": unspent_outputs.utxo_count or 0,
            "native_assets": [
                {
                    "policy_id": asset.policy.hex(),
                    "asset_name": asset.name.hex(),
                    "fingerprint": asset.fingerprint,
                    "quantity": asset.total_quantity
                }
                for asset in native_assets
            ]
        }

    except HTTPException:
        # HTTPException is itself an Exception; without this clause the 404
        # raised above would be caught below and turned into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

@app.get("/api/v1/pool/{pool_id}/delegators")
def get_pool_delegators(pool_id: str, db: Session = Depends(get_db)):
    """List stake addresses whose most recent delegation targets this pool.

    Simplified: stake deregistrations are not taken into account.

    Args:
        pool_id: Pool identifier, matched against ``PoolHash.view``.
        db: Request-scoped database session.

    Raises:
        HTTPException: 404 when the pool is unknown.
    """
    from sqlalchemy.orm import aliased

    pool = db.query(PoolHash).filter(PoolHash.view == pool_id).first()
    if not pool:
        raise HTTPException(status_code=404, detail="Pool not found")

    # Latest delegation epoch of the address across ALL pools. The subquery
    # needs its own alias of Delegation: the enclosing query already selects
    # from that table, so an unaliased subquery would be auto-correlated to
    # it and end up without a FROM clause.
    any_delegation = aliased(Delegation)
    latest_overall = db.query(
        func.max(any_delegation.active_epoch_no)
    ).filter(
        any_delegation.addr_id == StakeAddress.id_
    ).correlate(StakeAddress).scalar_subquery()

    # Keep an address only if its latest delegation to this pool is also its
    # latest delegation overall. StakeAddress.id_ is part of GROUP BY because
    # the correlated HAVING subquery references it.
    delegators = db.query(
        StakeAddress.view,
        func.max(Delegation.active_epoch_no).label('latest_epoch')
    ).join(Delegation).filter(
        Delegation.pool_hash_id == pool.id_
    ).group_by(StakeAddress.id_, StakeAddress.view).having(
        func.max(Delegation.active_epoch_no) == latest_overall
    ).all()

    return {
        "pool_id": pool_id,
        "delegator_count": len(delegators),
        "delegators": [
            {
                "stake_address": delegator.view,
                "active_since_epoch": delegator.latest_epoch
            }
            for delegator in delegators
        ]
    }

Error Handling and Resilience

Robust Query Patterns

from sqlalchemy.exc import OperationalError, DatabaseError
from typing import Optional, List
import time

class RobustQueryExecutor:
    """Run query callables against a session, retrying on database errors."""

    def __init__(self, session, max_retries: int = 3, retry_delay: float = 1.0):
        """Store the session and retry policy.

        Args:
            session: Database session the query callables operate on.
            max_retries: Extra attempts after the first failure; values
                below zero behave like zero.
            retry_delay: Seconds to sleep between attempts.
        """
        self.session = session
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    def execute_with_retry(self, query_func, *args, **kwargs):
        """Call ``query_func(*args, **kwargs)``, retrying on database errors.

        Only ``OperationalError`` and ``DatabaseError`` trigger a retry; any
        other exception propagates immediately.

        Returns:
            Whatever ``query_func`` returns.

        Raises:
            OperationalError, DatabaseError: The error of the final attempt
                once all attempts are used up.
        """
        # Always make at least one attempt; with a negative max_retries the
        # loop would otherwise be skipped and None returned silently.
        total_attempts = max(self.max_retries, 0) + 1

        for attempt in range(1, total_attempts + 1):
            try:
                return query_func(*args, **kwargs)

            except (OperationalError, DatabaseError):
                if attempt == total_attempts:
                    print(f"Query failed after {total_attempts} attempts")
                    raise

                print(f"Query failed (attempt {attempt}), retrying in {self.retry_delay}s...")
                time.sleep(self.retry_delay)

                # Best effort: reset the failed transaction before retrying.
                # A rollback failure is reported but must not mask the retry.
                try:
                    self.session.rollback()
                except Exception as rollback_error:
                    print(f"Rollback before retry failed: {rollback_error}")

    def safe_get_transaction(self, tx_hash: str) -> "Optional[Transaction]":
        """Return the transaction with this hex hash, or ``None``.

        ``None`` is returned both when no row matches and when the lookup
        fails for any reason (including malformed hex).
        """

        def _query():
            return self.session.query(Transaction).filter(
                Transaction.hash == bytes.fromhex(tx_hash)
            ).first()

        try:
            return self.execute_with_retry(_query)
        except Exception as e:
            print(f"Failed to retrieve transaction {tx_hash}: {e}")
            return None

    def safe_get_address_utxos(self, address: str) -> "List[TransactionOutput]":
        """Return the unspent outputs of an address, or ``[]``.

        An empty list is returned when the address is unknown and when the
        lookup fails for any reason.
        """

        def _query():
            addr = self.session.query(Address).filter(
                Address.view == address
            ).first()

            if not addr:
                return []

            # Outputs whose id is not referenced by any transaction input.
            # NOTE(review): in the cardano-db-sync schema tx_in.tx_out_id
            # refers to the producing transaction and pairs with
            # tx_out_index, not to tx_out.id - verify this comparison.
            spent_output_ids = self.session.query(
                TransactionInput.tx_out_id
            ).distinct()
            return self.session.query(TransactionOutput).filter(
                and_(
                    TransactionOutput.address_id == addr.id_,
                    ~TransactionOutput.id_.in_(spent_output_ids)
                )
            ).all()

        try:
            return self.execute_with_retry(_query)
        except Exception as e:
            print(f"Failed to retrieve UTXOs for address {address}: {e}")
            return []

# Usage
session = create_session()
executor = RobustQueryExecutor(session)

# safe_get_transaction returns None for a missing row and for a failed query,
# so a single check covers both cases.
tx = executor.safe_get_transaction("a1b2c3...")
print(
    f"Transaction found: {tx.hash.hex()}"
    if tx
    else "Transaction not found or query failed"
)

These advanced examples cover complex multi-table queries, performance optimization, integration patterns, and robust error handling, all of which are essential for professional use of dbsync-py.