
Query API Reference

This section documents query utilities, common patterns, and helper functions for working with dbsync-py models.

Query Utilities

dbsync.examples.queries

Query pattern examples for dbsync-py.

This package contains example implementations of common query patterns using the dbsync-py package models. These examples demonstrate how to convert SQL queries from the Cardano DB Sync documentation into SQLAlchemy implementations.

Available query examples:

- chain_metadata: Chain fundamentals and metadata queries
- transaction_analysis: Transaction analysis and UTxO operations
- pool_management: Pool management and block production
- staking_delegation: Staking and delegation pattern queries
- smart_contracts: Smart contracts and scripts analysis
- governance: Conway era governance utilities
- multi_asset: Multi-asset and token operations

These are educational examples showing best practices for:

- Using dbsync-py models
- Building SQLAlchemy queries
- Handling edge cases and errors
- Creating reusable query patterns

ChainMetadataQueries

Example chain metadata and fundamental blockchain data queries.

This class demonstrates SQLAlchemy implementations of common chain metadata queries, including supply calculations, sync progress, and basic chain info.

These are example implementations showing how to use the dbsync-py package models to build useful queries.

get_chain_metadata(session) staticmethod

Get chain metadata information.

SQL equivalent

```sql
SELECT * FROM meta;
```

Returns:

| Type | Description |
| --- | --- |
| `ChainMeta \| None` | ChainMeta object with network information, or None if not found |

Example

```python
>>> from dbsync.session import get_session
>>> with get_session() as session:
...     meta = ChainMetadataQueries.get_chain_metadata(session)
...     print(f"Network: {meta.network_name}")
...     print(f"Start time: {meta.start_time}")
```

get_current_supply(session) staticmethod

Calculate the current total on-chain supply of Ada.

Note: 1 ADA == 1,000,000 Lovelace

This queries the UTxO set for unspent transaction outputs. It does not include staking rewards that have not yet been withdrawn; until withdrawal, rewards exist only in ledger state, not on-chain.

SQL equivalent

```sql
SELECT sum(value)
FROM tx_out AS tx_outer
WHERE NOT EXISTS (
    SELECT tx_out.id
    FROM tx_out
    INNER JOIN tx_in ON tx_out.tx_id = tx_in.tx_out_id
        AND tx_out.index = tx_in.tx_out_index
    WHERE tx_outer.id = tx_out.id
);
```

Returns:

| Type | Description |
| --- | --- |
| `int` | Total supply in Lovelace |

Example

```python
>>> from dbsync.session import get_session
>>> with get_session() as session:
...     supply = ChainMetadataQueries.get_current_supply(session)
...     print(f"Current supply: {supply / 1_000_000:.2f} ADA")
```
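The NOT EXISTS clause above treats an output as unspent when no tx_in row references its (tx_id, index) pair. That logic can be sketched in plain Python over an in-memory UTxO set (the TxOut/TxIn dataclasses here are illustrative stand-ins, not dbsync-py models):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class TxOut:
    tx_id: int   # transaction that created this output
    index: int   # output index within that transaction
    value: int   # amount in Lovelace

@dataclass(frozen=True)
class TxIn:
    tx_out_id: int     # transaction whose output is being spent
    tx_out_index: int  # index of the spent output

def current_supply(outputs: list[TxOut], inputs: list[TxIn]) -> int:
    """Sum the value of every output not consumed by any input."""
    spent = {(i.tx_out_id, i.tx_out_index) for i in inputs}
    return sum(o.value for o in outputs if (o.tx_id, o.index) not in spent)

outputs = [TxOut(1, 0, 5_000_000), TxOut(1, 1, 2_000_000), TxOut(2, 0, 3_000_000)]
inputs = [TxIn(1, 1)]  # spends the second output of tx 1

supply = current_supply(outputs, inputs)
print(f"{supply / 1_000_000:.2f} ADA")  # 5 + 3 ADA remain unspent
```

The real query delegates this set-difference to PostgreSQL; the sketch only shows the semantics being expressed.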

get_database_size_pretty(session) staticmethod

Get the human-readable size of the database.

SQL equivalent

```sql
SELECT pg_size_pretty(pg_database_size(current_database()));
```

Returns:

| Type | Description |
| --- | --- |
| `str` | Human-readable database size (e.g., "116 GB") |

Example

```python
>>> from dbsync.session import get_session
>>> with get_session() as session:
...     size = ChainMetadataQueries.get_database_size_pretty(session)
...     print(f"Database size: {size}")
```

get_latest_slot_number(session) staticmethod

Get the slot number of the most recent block.

SQL equivalent

```sql
SELECT slot_no
FROM block
WHERE block_no IS NOT NULL
ORDER BY block_no DESC
LIMIT 1;
```

Returns:

| Type | Description |
| --- | --- |
| `int \| None` | Latest slot number, or None if no blocks found |

Example

```python
>>> from dbsync.session import get_session
>>> with get_session() as session:
...     slot = ChainMetadataQueries.get_latest_slot_number(session)
...     print(f"Latest slot: {slot}")
```

get_sync_behind_duration(session) staticmethod

Get how far the sync is behind the current time.

SQL equivalent

```sql
SELECT now() - max(time) AS behind_by FROM block;
```

Returns:

| Type | Description |
| --- | --- |
| `str \| None` | Time duration string (e.g., "4 days 20:59:39.134497") or None |

Example

```python
>>> from dbsync.session import get_session
>>> with get_session() as session:
...     behind = ChainMetadataQueries.get_sync_behind_duration(session)
...     print(f"Sync is behind by: {behind}")
```

get_sync_progress_percent(session) staticmethod

Get a rough estimate of sync progress as a percentage.

To get a rough estimate of how close to fully synced the database is, we use the timestamps on the blocks.

Note: This value can be misleading because it operates on block timestamps: early epochs contain much less data (e.g., the Byron era had no staking) and far fewer transactions.

SQL equivalent

```sql
SELECT 100 * (
    extract(epoch from (max(time) at time zone 'UTC')) -
    extract(epoch from (min(time) at time zone 'UTC'))
) / (
    extract(epoch from (now() at time zone 'UTC')) -
    extract(epoch from (min(time) at time zone 'UTC'))
) AS sync_percent
FROM block;
```

Returns:

| Type | Description |
| --- | --- |
| `float` | Sync progress percentage (0.0 to 100.0) |

Example

```python
>>> from dbsync.session import get_session
>>> with get_session() as session:
...     progress = ChainMetadataQueries.get_sync_progress_percent(session)
...     print(f"Sync progress: {progress:.2f}%")
```

get_table_size_pretty(session, table_name='block') staticmethod

Get the human-readable size of a specific database table.

SQL equivalent

```sql
SELECT pg_size_pretty(pg_total_relation_size('block'));
```

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| table_name | str | Name of the table to check | 'block' |

Returns:

| Type | Description |
| --- | --- |
| `str` | Human-readable table size (e.g., "2760 MB") |

Example

```python
>>> from dbsync.session import get_session
>>> with get_session() as session:
...     size = ChainMetadataQueries.get_table_size_pretty(session, "tx_out")
...     print(f"tx_out table size: {size}")
```

GovernanceQueries

Example Conway era governance query utilities.

get_committee_operations_tracking(session, committee_member=None, limit=20) staticmethod

Monitor constitutional committee activities and decisions.

get_drep_activity_monitoring(session, drep_id=None, limit=20) staticmethod

Track DRep registrations, delegations, and voting activity.

get_governance_proposal_analysis(session, proposal_id=None, limit=20) staticmethod

Analyze governance action proposals and their lifecycle.

get_treasury_governance_analysis(session, days=90, limit=20) staticmethod

Track treasury operations and governance spending.

get_voting_participation_metrics(session, days=30, limit=20) staticmethod

Analyze voting participation rates and outcomes.

MultiAssetQueries

Example multi-asset and token operation queries.

get_asset_metadata_tracking(session, policy_id=None, limit=20) staticmethod

Track native asset metadata and policy information.

get_token_portfolio_analysis(session, address=None, limit=50) staticmethod

Analyze token holdings and distributions for addresses or network-wide.

get_token_transfer_patterns(session, days=30, limit=20) staticmethod

Monitor token transfer activity and volume patterns.

PoolManagementQueries

Example pool management and block production queries.

get_pool_block_production_stats(session, pool_id, epochs=10) staticmethod

Get block production statistics for a pool over recent epochs.

get_pool_delegation_summary(session, pool_id, limit=100) staticmethod

Get current delegation summary for a pool.

get_pool_operational_status(session, pool_id) staticmethod

Get current operational status and configuration for a pool.

get_pool_performance_metrics(session, pool_id, epoch_no=None) staticmethod

Get detailed performance metrics for a pool in a specific epoch.

get_pool_registration_info(session, pool_id) staticmethod

Get comprehensive pool registration and metadata information.

get_pool_rewards_analysis(session, pool_id, epochs=5) staticmethod

Get reward distribution analysis for a pool over recent epochs.

SmartContractsQueries

Example smart contracts and scripts queries.

get_contract_usage_patterns(session, days=30, limit=20) staticmethod

Track smart contract execution patterns over time.

get_contract_value_tracking(session, epoch_no=None, limit=20) staticmethod

Track value locked in smart contracts.

get_script_analysis(session, script_hash=None, limit=50) staticmethod

Analyze native scripts and Plutus scripts usage.

get_script_hash_tracking(session, script_hash) staticmethod

Monitor specific script hash usage and redeemer details.

StakingDelegationQueries

Example staking and delegation pattern queries.

get_active_stake_monitoring(session, epoch_no=None) staticmethod

Monitor current active stake status and metrics.

get_delegation_history(session, stake_address, limit=50) staticmethod

Get delegation history for a specific stake address.

get_delegation_lifecycle(session, stake_address) staticmethod

Track complete delegation lifecycle for a stake address.

get_reward_earning_patterns(session, stake_address, epochs=10) staticmethod

Analyze reward earning patterns for a stake address.

get_stake_distribution_patterns(session, epoch_no=None, limit=20) staticmethod

Analyze stake distribution patterns across pools.

TransactionAnalysisQueries

Example transaction analysis and UTxO operation queries.

This class demonstrates SQLAlchemy implementations of common transaction analysis patterns, including fee analysis, UTxO calculations, and address transaction history.

These are example implementations showing how to use the dbsync-py package models to build useful transaction analysis queries.

get_address_balance(session, address) staticmethod

Calculate current balance for a given address using UTxO method.

This calculates the balance by finding all unspent transaction outputs (UTxOs) for the address and summing their values.

SQL equivalent

```sql
SELECT addr.address,
       COUNT(utxo.id) as utxo_count,
       SUM(utxo.value) as total_balance
FROM address addr
INNER JOIN tx_out utxo ON addr.id = utxo.address_id
LEFT JOIN tx_in spent ON utxo.id = spent.tx_out_id
    AND utxo.index = spent.tx_out_index
WHERE addr.address = %s
  AND spent.id IS NULL
GROUP BY addr.address;
```

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | Session \| AsyncSession | Database session (sync or async) | required |
| address | str | Cardano address to check | required |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary with balance information |

Example

```python
>>> with get_session() as session:
...     balance = TransactionAnalysisQueries.get_address_balance(
...         session, "addr1..."
...     )
...     print(f"Balance: {balance['balance_ada']:.2f} ADA")
```
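The LEFT JOIN ... IS NULL filter in the SQL equivalent is the same "unspent" test in another form: keep an output only if no spending input matched it. A pure-Python sketch of the balance computation (the tuple layouts and helper name are illustrative, not the dbsync-py schema):

```python
def address_balance(outputs, inputs, address):
    """Count and sum unspent outputs belonging to one address.

    `outputs` holds (tx_id, index, address, value) tuples and `inputs`
    holds (tx_out_id, tx_out_index) tuples, loosely mirroring tx_out / tx_in.
    """
    spent = set(inputs)
    utxos = [value for tx_id, idx, addr, value in outputs
             if addr == address and (tx_id, idx) not in spent]
    return {
        "address": address,
        "utxo_count": len(utxos),
        "balance_lovelace": sum(utxos),
        "balance_ada": sum(utxos) / 1_000_000,
    }

outputs = [(1, 0, "addr1xyz", 4_000_000), (2, 0, "addr1xyz", 1_500_000),
           (2, 1, "addr1abc", 9_000_000)]
inputs = [(2, 0)]  # the 1.5 ADA output has been spent

balance = address_balance(outputs, inputs, "addr1xyz")
print(f"Balance: {balance['balance_ada']:.2f} ADA")  # one 4 ADA UTxO remains
```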

get_address_transaction_history(session, address, limit=10) staticmethod

Get recent transaction history for an address.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | Session \| AsyncSession | Database session (sync or async) | required |
| address | str | Address to analyze | required |
| limit | int | Maximum number of transactions to return | 10 |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary with transaction history |

Example

```python
>>> with get_session() as session:
...     history = TransactionAnalysisQueries.get_address_transaction_history(
...         session, "addr1...", limit=5
...     )
...     print(f"Recent transactions: {len(history['transactions'])}")
```

get_hourly_transaction_throughput(session, hours=24) staticmethod

Get transaction throughput statistics by hour.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | Session \| AsyncSession | Database session (sync or async) | required |
| hours | int | Number of hours to analyze | 24 |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary with hourly throughput data |

Example

```python
>>> with get_session() as session:
...     throughput = TransactionAnalysisQueries.get_hourly_transaction_throughput(session)
...     print(f"Peak hour: {throughput['peak_hour_transactions']} transactions")
```
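Hourly bucketing of transactions can be sketched with a Counter over truncated block timestamps. This is an illustrative helper, not dbsync-py code; only the `peak_hour_transactions` key mirrors the example above, the rest is assumed:

```python
from collections import Counter
from datetime import datetime, timezone

def hourly_throughput(block_times):
    """Count transactions per hour, given one block timestamp per transaction."""
    counts = Counter(t.replace(minute=0, second=0, microsecond=0)
                     for t in block_times)
    return {
        "per_hour": dict(counts),
        "peak_hour_transactions": max(counts.values()),
    }

times = [
    datetime(2024, 1, 1, 9, 5, tzinfo=timezone.utc),
    datetime(2024, 1, 1, 9, 40, tzinfo=timezone.utc),
    datetime(2024, 1, 1, 10, 12, tzinfo=timezone.utc),
]
stats = hourly_throughput(times)
print(f"Peak hour: {stats['peak_hour_transactions']} transactions")  # the 09:00 bucket
```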

get_large_transactions(session, min_ada=1000.0, limit=10) staticmethod

Get transactions with large ADA amounts.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | Session \| AsyncSession | Database session (sync or async) | required |
| min_ada | float | Minimum ADA amount to consider "large" | 1000.0 |
| limit | int | Maximum number of transactions to return | 10 |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary with large transaction data |

Example

```python
>>> with get_session() as session:
...     large_txs = TransactionAnalysisQueries.get_large_transactions(
...         session, min_ada=10000.0
...     )
...     print(f"Found {len(large_txs['transactions'])} large transactions")
```

get_transaction_fee_stats(session, days=7) staticmethod

Get transaction fee statistics for the specified time period.

SQL equivalent

```sql
SELECT COUNT(*) as tx_count,
       AVG(fee) as avg_fee,
       MIN(fee) as min_fee,
       MAX(fee) as max_fee,
       PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY fee) as median_fee,
       SUM(fee) as total_fees
FROM tx
INNER JOIN block ON tx.block_id = block.id
WHERE block.time > NOW() - INTERVAL '%s days';
```

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | Session \| AsyncSession | Database session (sync or async) | required |
| days | int | Number of days to analyze | 7 |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary with fee statistics |

Example

```python
>>> from dbsync.session import get_session
>>> with get_session() as session:
...     stats = TransactionAnalysisQueries.get_transaction_fee_stats(session)
...     print(f"Average fee: {stats['avg_fee'] / 1_000_000:.2f} ADA")
```
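The aggregates in the SQL equivalent map directly onto Python's statistics module; in particular, `statistics.median` uses the same linear interpolation as `PERCENTILE_CONT(0.5)`. A self-contained sketch over raw fee values in Lovelace, independent of any database (the helper is illustrative, not dbsync-py API):

```python
from statistics import mean, median

def fee_stats(fees: list[int]) -> dict:
    """Summarize a list of transaction fees given in Lovelace."""
    return {
        "tx_count": len(fees),
        "avg_fee": mean(fees),
        "min_fee": min(fees),
        "max_fee": max(fees),
        "median_fee": median(fees),  # interpolated, like PERCENTILE_CONT(0.5)
        "total_fees": sum(fees),
    }

stats = fee_stats([170_000, 180_000, 200_000, 250_000])
print(f"Average fee: {stats['avg_fee'] / 1_000_000:.2f} ADA")  # 0.20 ADA
```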

get_transaction_inputs_outputs(session, tx_hash) staticmethod

Get detailed input and output information for a transaction.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | Session \| AsyncSession | Database session (sync or async) | required |
| tx_hash | str | Transaction hash to analyze | required |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary with input and output details |

Example

```python
>>> with get_session() as session:
...     details = TransactionAnalysisQueries.get_transaction_inputs_outputs(
...         session, "abc123..."
...     )
...     print(f"Inputs: {len(details['inputs'])}, Outputs: {len(details['outputs'])}")
```

get_transaction_size_distribution(session, days=7) staticmethod

Get transaction size distribution statistics.

This analyzes transaction sizes by counting inputs and outputs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | Session \| AsyncSession | Database session (sync or async) | required |
| days | int | Number of days to analyze | 7 |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary with size distribution data |

Example

```python
>>> with get_session() as session:
...     distribution = TransactionAnalysisQueries.get_transaction_size_distribution(session)
...     print(f"Average inputs: {distribution['avg_inputs']:.2f}")
```
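Measuring size by counting inputs and outputs, as described above, reduces to simple averaging over per-transaction counts. A minimal sketch (hypothetical helper; `avg_inputs` mirrors the key used in the example, the other keys are assumptions):

```python
from statistics import mean

def size_distribution(tx_io_counts: list[tuple[int, int]]) -> dict:
    """Average input and output counts over a set of transactions.

    `tx_io_counts` holds one (n_inputs, n_outputs) pair per transaction.
    """
    inputs = [n_in for n_in, _ in tx_io_counts]
    outputs = [n_out for _, n_out in tx_io_counts]
    return {
        "tx_count": len(tx_io_counts),
        "avg_inputs": mean(inputs),
        "avg_outputs": mean(outputs),
    }

dist = size_distribution([(1, 2), (3, 2), (2, 2)])
print(f"Average inputs: {dist['avg_inputs']:.2f}")  # (1 + 3 + 2) / 3 = 2.00
```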

get_chain_info(session)

Get comprehensive chain information in a single call.

Returns a dictionary with all basic chain metadata including:

- Chain metadata (network, start time)
- Current supply in ADA and Lovelace
- Latest slot number
- Database size information
- Sync progress information

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | Session \| AsyncSession | Database session (sync or async) | required |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary containing all chain information |

Example

```python
>>> from dbsync.session import get_session
>>> with get_session() as session:
...     info = get_chain_info(session)
...     print(f"Network: {info['network']}")
...     print(f"Supply: {info['supply_ada']:.2f} ADA")
...     print(f"Latest slot: {info['latest_slot']}")
```

get_comprehensive_governance_analysis(session, proposal_id=None, drep_id=None, committee_member=None, days=30)

Get comprehensive Conway era governance analysis in a single call.

get_comprehensive_multi_asset_analysis(session, policy_id=None, days=30)

Get comprehensive multi-asset analysis in a single call.

get_comprehensive_pool_analysis(session, pool_id, epochs=5)

Get comprehensive pool analysis in a single call.

get_comprehensive_smart_contract_analysis(session, script_hash=None, days=30)

Get comprehensive smart contract analysis in a single call.

get_comprehensive_staking_analysis(session, stake_address, epochs=5)

Get comprehensive staking analysis in a single call.

get_comprehensive_transaction_analysis(session, days=7)

Get comprehensive transaction analysis in a single call.

Returns a dictionary with various transaction analysis metrics including:

- Fee statistics
- Transaction throughput
- Size distribution
- Large transaction analysis

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | Session \| AsyncSession | Database session (sync or async) | required |
| days | int | Number of days to analyze | 7 |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary containing comprehensive analysis |

Example

```python
>>> from dbsync.session import get_session
>>> with get_session() as session:
...     analysis = get_comprehensive_transaction_analysis(session)
...     print(f"Total transactions: {analysis['fee_stats']['tx_count']:,}")
...     print(f"Average fee: {analysis['fee_stats']['avg_fee'] / 1_000_000:.4f} ADA")
```

chain_metadata

Chain Metadata Queries - Example Implementation.

This example demonstrates how to use the dbsync-py package to implement common chain metadata and fundamental blockchain data queries.

This file converts SQL examples from the Cardano DB Sync documentation to SQLAlchemy implementations using the dbsync-py models.

Based on SQL examples from: https://github.com/IntersectMBO/cardano-db-sync/blob/master/doc/interesting-queries.md

Usage

```python
from dbsync.examples.queries.chain_metadata import ChainMetadataQueries, get_chain_info
from dbsync.session import get_session

with get_session() as session:
    info = get_chain_info(session)
    print(f"Current supply: {info['supply_ada']:.2f} ADA")
```

governance

Conway Era Governance Utilities Query Examples.

This example demonstrates how to use the dbsync-py package to implement Conway era governance queries for Cardano's on-chain governance features.

multi_asset

Multi-Asset & Token Operations Query Examples.

This example demonstrates how to use the dbsync-py package to implement multi-asset and native token analysis queries.

pool_management

Pool Management & Block Production Queries - Example Implementation.

This example demonstrates how to use the dbsync-py package to implement pool management and block production analysis queries.

smart_contracts

Smart Contracts & Scripts Query Examples.

This example demonstrates how to use the dbsync-py package to implement smart contract and script analysis queries.

staking_delegation

Staking & Delegation Patterns Query Examples.

This example demonstrates how to use the dbsync-py package to implement staking and delegation analysis queries.

transaction_analysis

Transaction Analysis Queries - Example Implementation.

This example demonstrates how to use the dbsync-py package to implement transaction analysis and UTxO operation queries.

This file converts common transaction analysis patterns to SQLAlchemy implementations using the dbsync-py models.

Based on common patterns for Cardano transaction analysis and UTxO operations.

Usage

```python
from dbsync.examples.queries.transaction_analysis import TransactionAnalysisQueries
from dbsync.session import get_session

with get_session() as session:
    fee_stats = TransactionAnalysisQueries.get_transaction_fee_stats(session)
    print(f"Average fee: {fee_stats['avg_fee'] / 1_000_000:.2f} ADA")
```

TransactionAnalysisQueries

Example transaction analysis and UTxO operation queries.

This class demonstrates SQLAlchemy implementations of common transaction analysis patterns, including fee analysis, UTxO calculations, and address transaction history.

These are example implementations showing how to use the dbsync-py package models to build useful transaction analysis queries.

get_address_balance(session, address) staticmethod

Calculate current balance for a given address using UTxO method.

This calculates the balance by finding all unspent transaction outputs (UTxOs) for the address and summing their values.

SQL equivalent

SELECT addr.address, COUNT(utxo.id) as utxo_count, SUM(utxo.value) as total_balance FROM address addr INNER JOIN tx_out utxo ON addr.id = utxo.address_id LEFT JOIN tx_in spent ON utxo.id = spent.tx_out_id AND utxo.index = spent.tx_out_index WHERE addr.address = %s AND spent.id IS NULL GROUP BY addr.address;

Parameters:

Name Type Description Default
session Session | AsyncSession

Database session (sync or async)

required
address str

Cardano address to check

required

Returns:

Type Description
dict[str, Any]

Dictionary with balance information

Example

with get_session() as session: ... balance = TransactionAnalysisQueries.get_address_balance( ... session, "addr1..." ... ) ... print(f"Balance: {balance['balance_ada']:.2f} ADA")

get_address_transaction_history(session, address, limit=10) staticmethod

Get recent transaction history for an address.

Parameters:

Name Type Description Default
session Session | AsyncSession

Database session (sync or async)

required
address str

Address to analyze

required
limit int

Maximum number of transactions to return

10

Returns:

Type Description
dict[str, Any]

Dictionary with transaction history

Example

with get_session() as session: ... history = TransactionAnalysisQueries.get_address_transaction_history( ... session, "addr1...", limit=5 ... ) ... print(f"Recent transactions: {len(history['transactions'])}")

get_hourly_transaction_throughput(session, hours=24) staticmethod

Get transaction throughput statistics by hour.

Parameters:

Name Type Description Default
session Session | AsyncSession

Database session (sync or async)

required
hours int

Number of hours to analyze

24

Returns:

Type Description
dict[str, Any]

Dictionary with hourly throughput data

Example

with get_session() as session: ... throughput = TransactionAnalysisQueries.get_hourly_transaction_throughput(session) ... print(f"Peak hour: {throughput['peak_hour_transactions']} transactions")

get_large_transactions(session, min_ada=1000.0, limit=10) staticmethod

Get transactions with large ADA amounts.

Parameters:

Name Type Description Default
session Session | AsyncSession

Database session (sync or async)

required
min_ada float

Minimum ADA amount to consider "large"

1000.0
limit int

Maximum number of transactions to return

10

Returns:

Type Description
dict[str, Any]

Dictionary with large transaction data

Example

with get_session() as session: ... large_txs = TransactionAnalysisQueries.get_large_transactions( ... session, min_ada=10000.0 ... ) ... print(f"Found {len(large_txs['transactions'])} large transactions")

get_transaction_fee_stats(session, days=7) staticmethod

Get transaction fee statistics for the specified time period.

SQL equivalent

SELECT COUNT(*) as tx_count, AVG(fee) as avg_fee, MIN(fee) as min_fee, MAX(fee) as max_fee, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY fee) as median_fee, SUM(fee) as total_fees FROM tx INNER JOIN block ON tx.block_id = block.id WHERE block.time > NOW() - INTERVAL '%s days';

Parameters:

Name Type Description Default
session Session | AsyncSession

Database session (sync or async)

required
days int

Number of days to analyze (default: 7)

7

Returns:

Type Description
dict[str, Any]

Dictionary with fee statistics

Example

from dbsync.session import get_session with get_session() as session: ... stats = TransactionAnalysisQueries.get_transaction_fee_stats(session) ... print(f"Average fee: {stats['avg_fee'] / 1_000_000:.2f} ADA")

get_transaction_inputs_outputs(session, tx_hash) staticmethod

Get detailed input and output information for a transaction.

Parameters:

- session (Session | AsyncSession): Database session (sync or async). Required.
- tx_hash (str): Transaction hash to analyze. Required.

Returns:

- dict[str, Any]: Dictionary with input and output details.

Example

>>> with get_session() as session:
...     details = TransactionAnalysisQueries.get_transaction_inputs_outputs(
...         session, "abc123..."
...     )
...     print(f"Inputs: {len(details['inputs'])}, Outputs: {len(details['outputs'])}")

get_transaction_size_distribution(session, days=7) staticmethod

Get transaction size distribution statistics.

This analyzes transaction sizes by counting inputs and outputs.

Parameters:

- session (Session | AsyncSession): Database session (sync or async). Required.
- days (int): Number of days to analyze. Default: 7.

Returns:

- dict[str, Any]: Dictionary with size distribution data.

Example

>>> with get_session() as session:
...     distribution = TransactionAnalysisQueries.get_transaction_size_distribution(session)
...     print(f"Average inputs: {distribution['avg_inputs']:.2f}")

get_comprehensive_transaction_analysis(session, days=7)

Get comprehensive transaction analysis in a single call.

Returns a dictionary with various transaction analysis metrics, including:

- Fee statistics
- Transaction throughput
- Size distribution
- Large transaction analysis

Parameters:

- session (Session | AsyncSession): Database session (sync or async). Required.
- days (int): Number of days to analyze. Default: 7.

Returns:

- dict[str, Any]: Dictionary containing comprehensive analysis.

Example

>>> from dbsync.session import get_session
>>> with get_session() as session:
...     analysis = get_comprehensive_transaction_analysis(session)
...     print(f"Total transactions: {analysis['fee_stats']['tx_count']:,}")
...     print(f"Average fee: {analysis['fee_stats']['avg_fee'] / 1_000_000:.4f} ADA")

Common Query Patterns

Basic Entity Retrieval

from dbsync.session import create_session
from dbsync.models import Block, Transaction, StakeAddress

# Get a session
session = create_session()

# Basic entity retrieval by ID
block = session.get(Block, block_id)
tx = session.get(Transaction, tx_id)

# Query by unique fields
latest_block = session.query(Block).order_by(Block.block_no.desc()).first()
tx_by_hash = session.query(Transaction).filter(Transaction.hash == tx_hash).first()

Relationship Navigation

# Navigate relationships using SQLAlchemy patterns
block = session.get(Block, block_id)

# Get all transactions in a block
transactions = block.transactions

# Get the slot leader who produced the block
slot_leader = block.slot_leader

# Get the epoch this block belongs to
epoch = block.epoch

Complex Queries with Joins

from sqlalchemy import and_, or_, func
from dbsync.models import (
    Block, Transaction, TransactionOutput,
    StakeAddress, Delegation, Reward
)

# Find all transactions in the latest 100 blocks
latest_txs = session.query(Transaction).join(Block).filter(
    Block.block_no >= session.query(func.max(Block.block_no)).scalar() - 100
).all()

# Get stake delegation history for an address
delegations = session.query(Delegation).join(StakeAddress).filter(
    StakeAddress.view == stake_address_bech32
).order_by(Delegation.active_epoch_no.desc()).all()

Aggregation Queries

# Sum the value of every transaction output ever created.
# Note: this includes spent outputs, so it overstates circulating supply;
# restrict the query to the unspent set for an accurate figure.
total_supply = session.query(func.sum(TransactionOutput.value)).scalar()

# Count transactions per epoch
tx_counts = session.query(
    Block.epoch_no,
    func.count(Transaction.id_).label('tx_count')
).join(Transaction).group_by(Block.epoch_no).all()

# Find top stake pools by delegation
top_pools = session.query(
    Delegation.pool_hash_id,
    func.count(Delegation.id_).label('delegator_count')
).group_by(Delegation.pool_hash_id).order_by(
    func.count(Delegation.id_).desc()
).limit(10).all()

Time-Based Queries

from datetime import datetime, timedelta, timezone

# Get blocks from the last 24 hours
# (datetime.utcnow() is deprecated; use a timezone-aware timestamp)
yesterday = datetime.now(timezone.utc) - timedelta(days=1)
recent_blocks = session.query(Block).filter(
    Block.time >= yesterday
).order_by(Block.time.desc()).all()

# Get transactions in a specific epoch
epoch_txs = session.query(Transaction).join(Block).filter(
    Block.epoch_no == target_epoch
).all()

Asset and Multi-Asset Queries

from dbsync.models import MultiAsset, MaTxOut, MaTxMint

# Find all native tokens for a policy
policy_assets = session.query(MultiAsset).filter(
    MultiAsset.policy == policy_hex
).all()

# Get asset transaction history
asset_history = session.query(MaTxOut).join(MultiAsset).filter(
    and_(
        MultiAsset.policy == policy_hex,
        MultiAsset.name == asset_name_hex
    )
).all()

Governance Queries

from dbsync.models import GovActionProposal, VotingProcedure, DrepRegistration

# Get active governance proposals
active_proposals = session.query(GovActionProposal).filter(
    and_(
        GovActionProposal.ratified_epoch.is_(None),
        GovActionProposal.expired_epoch.is_(None),
        GovActionProposal.dropped_epoch.is_(None)
    )
).all()

# Get voting history for a DRep
drep_votes = session.query(VotingProcedure).join(DrepRegistration).filter(
    DrepRegistration.drep_hash_id == drep_id
).all()

Performance Optimization

Index Usage

# Use indexed fields for filtering
# Good: Use indexed hash fields
tx = session.query(Transaction).filter(Transaction.hash == tx_hash).first()

# Good: Use indexed foreign keys
block_txs = session.query(Transaction).filter(Transaction.block_id == block_id).all()

# Less optimal: Non-indexed field searches
# Consider adding custom indices for frequently used patterns
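A custom index can be added with plain DDL when a frequently filtered column lacks one. A minimal sketch, using in-memory SQLite as a stand-in for the real PostgreSQL instance; the table and index names here are hypothetical:

```python
from sqlalchemy import create_engine, text

# SQLite stands in for the dbsync PostgreSQL database in this sketch
engine = create_engine("sqlite://")

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE tx_metadata (tx_id INTEGER, key INTEGER)"))
    # Hypothetical index covering a frequently used filter pattern
    conn.execute(text("CREATE INDEX idx_tx_metadata_key ON tx_metadata (key)"))
    # Confirm the index exists (SQLite-specific introspection)
    index_names = [
        row[1] for row in conn.execute(text("PRAGMA index_list('tx_metadata')"))
    ]
```

On PostgreSQL the same `CREATE INDEX` statement applies; prefer `CREATE INDEX CONCURRENTLY` on a live db-sync instance to avoid blocking writes.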

Pagination

from sqlalchemy import desc

# Implement pagination for large result sets
def get_transactions_page(session, page=1, page_size=100):
    offset = (page - 1) * page_size
    return session.query(Transaction).order_by(
        desc(Transaction.id_)
    ).offset(offset).limit(page_size).all()

# Get total count for pagination metadata
total_count = session.query(Transaction).count()
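Offset pagination rescans and discards all skipped rows, so deep pages get progressively slower. Keyset (seek) pagination filters past the last seen id instead. A self-contained sketch with a toy `Tx` model standing in for the real Transaction:

```python
from sqlalchemy import create_engine, Column, Integer
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class Tx(Base):
    """Toy stand-in for the Transaction model."""
    __tablename__ = "tx"
    id_ = Column("id", Integer, primary_key=True)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

def page_after(session, last_seen_id, page_size=3):
    # Seek past the previous page's last id; the primary-key index
    # keeps this cheap no matter how deep the page is
    return (
        session.query(Tx)
        .filter(Tx.id_ > last_seen_id)
        .order_by(Tx.id_)
        .limit(page_size)
        .all()
    )

with Session(engine) as session:
    session.add_all([Tx(id_=i) for i in range(1, 11)])
    session.commit()

    first = page_after(session, 0)
    second = page_after(session, first[-1].id_)
```

The trade-off: keyset pagination cannot jump to an arbitrary page number, so it suits "next page" cursors rather than numbered page links.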

Lazy Loading vs Eager Loading

from sqlalchemy.orm import joinedload, selectinload

# Eager load related objects to avoid N+1 queries
blocks_with_txs = session.query(Block).options(
    selectinload(Block.transactions)
).all()

# Use joinedload for single relationships
txs_with_blocks = session.query(Transaction).options(
    joinedload(Transaction.block)
).all()

Bulk Operations

# Bulk insert for large datasets (read-only typically)
# This is mainly for reference as dbsync-py is read-only

# Bulk updates (if needed for custom applications)
session.query(CustomModel).filter(
    CustomModel.status == 'pending'
).update({CustomModel.status: 'processed'}, synchronize_session=False)
session.commit()
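Since dbsync-py is typically read-only, streaming large result sets matters more than bulk writes. A sketch of batched iteration with `yield_per`, again using a toy model in place of the real Transaction:

```python
from sqlalchemy import create_engine, Column, Integer
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class Tx(Base):
    """Toy stand-in for the Transaction model."""
    __tablename__ = "tx"
    id_ = Column("id", Integer, primary_key=True)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([Tx(id_=i) for i in range(1, 1001)])
    session.commit()

    # yield_per fetches rows in batches of 100 instead of
    # materializing all 1000 ORM objects at once
    seen = sum(1 for _ in session.query(Tx).yield_per(100))
```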

Query Builder Helpers

Custom Query Functions

def get_address_utxos(session, address_id: int):
    """Get all UTXOs for an address."""
    from dbsync.models import TransactionOutput, TransactionInput

    # Get all outputs to this address
    outputs = session.query(TransactionOutput).filter(
        TransactionOutput.address_id == address_id
    )

    # Exclude spent outputs
    spent_output_ids = session.query(TransactionInput.tx_out_id).distinct()

    return outputs.filter(
        ~TransactionOutput.id_.in_(spent_output_ids)
    ).all()

def get_pool_performance(session, pool_hash_id: int, epoch_range: tuple):
    """Calculate pool performance metrics."""
    from sqlalchemy import and_, func
    from dbsync.models import Block, SlotLeader

    start_epoch, end_epoch = epoch_range

    blocks_produced = session.query(func.count(Block.id_)).join(
        SlotLeader
    ).filter(
        and_(
            SlotLeader.pool_hash_id == pool_hash_id,
            Block.epoch_no.between(start_epoch, end_epoch)
        )
    ).scalar()

    return {
        'blocks_produced': blocks_produced,
        'epoch_range': epoch_range
    }

Error Handling

from sqlalchemy.exc import NoResultFound, MultipleResultsFound
from dbsync.models import Transaction

def safe_get_transaction(session, tx_hash: str):
    """Safely get a transaction by hash with proper error handling."""
    try:
        return session.query(Transaction).filter(
            Transaction.hash == bytes.fromhex(tx_hash)
        ).one()
    except NoResultFound:
        return None
    except MultipleResultsFound:
        # This should not happen with proper database constraints
        raise ValueError(f"Multiple transactions found for hash: {tx_hash}")

Query Performance Guidelines

Best Practices

  1. Use Indexes: Always filter on indexed columns when possible
  2. Limit Results: Use .limit() for large result sets
  3. Eager Loading: Use joinedload() or selectinload() to avoid N+1 queries
  4. Specific Columns: Select only needed columns for large datasets
  5. Connection Pooling: Reuse sessions appropriately
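Practice 4 (selecting only needed columns) can be done either with column-entity queries or `load_only`. A sketch with a toy model; the column names mirror, but are not guaranteed to match, the real Transaction:

```python
from sqlalchemy import create_engine, Column, Integer, BigInteger, LargeBinary
from sqlalchemy.orm import declarative_base, Session, load_only

Base = declarative_base()

class Tx(Base):
    """Toy stand-in for the Transaction model."""
    __tablename__ = "tx"
    id_ = Column("id", Integer, primary_key=True)
    hash = Column(LargeBinary)
    fee = Column(BigInteger)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Tx(id_=1, hash=b"\x01", fee=200_000))
    session.commit()

    # Option 1: plain tuples, no ORM objects at all
    rows = session.query(Tx.hash, Tx.fee).all()

    # Option 2: ORM objects with non-listed columns deferred
    tx = session.query(Tx).options(load_only(Tx.hash, Tx.fee)).one()
```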

Common Anti-Patterns

# Anti-pattern: Loading all data then filtering in Python
all_txs = session.query(Transaction).all()
filtered = [tx for tx in all_txs if tx.fee > 1000000]  # Don't do this

# Better: Filter in database
high_fee_txs = session.query(Transaction).filter(
    Transaction.fee > 1000000
).all()

# Anti-pattern: N+1 queries
for tx in transactions:
    block = tx.block  # This hits the database for each transaction

# Better: Eager loading
transactions = session.query(Transaction).options(
    joinedload(Transaction.block)
).all()