Query API Reference¶
This section documents query utilities, common patterns, and helper functions for working with dbsync-py models.
Query Utilities¶
dbsync.examples.queries
¶
Query pattern examples for dbsync-py.
This package contains example implementations of common query patterns using the dbsync-py package models. These examples demonstrate how to convert SQL queries from the Cardano DB Sync documentation into SQLAlchemy implementations.
Available query examples: - chain_metadata: Chain fundamentals and metadata queries - transaction_analysis: Transaction analysis and UTxO operations - pool_management: Pool management and block production - staking_delegation: Staking and delegation pattern queries - smart_contracts: Smart contracts and scripts analysis - governance: Conway era governance utilities - multi_asset: Multi-asset and token operations
These are educational examples showing best practices for: - Using dbsync-py models - Building SQLAlchemy queries - Handling edge cases and errors - Creating reusable query patterns
ChainMetadataQueries
¶
Example chain metadata and fundamental blockchain data queries.
This class demonstrates SQLAlchemy implementations of common chain metadata queries, including supply calculations, sync progress, and basic chain info.
These are example implementations showing how to use the dbsync-py package models to build useful queries.
get_chain_metadata(session)
staticmethod
¶
Get chain metadata information.
SQL equivalent
SELECT * FROM meta;
Returns:
| Type | Description |
|---|---|
ChainMeta | None
|
ChainMeta object with network information, or None if not found |
Example
from dbsync.session import get_session with get_session() as session: ... meta = ChainMetadataQueries.get_chain_metadata(session) ... print(f"Network: {meta.network_name}") ... print(f"Start time: {meta.start_time}")
get_current_supply(session)
staticmethod
¶
Calculate the current total on-chain supply of Ada.
Note: 1 ADA == 1,000,000 Lovelace
This queries the UTxO set for unspent transaction outputs. It does not include staking rewards that have not yet been withdrawn. Before being withdrawn, rewards exist in ledger state and not on-chain.
SQL equivalent
SELECT sum(value) FROM tx_out AS tx_outer WHERE NOT EXISTS ( SELECT tx_out.id FROM tx_out INNER JOIN tx_in ON tx_out.tx_id = tx_in.tx_out_id AND tx_out.index = tx_in.tx_out_index WHERE tx_outer.id = tx_out.id );
Returns:
| Type | Description |
|---|---|
int
|
Total supply in Lovelace |
Example
from dbsync.session import get_session with get_session() as session: ... supply = ChainMetadataQueries.get_current_supply(session) ... print(f"Current supply: {supply / 1_000_000:.2f} ADA")
get_database_size_pretty(session)
staticmethod
¶
Get the human-readable size of the database.
SQL equivalent
SELECT pg_size_pretty(pg_database_size(current_database()));
Returns:
| Type | Description |
|---|---|
str
|
Human-readable database size (e.g., "116 GB") |
Example
from dbsync.session import get_session with get_session() as session: ... size = ChainMetadataQueries.get_database_size_pretty(session) ... print(f"Database size: {size}")
get_latest_slot_number(session)
staticmethod
¶
Get the slot number of the most recent block.
SQL equivalent
SELECT slot_no FROM block WHERE block_no IS NOT NULL ORDER BY block_no DESC LIMIT 1;
Returns:
| Type | Description |
|---|---|
int | None
|
Latest slot number, or None if no blocks found |
Example
from dbsync.session import get_session with get_session() as session: ... slot = ChainMetadataQueries.get_latest_slot_number(session) ... print(f"Latest slot: {slot}")
get_sync_behind_duration(session)
staticmethod
¶
Get how far behind the sync is from current time.
SQL equivalent
SELECT now() - max(time) AS behind_by FROM block;
Returns:
| Type | Description |
|---|---|
str | None
|
Time duration string (e.g., "4 days 20:59:39.134497") or None |
Example
from dbsync.session import get_session with get_session() as session: ... behind = ChainMetadataQueries.get_sync_behind_duration(session) ... print(f"Sync is behind by: {behind}")
get_sync_progress_percent(session)
staticmethod
¶
Get rough estimate of sync progress as a percentage.
To get a rough estimate of how close to fully synced the database is, we use the timestamps on the blocks.
Note: This value can be misleading as it operates on block timestamps and early epochs contain much less data (e.g., Byron era did not have staking) and much fewer transactions.
SQL equivalent
SELECT 100 * ( extract(epoch from (max(time) at time zone 'UTC')) - extract(epoch from (min(time) at time zone 'UTC')) ) / ( extract(epoch from (now() at time zone 'UTC')) - extract(epoch from (min(time) at time zone 'UTC')) ) AS sync_percent FROM block;
Returns:
| Type | Description |
|---|---|
float
|
Sync progress percentage (0.0 to 100.0) |
Example
from dbsync.session import get_session with get_session() as session: ... progress = ChainMetadataQueries.get_sync_progress_percent(session) ... print(f"Sync progress: {progress:.2f}%")
get_table_size_pretty(session, table_name='block')
staticmethod
¶
Get the human-readable size of a specific database table.
SQL equivalent
SELECT pg_size_pretty(pg_total_relation_size('block'));
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
table_name
|
str
|
Name of the table to check (default: 'block') |
'block'
|
Returns:
| Type | Description |
|---|---|
str
|
Human-readable table size (e.g., "2760 MB") |
Example
from dbsync.session import get_session with get_session() as session: ... size = ChainMetadataQueries.get_table_size_pretty(session, "tx_out") ... print(f"tx_out table size: {size}")
GovernanceQueries
¶
Example Conway era governance query utilities.
get_committee_operations_tracking(session, committee_member=None, limit=20)
staticmethod
¶
Monitor constitutional committee activities and decisions.
get_drep_activity_monitoring(session, drep_id=None, limit=20)
staticmethod
¶
Track DRep registrations, delegations, and voting activity.
get_governance_proposal_analysis(session, proposal_id=None, limit=20)
staticmethod
¶
Analyze governance action proposals and their lifecycle.
get_treasury_governance_analysis(session, days=90, limit=20)
staticmethod
¶
Track treasury operations and governance spending.
get_voting_participation_metrics(session, days=30, limit=20)
staticmethod
¶
Analyze voting participation rates and outcomes.
MultiAssetQueries
¶
Example multi-asset and token operation queries.
get_asset_metadata_tracking(session, policy_id=None, limit=20)
staticmethod
¶
Track native asset metadata and policy information.
get_token_portfolio_analysis(session, address=None, limit=50)
staticmethod
¶
Analyze token holdings and distributions for addresses or network-wide.
get_token_transfer_patterns(session, days=30, limit=20)
staticmethod
¶
Monitor token transfer activity and volume patterns.
PoolManagementQueries
¶
Example pool management and block production queries.
get_pool_block_production_stats(session, pool_id, epochs=10)
staticmethod
¶
Get block production statistics for a pool over recent epochs.
get_pool_delegation_summary(session, pool_id, limit=100)
staticmethod
¶
Get current delegation summary for a pool.
get_pool_operational_status(session, pool_id)
staticmethod
¶
Get current operational status and configuration for a pool.
get_pool_performance_metrics(session, pool_id, epoch_no=None)
staticmethod
¶
Get detailed performance metrics for a pool in a specific epoch.
get_pool_registration_info(session, pool_id)
staticmethod
¶
Get comprehensive pool registration and metadata information.
get_pool_rewards_analysis(session, pool_id, epochs=5)
staticmethod
¶
Get reward distribution analysis for a pool over recent epochs.
SmartContractsQueries
¶
Example smart contracts and scripts queries.
get_contract_usage_patterns(session, days=30, limit=20)
staticmethod
¶
Track smart contract execution patterns over time.
get_contract_value_tracking(session, epoch_no=None, limit=20)
staticmethod
¶
Track value locked in smart contracts.
get_script_analysis(session, script_hash=None, limit=50)
staticmethod
¶
Analyze native scripts and Plutus scripts usage.
get_script_hash_tracking(session, script_hash)
staticmethod
¶
Monitor specific script hash usage and redeemer details.
StakingDelegationQueries
¶
Example staking and delegation pattern queries.
get_active_stake_monitoring(session, epoch_no=None)
staticmethod
¶
Monitor current active stake status and metrics.
get_delegation_history(session, stake_address, limit=50)
staticmethod
¶
Get delegation history for a specific stake address.
get_delegation_lifecycle(session, stake_address)
staticmethod
¶
Track complete delegation lifecycle for a stake address.
get_reward_earning_patterns(session, stake_address, epochs=10)
staticmethod
¶
Analyze reward earning patterns for a stake address.
get_stake_distribution_patterns(session, epoch_no=None, limit=20)
staticmethod
¶
Analyze stake distribution patterns across pools.
TransactionAnalysisQueries
¶
Example transaction analysis and UTxO operation queries.
This class demonstrates SQLAlchemy implementations of common transaction analysis patterns, including fee analysis, UTxO calculations, and address transaction history.
These are example implementations showing how to use the dbsync-py package models to build useful transaction analysis queries.
get_address_balance(session, address)
staticmethod
¶
Calculate current balance for a given address using UTxO method.
This calculates the balance by finding all unspent transaction outputs (UTxOs) for the address and summing their values.
SQL equivalent
SELECT addr.address, COUNT(utxo.id) as utxo_count, SUM(utxo.value) as total_balance FROM address addr INNER JOIN tx_out utxo ON addr.id = utxo.address_id LEFT JOIN tx_in spent ON utxo.id = spent.tx_out_id AND utxo.index = spent.tx_out_index WHERE addr.address = %s AND spent.id IS NULL GROUP BY addr.address;
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
address
|
str
|
Cardano address to check |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with balance information |
Example
with get_session() as session: ... balance = TransactionAnalysisQueries.get_address_balance( ... session, "addr1..." ... ) ... print(f"Balance: {balance['balance_ada']:.2f} ADA")
get_address_transaction_history(session, address, limit=10)
staticmethod
¶
Get recent transaction history for an address.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
address
|
str
|
Address to analyze |
required |
limit
|
int
|
Maximum number of transactions to return |
10
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with transaction history |
Example
with get_session() as session: ... history = TransactionAnalysisQueries.get_address_transaction_history( ... session, "addr1...", limit=5 ... ) ... print(f"Recent transactions: {len(history['transactions'])}")
get_hourly_transaction_throughput(session, hours=24)
staticmethod
¶
Get transaction throughput statistics by hour.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
hours
|
int
|
Number of hours to analyze |
24
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with hourly throughput data |
Example
with get_session() as session: ... throughput = TransactionAnalysisQueries.get_hourly_transaction_throughput(session) ... print(f"Peak hour: {throughput['peak_hour_transactions']} transactions")
get_large_transactions(session, min_ada=1000.0, limit=10)
staticmethod
¶
Get transactions with large ADA amounts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
min_ada
|
float
|
Minimum ADA amount to consider "large" |
1000.0
|
limit
|
int
|
Maximum number of transactions to return |
10
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with large transaction data |
Example
with get_session() as session: ... large_txs = TransactionAnalysisQueries.get_large_transactions( ... session, min_ada=10000.0 ... ) ... print(f"Found {len(large_txs['transactions'])} large transactions")
get_transaction_fee_stats(session, days=7)
staticmethod
¶
Get transaction fee statistics for the specified time period.
SQL equivalent
SELECT COUNT(*) as tx_count, AVG(fee) as avg_fee, MIN(fee) as min_fee, MAX(fee) as max_fee, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY fee) as median_fee, SUM(fee) as total_fees FROM tx INNER JOIN block ON tx.block_id = block.id WHERE block.time > NOW() - INTERVAL '%s days';
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
days
|
int
|
Number of days to analyze (default: 7) |
7
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with fee statistics |
Example
from dbsync.session import get_session with get_session() as session: ... stats = TransactionAnalysisQueries.get_transaction_fee_stats(session) ... print(f"Average fee: {stats['avg_fee'] / 1_000_000:.2f} ADA")
get_transaction_inputs_outputs(session, tx_hash)
staticmethod
¶
Get detailed input and output information for a transaction.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
tx_hash
|
str
|
Transaction hash to analyze |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with input and output details |
Example
with get_session() as session: ... details = TransactionAnalysisQueries.get_transaction_inputs_outputs( ... session, "abc123..." ... ) ... print(f"Inputs: {len(details['inputs'])}, Outputs: {len(details['outputs'])}")
get_transaction_size_distribution(session, days=7)
staticmethod
¶
Get transaction size distribution statistics.
This analyzes transaction sizes by counting inputs and outputs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
days
|
int
|
Number of days to analyze |
7
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with size distribution data |
Example
with get_session() as session: ... distribution = TransactionAnalysisQueries.get_transaction_size_distribution(session) ... print(f"Average inputs: {distribution['avg_inputs']:.2f}")
get_chain_info(session)
¶
Get comprehensive chain information in a single call.
Returns a dictionary with all basic chain metadata including: - Chain metadata (network, start time) - Current supply in ADA and Lovelace - Latest slot number - Database size information - Sync progress information
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary containing all chain information |
Example
from dbsync.session import get_session with get_session() as session: ... info = get_chain_info(session) ... print(f"Network: {info['network']}") ... print(f"Supply: {info['supply_ada']:.2f} ADA") ... print(f"Latest slot: {info['latest_slot']}")
get_comprehensive_governance_analysis(session, proposal_id=None, drep_id=None, committee_member=None, days=30)
¶
Get comprehensive Conway era governance analysis in a single call.
get_comprehensive_multi_asset_analysis(session, policy_id=None, days=30)
¶
Get comprehensive multi-asset analysis in a single call.
get_comprehensive_pool_analysis(session, pool_id, epochs=5)
¶
Get comprehensive pool analysis in a single call.
get_comprehensive_smart_contract_analysis(session, script_hash=None, days=30)
¶
Get comprehensive smart contract analysis in a single call.
get_comprehensive_staking_analysis(session, stake_address, epochs=5)
¶
Get comprehensive staking analysis in a single call.
get_comprehensive_transaction_analysis(session, days=7)
¶
Get comprehensive transaction analysis in a single call.
Returns a dictionary with various transaction analysis metrics including: - Fee statistics - Transaction throughput - Size distribution - Large transaction analysis
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
days
|
int
|
Number of days to analyze |
7
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary containing comprehensive analysis |
Example
from dbsync.session import get_session with get_session() as session: ... analysis = get_comprehensive_transaction_analysis(session) ... print(f"Total transactions: {analysis['fee_stats']['tx_count']:,}") ... print(f"Average fee: {analysis['fee_stats']['avg_fee'] / 1_000_000:.4f} ADA")
chain_metadata
¶
Chain Metadata Queries - Example Implementation.
This example demonstrates how to use the dbsync-py package to implement common chain metadata and fundamental blockchain data queries.
This file converts SQL examples from the Cardano DB Sync documentation to SQLAlchemy implementations using the dbsync-py models.
Based on SQL examples from: https://github.com/IntersectMBO/cardano-db-sync/blob/master/doc/interesting-queries.md
Usage
from dbsync.examples.queries.chain_metadata import ChainMetadataQueries, get_chain_info from dbsync.session import get_session
with get_session() as session: info = get_chain_info(session) print(f"Current supply: {info['supply_ada']:.2f} ADA")
ChainMetadataQueries
¶
Example chain metadata and fundamental blockchain data queries.
This class demonstrates SQLAlchemy implementations of common chain metadata queries, including supply calculations, sync progress, and basic chain info.
These are example implementations showing how to use the dbsync-py package models to build useful queries.
get_chain_metadata(session)
staticmethod
¶
Get chain metadata information.
SQL equivalent
SELECT * FROM meta;
Returns:
| Type | Description |
|---|---|
ChainMeta | None
|
ChainMeta object with network information, or None if not found |
Example
from dbsync.session import get_session with get_session() as session: ... meta = ChainMetadataQueries.get_chain_metadata(session) ... print(f"Network: {meta.network_name}") ... print(f"Start time: {meta.start_time}")
get_current_supply(session)
staticmethod
¶
Calculate the current total on-chain supply of Ada.
Note: 1 ADA == 1,000,000 Lovelace
This queries the UTxO set for unspent transaction outputs. It does not include staking rewards that have not yet been withdrawn. Before being withdrawn, rewards exist in ledger state and not on-chain.
SQL equivalent
SELECT sum(value) FROM tx_out AS tx_outer WHERE NOT EXISTS ( SELECT tx_out.id FROM tx_out INNER JOIN tx_in ON tx_out.tx_id = tx_in.tx_out_id AND tx_out.index = tx_in.tx_out_index WHERE tx_outer.id = tx_out.id );
Returns:
| Type | Description |
|---|---|
int
|
Total supply in Lovelace |
Example
from dbsync.session import get_session with get_session() as session: ... supply = ChainMetadataQueries.get_current_supply(session) ... print(f"Current supply: {supply / 1_000_000:.2f} ADA")
get_database_size_pretty(session)
staticmethod
¶
Get the human-readable size of the database.
SQL equivalent
SELECT pg_size_pretty(pg_database_size(current_database()));
Returns:
| Type | Description |
|---|---|
str
|
Human-readable database size (e.g., "116 GB") |
Example
from dbsync.session import get_session with get_session() as session: ... size = ChainMetadataQueries.get_database_size_pretty(session) ... print(f"Database size: {size}")
get_latest_slot_number(session)
staticmethod
¶
Get the slot number of the most recent block.
SQL equivalent
SELECT slot_no FROM block WHERE block_no IS NOT NULL ORDER BY block_no DESC LIMIT 1;
Returns:
| Type | Description |
|---|---|
int | None
|
Latest slot number, or None if no blocks found |
Example
from dbsync.session import get_session with get_session() as session: ... slot = ChainMetadataQueries.get_latest_slot_number(session) ... print(f"Latest slot: {slot}")
get_sync_behind_duration(session)
staticmethod
¶
Get how far behind the sync is from current time.
SQL equivalent
SELECT now() - max(time) AS behind_by FROM block;
Returns:
| Type | Description |
|---|---|
str | None
|
Time duration string (e.g., "4 days 20:59:39.134497") or None |
Example
from dbsync.session import get_session with get_session() as session: ... behind = ChainMetadataQueries.get_sync_behind_duration(session) ... print(f"Sync is behind by: {behind}")
get_sync_progress_percent(session)
staticmethod
¶
Get rough estimate of sync progress as a percentage.
To get a rough estimate of how close to fully synced the database is, we use the timestamps on the blocks.
Note: This value can be misleading as it operates on block timestamps and early epochs contain much less data (e.g., Byron era did not have staking) and much fewer transactions.
SQL equivalent
SELECT 100 * ( extract(epoch from (max(time) at time zone 'UTC')) - extract(epoch from (min(time) at time zone 'UTC')) ) / ( extract(epoch from (now() at time zone 'UTC')) - extract(epoch from (min(time) at time zone 'UTC')) ) AS sync_percent FROM block;
Returns:
| Type | Description |
|---|---|
float
|
Sync progress percentage (0.0 to 100.0) |
Example
from dbsync.session import get_session with get_session() as session: ... progress = ChainMetadataQueries.get_sync_progress_percent(session) ... print(f"Sync progress: {progress:.2f}%")
get_table_size_pretty(session, table_name='block')
staticmethod
¶
Get the human-readable size of a specific database table.
SQL equivalent
SELECT pg_size_pretty(pg_total_relation_size('block'));
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
table_name
|
str
|
Name of the table to check (default: 'block') |
'block'
|
Returns:
| Type | Description |
|---|---|
str
|
Human-readable table size (e.g., "2760 MB") |
Example
from dbsync.session import get_session with get_session() as session: ... size = ChainMetadataQueries.get_table_size_pretty(session, "tx_out") ... print(f"tx_out table size: {size}")
get_chain_info(session)
¶
Get comprehensive chain information in a single call.
Returns a dictionary with all basic chain metadata including: - Chain metadata (network, start time) - Current supply in ADA and Lovelace - Latest slot number - Database size information - Sync progress information
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary containing all chain information |
Example
from dbsync.session import get_session with get_session() as session: ... info = get_chain_info(session) ... print(f"Network: {info['network']}") ... print(f"Supply: {info['supply_ada']:.2f} ADA") ... print(f"Latest slot: {info['latest_slot']}")
governance
¶
Conway Era Governance Utilities Query Examples.
This example demonstrates how to use the dbsync-py package to implement Conway era governance queries for Cardano's on-chain governance features.
GovernanceQueries
¶
Example Conway era governance query utilities.
get_committee_operations_tracking(session, committee_member=None, limit=20)
staticmethod
¶
Monitor constitutional committee activities and decisions.
get_drep_activity_monitoring(session, drep_id=None, limit=20)
staticmethod
¶
Track DRep registrations, delegations, and voting activity.
get_governance_proposal_analysis(session, proposal_id=None, limit=20)
staticmethod
¶
Analyze governance action proposals and their lifecycle.
get_treasury_governance_analysis(session, days=90, limit=20)
staticmethod
¶
Track treasury operations and governance spending.
get_voting_participation_metrics(session, days=30, limit=20)
staticmethod
¶
Analyze voting participation rates and outcomes.
get_comprehensive_governance_analysis(session, proposal_id=None, drep_id=None, committee_member=None, days=30)
¶
Get comprehensive Conway era governance analysis in a single call.
multi_asset
¶
Multi-Asset & Token Operations Query Examples.
This example demonstrates how to use the dbsync-py package to implement multi-asset and native token analysis queries.
MultiAssetQueries
¶
Example multi-asset and token operation queries.
get_asset_metadata_tracking(session, policy_id=None, limit=20)
staticmethod
¶
Track native asset metadata and policy information.
get_token_portfolio_analysis(session, address=None, limit=50)
staticmethod
¶
Analyze token holdings and distributions for addresses or network-wide.
get_token_transfer_patterns(session, days=30, limit=20)
staticmethod
¶
Monitor token transfer activity and volume patterns.
get_comprehensive_multi_asset_analysis(session, policy_id=None, days=30)
¶
Get comprehensive multi-asset analysis in a single call.
pool_management
¶
Pool Management & Block Production Queries - Example Implementation.
This example demonstrates how to use the dbsync-py package to implement pool management and block production analysis queries.
PoolManagementQueries
¶
Example pool management and block production queries.
get_pool_block_production_stats(session, pool_id, epochs=10)
staticmethod
¶
Get block production statistics for a pool over recent epochs.
get_pool_delegation_summary(session, pool_id, limit=100)
staticmethod
¶
Get current delegation summary for a pool.
get_pool_operational_status(session, pool_id)
staticmethod
¶
Get current operational status and configuration for a pool.
get_pool_performance_metrics(session, pool_id, epoch_no=None)
staticmethod
¶
Get detailed performance metrics for a pool in a specific epoch.
get_pool_registration_info(session, pool_id)
staticmethod
¶
Get comprehensive pool registration and metadata information.
get_pool_rewards_analysis(session, pool_id, epochs=5)
staticmethod
¶
Get reward distribution analysis for a pool over recent epochs.
get_comprehensive_pool_analysis(session, pool_id, epochs=5)
¶
Get comprehensive pool analysis in a single call.
smart_contracts
¶
Smart Contracts & Scripts Query Examples.
This example demonstrates how to use the dbsync-py package to implement smart contract and script analysis queries.
SmartContractsQueries
¶
Example smart contracts and scripts queries.
get_contract_usage_patterns(session, days=30, limit=20)
staticmethod
¶
Track smart contract execution patterns over time.
get_contract_value_tracking(session, epoch_no=None, limit=20)
staticmethod
¶
Track value locked in smart contracts.
get_script_analysis(session, script_hash=None, limit=50)
staticmethod
¶
Analyze native scripts and Plutus scripts usage.
get_script_hash_tracking(session, script_hash)
staticmethod
¶
Monitor specific script hash usage and redeemer details.
get_comprehensive_smart_contract_analysis(session, script_hash=None, days=30)
¶
Get comprehensive smart contract analysis in a single call.
staking_delegation
¶
Staking & Delegation Patterns Query Examples.
This example demonstrates how to use the dbsync-py package to implement staking and delegation analysis queries.
StakingDelegationQueries
¶
Example staking and delegation pattern queries.
get_active_stake_monitoring(session, epoch_no=None)
staticmethod
¶
Monitor current active stake status and metrics.
get_delegation_history(session, stake_address, limit=50)
staticmethod
¶
Get delegation history for a specific stake address.
get_delegation_lifecycle(session, stake_address)
staticmethod
¶
Track complete delegation lifecycle for a stake address.
get_reward_earning_patterns(session, stake_address, epochs=10)
staticmethod
¶
Analyze reward earning patterns for a stake address.
get_stake_distribution_patterns(session, epoch_no=None, limit=20)
staticmethod
¶
Analyze stake distribution patterns across pools.
get_comprehensive_staking_analysis(session, stake_address, epochs=5)
¶
Get comprehensive staking analysis in a single call.
transaction_analysis
¶
Transaction Analysis Queries - Example Implementation.
This example demonstrates how to use the dbsync-py package to implement transaction analysis and UTxO operation queries.
This file converts common transaction analysis patterns to SQLAlchemy implementations using the dbsync-py models.
Based on common patterns for Cardano transaction analysis and UTxO operations.
Usage
from dbsync.examples.queries.transaction_analysis import TransactionAnalysisQueries from dbsync.session import get_session
with get_session() as session: fee_stats = TransactionAnalysisQueries.get_transaction_fee_stats(session) print(f"Average fee: {fee_stats['avg_fee'] / 1_000_000:.2f} ADA")
TransactionAnalysisQueries
¶
Example transaction analysis and UTxO operation queries.
This class demonstrates SQLAlchemy implementations of common transaction analysis patterns, including fee analysis, UTxO calculations, and address transaction history.
These are example implementations showing how to use the dbsync-py package models to build useful transaction analysis queries.
get_address_balance(session, address)
staticmethod
¶
Calculate current balance for a given address using UTxO method.
This calculates the balance by finding all unspent transaction outputs (UTxOs) for the address and summing their values.
SQL equivalent
SELECT addr.address, COUNT(utxo.id) as utxo_count, SUM(utxo.value) as total_balance FROM address addr INNER JOIN tx_out utxo ON addr.id = utxo.address_id LEFT JOIN tx_in spent ON utxo.id = spent.tx_out_id AND utxo.index = spent.tx_out_index WHERE addr.address = %s AND spent.id IS NULL GROUP BY addr.address;
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
address
|
str
|
Cardano address to check |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with balance information |
Example
with get_session() as session: ... balance = TransactionAnalysisQueries.get_address_balance( ... session, "addr1..." ... ) ... print(f"Balance: {balance['balance_ada']:.2f} ADA")
get_address_transaction_history(session, address, limit=10)
staticmethod
¶
Get recent transaction history for an address.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
address
|
str
|
Address to analyze |
required |
limit
|
int
|
Maximum number of transactions to return |
10
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with transaction history |
Example
with get_session() as session: ... history = TransactionAnalysisQueries.get_address_transaction_history( ... session, "addr1...", limit=5 ... ) ... print(f"Recent transactions: {len(history['transactions'])}")
get_hourly_transaction_throughput(session, hours=24)
staticmethod
¶
Get transaction throughput statistics by hour.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
hours
|
int
|
Number of hours to analyze |
24
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with hourly throughput data |
Example
with get_session() as session: ... throughput = TransactionAnalysisQueries.get_hourly_transaction_throughput(session) ... print(f"Peak hour: {throughput['peak_hour_transactions']} transactions")
get_large_transactions(session, min_ada=1000.0, limit=10)
staticmethod
¶
Get transactions with large ADA amounts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
min_ada
|
float
|
Minimum ADA amount to consider "large" |
1000.0
|
limit
|
int
|
Maximum number of transactions to return |
10
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with large transaction data |
Example
with get_session() as session: ... large_txs = TransactionAnalysisQueries.get_large_transactions( ... session, min_ada=10000.0 ... ) ... print(f"Found {len(large_txs['transactions'])} large transactions")
get_transaction_fee_stats(session, days=7)
staticmethod
¶
Get transaction fee statistics for the specified time period.
SQL equivalent
SELECT COUNT(*) as tx_count, AVG(fee) as avg_fee, MIN(fee) as min_fee, MAX(fee) as max_fee, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY fee) as median_fee, SUM(fee) as total_fees FROM tx INNER JOIN block ON tx.block_id = block.id WHERE block.time > NOW() - INTERVAL '%s days';
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
days
|
int
|
Number of days to analyze (default: 7) |
7
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with fee statistics |
Example
from dbsync.session import get_session with get_session() as session: ... stats = TransactionAnalysisQueries.get_transaction_fee_stats(session) ... print(f"Average fee: {stats['avg_fee'] / 1_000_000:.2f} ADA")
get_transaction_inputs_outputs(session, tx_hash)
staticmethod
¶
Get detailed input and output information for a transaction.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
tx_hash
|
str
|
Transaction hash to analyze |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with input and output details |
Example
with get_session() as session: ... details = TransactionAnalysisQueries.get_transaction_inputs_outputs( ... session, "abc123..." ... ) ... print(f"Inputs: {len(details['inputs'])}, Outputs: {len(details['outputs'])}")
get_transaction_size_distribution(session, days=7)
staticmethod
¶
Get transaction size distribution statistics.
This analyzes transaction sizes by counting inputs and outputs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
days
|
int
|
Number of days to analyze |
7
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with size distribution data |
Example
with get_session() as session: ... distribution = TransactionAnalysisQueries.get_transaction_size_distribution(session) ... print(f"Average inputs: {distribution['avg_inputs']:.2f}")
get_comprehensive_transaction_analysis(session, days=7)
¶
Get comprehensive transaction analysis in a single call.
Returns a dictionary with various transaction analysis metrics including: - Fee statistics - Transaction throughput - Size distribution - Large transaction analysis
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session
|
Session | AsyncSession
|
Database session (sync or async) |
required |
days
|
int
|
Number of days to analyze |
7
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary containing comprehensive analysis |
Example
from dbsync.session import get_session with get_session() as session: ... analysis = get_comprehensive_transaction_analysis(session) ... print(f"Total transactions: {analysis['fee_stats']['tx_count']:,}") ... print(f"Average fee: {analysis['fee_stats']['avg_fee'] / 1_000_000:.4f} ADA")
Common Query Patterns¶
Basic Entity Retrieval¶
from dbsync.session import create_session
from dbsync.models import Block, Transaction, StakeAddress
# Get a session
session = create_session()
# Basic entity retrieval by ID
block = session.get(Block, block_id)
tx = session.get(Transaction, tx_id)
# Query by unique fields
latest_block = session.query(Block).order_by(Block.block_no.desc()).first()
tx_by_hash = session.query(Transaction).filter(Transaction.hash == tx_hash).first()
Relationship Navigation¶
# Navigate relationships using SQLAlchemy patterns
block = session.get(Block, block_id)
# Get all transactions in a block
transactions = block.transactions
# Get the slot leader who produced the block
slot_leader = block.slot_leader
# Get the epoch this block belongs to
epoch = block.epoch
Complex Queries with Joins¶
from sqlalchemy import and_, or_, func
from dbsync.models import (
Block, Transaction, TransactionOutput,
StakeAddress, Delegation, Reward
)
# Find all transactions in the latest 100 blocks
latest_txs = session.query(Transaction).join(Block).filter(
Block.block_no >= session.query(func.max(Block.block_no)).scalar() - 100
).all()
# Get stake delegation history for an address
delegations = session.query(Delegation).join(StakeAddress).filter(
StakeAddress.view == stake_address_bech32
).order_by(Delegation.active_epoch_no.desc()).all()
Aggregation Queries¶
# Calculate total ADA in circulation
total_supply = session.query(func.sum(TransactionOutput.value)).scalar()
# Count transactions per epoch
tx_counts = session.query(
Block.epoch_no,
func.count(Transaction.id_).label('tx_count')
).join(Transaction).group_by(Block.epoch_no).all()
# Find top stake pools by delegation
top_pools = session.query(
Delegation.pool_hash_id,
func.count(Delegation.id_).label('delegator_count')
).group_by(Delegation.pool_hash_id).order_by(
func.count(Delegation.id_).desc()
).limit(10).all()
Time-Based Queries¶
from datetime import datetime, timedelta
# Get blocks from the last 24 hours
yesterday = datetime.utcnow() - timedelta(days=1)
recent_blocks = session.query(Block).filter(
Block.time >= yesterday
).order_by(Block.time.desc()).all()
# Get transactions in a specific epoch
epoch_txs = session.query(Transaction).join(Block).filter(
Block.epoch_no == target_epoch
).all()
Asset and Multi-Asset Queries¶
from dbsync.models import MultiAsset, MaTxOut, MaTxMint
# Find all native tokens for a policy
policy_assets = session.query(MultiAsset).filter(
MultiAsset.policy == policy_hex
).all()
# Get asset transaction history
asset_history = session.query(MaTxOut).join(MultiAsset).filter(
and_(
MultiAsset.policy == policy_hex,
MultiAsset.name == asset_name_hex
)
).all()
Governance Queries¶
from dbsync.models import GovActionProposal, VotingProcedure, DrepRegistration
# Get active governance proposals
active_proposals = session.query(GovActionProposal).filter(
and_(
GovActionProposal.ratified_epoch.is_(None),
GovActionProposal.expired_epoch.is_(None),
GovActionProposal.dropped_epoch.is_(None)
)
).all()
# Get voting history for a DRep
drep_votes = session.query(VotingProcedure).join(DrepRegistration).filter(
DrepRegistration.drep_hash_id == drep_id
).all()
Performance Optimization¶
Index Usage¶
# Use indexed fields for filtering
# Good: Use indexed hash fields
tx = session.query(Transaction).filter(Transaction.hash == tx_hash).first()
# Good: Use indexed foreign keys
block_txs = session.query(Transaction).filter(Transaction.block_id == block_id).all()
# Less optimal: Non-indexed field searches
# Consider adding custom indices for frequently used patterns
Pagination¶
from sqlalchemy import desc
# Implement pagination for large result sets
def get_transactions_page(session, page=1, page_size=100):
offset = (page - 1) * page_size
return session.query(Transaction).order_by(
desc(Transaction.id_)
).offset(offset).limit(page_size).all()
# Get total count for pagination metadata
total_count = session.query(Transaction).count()
Lazy Loading vs Eager Loading¶
from sqlalchemy.orm import joinedload, selectinload
# Eager load related objects to avoid N+1 queries
blocks_with_txs = session.query(Block).options(
selectinload(Block.transactions)
).all()
# Use joinedload for single relationships
txs_with_blocks = session.query(Transaction).options(
joinedload(Transaction.block)
).all()
Bulk Operations¶
# Bulk insert for large datasets (read-only typically)
# This is mainly for reference as dbsync-py is read-only
# Bulk updates (if needed for custom applications)
session.query(CustomModel).filter(
CustomModel.status == 'pending'
).update({CustomModel.status: 'processed'})
Query Builder Helpers¶
Custom Query Functions¶
def get_address_utxos(session, address_id: int):
"""Get all UTXOs for an address."""
from dbsync.models import TransactionOutput, TransactionInput
# Get all outputs to this address
outputs = session.query(TransactionOutput).filter(
TransactionOutput.address_id == address_id
)
# Exclude spent outputs
spent_output_ids = session.query(TransactionInput.tx_out_id).distinct()
return outputs.filter(
~TransactionOutput.id_.in_(spent_output_ids)
).all()
def get_pool_performance(session, pool_hash_id: int, epoch_range: tuple):
"""Calculate pool performance metrics."""
from dbsync.models import Block, PoolHash
start_epoch, end_epoch = epoch_range
blocks_produced = session.query(func.count(Block.id_)).join(
SlotLeader
).filter(
and_(
SlotLeader.pool_hash_id == pool_hash_id,
Block.epoch_no.between(start_epoch, end_epoch)
)
).scalar()
return {
'blocks_produced': blocks_produced,
'epoch_range': epoch_range
}
Error Handling¶
from sqlalchemy.exc import NoResultFound, MultipleResultsFound
def safe_get_transaction(session, tx_hash: str):
"""Safely get a transaction by hash with proper error handling."""
try:
return session.query(Transaction).filter(
Transaction.hash == bytes.fromhex(tx_hash)
).one()
except NoResultFound:
return None
except MultipleResultsFound:
# This should not happen with proper database constraints
raise ValueError(f"Multiple transactions found for hash: {tx_hash}")
Query Performance Guidelines¶
Best Practices¶
- Use Indexes: Always filter on indexed columns when possible
- Limit Results: Use
.limit()for large result sets - Eager Loading: Use
joinedload()orselectinload()to avoid N+1 queries - Specific Columns: Select only needed columns for large datasets
- Connection Pooling: Reuse sessions appropriately
Common Anti-Patterns¶
# Anti-pattern: Loading all data then filtering in Python
all_txs = session.query(Transaction).all()
filtered = [tx for tx in all_txs if tx.fee > 1000000] # Don't do this
# Better: Filter in database
high_fee_txs = session.query(Transaction).filter(
Transaction.fee > 1000000
).all()
# Anti-pattern: N+1 queries
for tx in transactions:
block = tx.block # This hits the database for each transaction
# Better: Eager loading
transactions = session.query(Transaction).options(
joinedload(Transaction.block)
).all()