Transaction Analysis Examples¶
This section provides comprehensive examples for analyzing Cardano transactions using dbsync-py.
Basic Transaction Analysis¶
Transaction Details and Structure¶
from dbsync.session import create_session
from dbsync.models import (
Transaction, TransactionInput, TransactionOutput,
TxMetadata, Block, Address, MultiAsset, MaTxOut
)
from sqlalchemy import func
def analyze_transaction(session, tx_hash: str):
"""Comprehensive transaction analysis."""
# Get the transaction
tx = session.query(Transaction).filter(
Transaction.hash == bytes.fromhex(tx_hash)
).first()
if not tx:
return {"error": "Transaction not found"}
# Get block information
block = session.query(Block).filter(Block.id_ == tx.block_id).first()
# Get inputs
inputs = session.query(TransactionInput, TransactionOutput, Address).join(
TransactionOutput, TransactionInput.tx_out_id == TransactionOutput.id_
).join(
Address, TransactionOutput.address_id == Address.id_
).filter(TransactionInput.tx_id == tx.id_).all()
# Get outputs
outputs = session.query(TransactionOutput, Address).join(
Address, TransactionOutput.address_id == Address.id_
).filter(TransactionOutput.tx_id == tx.id_).all()
# Get metadata (if any)
metadata = session.query(TxMetadata).filter(
TxMetadata.tx_id == tx.id_
).all()
# Calculate input/output totals
total_input = sum(inp[1].value for inp in inputs)
total_output = sum(out[0].value for out in outputs)
# Analyze inputs
input_analysis = []
for tx_input, prev_output, prev_address in inputs:
input_analysis.append({
"previous_tx_index": tx_input.tx_out_index,
"value": prev_output.value,
"address": prev_address.view,
"redeemer_index": tx_input.redeemer_id
})
# Analyze outputs
output_analysis = []
for tx_output, address in outputs:
# Check for native assets
native_assets = session.query(MaTxOut, MultiAsset).join(
MultiAsset, MaTxOut.ident == MultiAsset.id_
).filter(MaTxOut.tx_out_id == tx_output.id_).all()
assets = []
for ma_out, asset in native_assets:
assets.append({
"policy_id": asset.policy.hex(),
"asset_name": asset.name.hex(),
"fingerprint": asset.fingerprint,
"quantity": ma_out.quantity
})
output_analysis.append({
"index": tx_output.index,
"value": tx_output.value,
"address": address.view,
"native_assets": assets,
"datum_hash": tx_output.data_hash.hex() if tx_output.data_hash else None,
"script_ref": tx_output.reference_script_id is not None
})
# Parse metadata
metadata_analysis = []
for meta in metadata:
metadata_analysis.append({
"label": meta.label,
"json_metadata": meta.json_,
"bytes_metadata": meta.bytes_.hex() if meta.bytes_ else None
})
return {
"transaction": {
"hash": tx_hash,
"fee": tx.fee,
"size": tx.size,
"invalid_before": tx.invalid_before,
"invalid_hereafter": tx.invalid_hereafter,
"script_size": tx.script_size
},
"block": {
"number": block.block_no,
"hash": block.hash.hex(),
"slot": block.slot_no,
"epoch": block.epoch_no,
"time": block.time
},
"inputs": {
"count": len(inputs),
"total_value": total_input,
"details": input_analysis
},
"outputs": {
"count": len(outputs),
"total_value": total_output,
"details": output_analysis
},
"balancing": {
"input_total": total_input,
"output_total": total_output,
"fee": tx.fee,
"difference": total_input - total_output - tx.fee # Should be 0
},
"metadata": metadata_analysis
}
# Usage
session = create_session()
analysis = analyze_transaction(session, "a1b2c3d4e5f6...")
print(f"Transaction fee: {analysis['transaction']['fee'] / 1_000_000:.2f} ADA")
print(f"Inputs: {analysis['inputs']['count']}, Outputs: {analysis['outputs']['count']}")
Multi-Asset Transaction Analysis¶
from dbsync.models import MaTxMint
def analyze_native_token_transaction(session, tx_hash: str):
"""Analyze a transaction involving native tokens."""
tx = session.query(Transaction).filter(
Transaction.hash == bytes.fromhex(tx_hash)
).first()
if not tx:
return {"error": "Transaction not found"}
# Get all minting/burning in this transaction
minting_burning = session.query(MaTxMint, MultiAsset).join(
MultiAsset, MaTxMint.ident == MultiAsset.id_
).filter(MaTxMint.tx_id == tx.id_).all()
# Get all native asset outputs
asset_outputs = session.query(
TransactionOutput, MaTxOut, MultiAsset, Address
).join(MaTxOut, TransactionOutput.id_ == MaTxOut.tx_out_id).join(
MultiAsset, MaTxOut.ident == MultiAsset.id_
).join(Address, TransactionOutput.address_id == Address.id_).filter(
TransactionOutput.tx_id == tx.id_
).all()
# Analyze minting/burning
minting_analysis = {}
total_minted = 0
total_burned = 0
for mint, asset in minting_burning:
asset_id = f"{asset.policy.hex()}.{asset.name.hex()}"
if asset_id not in minting_analysis:
minting_analysis[asset_id] = {
"policy_id": asset.policy.hex(),
"asset_name": asset.name.hex(),
"fingerprint": asset.fingerprint,
"quantity": 0,
"action": None
}
minting_analysis[asset_id]["quantity"] += mint.quantity
if mint.quantity > 0:
total_minted += 1
minting_analysis[asset_id]["action"] = "mint"
else:
total_burned += 1
minting_analysis[asset_id]["action"] = "burn"
# Analyze asset distribution
distribution_analysis = {}
for tx_out, ma_out, asset, address in asset_outputs:
asset_id = f"{asset.policy.hex()}.{asset.name.hex()}"
if asset_id not in distribution_analysis:
distribution_analysis[asset_id] = {
"policy_id": asset.policy.hex(),
"asset_name": asset.name.hex(),
"fingerprint": asset.fingerprint,
"total_distributed": 0,
"recipients": []
}
distribution_analysis[asset_id]["total_distributed"] += ma_out.quantity
distribution_analysis[asset_id]["recipients"].append({
"address": address.view,
"quantity": ma_out.quantity,
"output_index": tx_out.index
})
return {
"transaction_hash": tx_hash,
"minting_burning": {
"total_assets_minted": total_minted,
"total_assets_burned": total_burned,
"details": list(minting_analysis.values())
},
"asset_distribution": {
"unique_assets": len(distribution_analysis),
"total_recipients": sum(len(dist["recipients"]) for dist in distribution_analysis.values()),
"details": list(distribution_analysis.values())
}
}
Smart Contract Transaction Analysis¶
from dbsync.models import Script, Redeemer, Datum
def analyze_smart_contract_transaction(session, tx_hash: str):
"""Analyze a transaction involving smart contracts."""
tx = session.query(Transaction).filter(
Transaction.hash == bytes.fromhex(tx_hash)
).first()
if not tx:
return {"error": "Transaction not found"}
# Get all redeemers in this transaction
redeemers = session.query(Redeemer, Script).join(
Script, Redeemer.script_hash == Script.hash_
).filter(Redeemer.tx_id == tx.id_).all()
# Get all datum references
datum_refs = session.query(TransactionOutput, Datum).join(
Datum, TransactionOutput.data_hash == Datum.hash_
).filter(TransactionOutput.tx_id == tx.id_).all()
# Analyze script execution
script_analysis = {}
for redeemer, script in redeemers:
script_hash = script.hash_.hex()
if script_hash not in script_analysis:
script_analysis[script_hash] = {
"script_type": script.type_,
"script_size": script.serialised_size,
"executions": []
}
execution_info = {
"purpose": redeemer.purpose,
"index": redeemer.index,
"execution_units": {
"memory": redeemer.unit_mem,
"steps": redeemer.unit_steps
},
"redeemer_data_size": len(redeemer.data.bytes_) if redeemer.data else 0
}
script_analysis[script_hash]["executions"].append(execution_info)
# Analyze datum usage
datum_analysis = []
for output, datum in datum_refs:
datum_analysis.append({
"output_index": output.index,
"datum_hash": datum.hash_.hex(),
"datum_size": len(datum.bytes_) if datum.bytes_ else 0,
"datum_json": datum.value # If available
})
# Calculate total execution costs
total_memory = sum(
sum(exec_["execution_units"]["memory"] for exec_ in script["executions"])
for script in script_analysis.values()
)
total_steps = sum(
sum(exec_["execution_units"]["steps"] for exec_ in script["executions"])
for script in script_analysis.values()
)
return {
"transaction_hash": tx_hash,
"script_execution": {
"scripts_executed": len(script_analysis),
"total_executions": sum(len(script["executions"]) for script in script_analysis.values()),
"total_execution_units": {
"memory": total_memory,
"steps": total_steps
},
"script_details": script_analysis
},
"datum_usage": {
"outputs_with_datums": len(datum_analysis),
"details": datum_analysis
},
"efficiency_metrics": {
"script_fee_ratio": (tx.script_size or 0) / tx.size if tx.size > 0 else 0,
"avg_memory_per_execution": total_memory / max(1, sum(len(script["executions"]) for script in script_analysis.values())),
"avg_steps_per_execution": total_steps / max(1, sum(len(script["executions"]) for script in script_analysis.values()))
}
}
Transaction Pattern Analysis¶
Address Transaction History¶
def get_address_transaction_history(session, address_bech32: str, limit: int = 100):
"""Get transaction history for an address."""
address = session.query(Address).filter(
Address.view == address_bech32
).first()
if not address:
return {"error": "Address not found"}
# Get transactions where this address appears in outputs
output_txs = session.query(Transaction, TransactionOutput, Block).join(
TransactionOutput, Transaction.id_ == TransactionOutput.tx_id
).join(Block, Transaction.block_id == Block.id_).filter(
TransactionOutput.address_id == address.id_
).order_by(Block.time.desc()).limit(limit).all()
# Get transactions where this address appears in inputs (spent from)
input_txs = session.query(Transaction, TransactionInput, TransactionOutput, Block).join(
TransactionInput, Transaction.id_ == TransactionInput.tx_id
).join(
TransactionOutput, TransactionInput.tx_out_id == TransactionOutput.id_
).join(Block, Transaction.block_id == Block.id_).filter(
TransactionOutput.address_id == address.id_
).order_by(Block.time.desc()).limit(limit).all()
# Combine and analyze
all_transactions = {}
# Process received transactions
for tx, output, block in output_txs:
tx_hash = tx.hash.hex()
if tx_hash not in all_transactions:
all_transactions[tx_hash] = {
"hash": tx_hash,
"timestamp": block.time,
"block_no": block.block_no,
"fee": tx.fee,
"received": 0,
"sent": 0,
"native_assets_received": [],
"native_assets_sent": []
}
all_transactions[tx_hash]["received"] += output.value
# Check for native assets received
native_assets = session.query(MaTxOut, MultiAsset).join(
MultiAsset, MaTxOut.ident == MultiAsset.id_
).filter(MaTxOut.tx_out_id == output.id_).all()
for ma_out, asset in native_assets:
all_transactions[tx_hash]["native_assets_received"].append({
"policy_id": asset.policy.hex(),
"asset_name": asset.name.hex(),
"fingerprint": asset.fingerprint,
"quantity": ma_out.quantity
})
# Process sent transactions
for tx, tx_input, prev_output, block in input_txs:
tx_hash = tx.hash.hex()
if tx_hash not in all_transactions:
all_transactions[tx_hash] = {
"hash": tx_hash,
"timestamp": block.time,
"block_no": block.block_no,
"fee": tx.fee,
"received": 0,
"sent": 0,
"native_assets_received": [],
"native_assets_sent": []
}
all_transactions[tx_hash]["sent"] += prev_output.value
# Sort by timestamp
transaction_list = list(all_transactions.values())
transaction_list.sort(key=lambda x: x["timestamp"], reverse=True)
return {
"address": address_bech32,
"transaction_count": len(transaction_list),
"transactions": transaction_list[:limit]
}
Fee Analysis¶
def analyze_transaction_fees(session, epoch_range: tuple):
"""Analyze transaction fee patterns across epochs."""
start_epoch, end_epoch = epoch_range
# Get fee statistics by epoch
fee_stats = session.query(
Block.epoch_no,
func.count(Transaction.id_).label('tx_count'),
func.sum(Transaction.fee).label('total_fees'),
func.avg(Transaction.fee).label('avg_fee'),
func.min(Transaction.fee).label('min_fee'),
func.max(Transaction.fee).label('max_fee'),
func.avg(Transaction.size).label('avg_size')
).join(Block, Transaction.block_id == Block.id_).filter(
Block.epoch_no.between(start_epoch, end_epoch)
).group_by(Block.epoch_no).order_by(Block.epoch_no).all()
# Analyze fee patterns
fee_analysis = {}
for stat in fee_stats:
epoch = stat.epoch_no
avg_fee_per_byte = stat.avg_fee / stat.avg_size if stat.avg_size > 0 else 0
fee_analysis[epoch] = {
"transaction_count": stat.tx_count,
"total_fees": stat.total_fees,
"average_fee": float(stat.avg_fee),
"min_fee": stat.min_fee,
"max_fee": stat.max_fee,
"average_size": float(stat.avg_size),
"avg_fee_per_byte": avg_fee_per_byte
}
# Calculate trends
epochs = sorted(fee_analysis.keys())
if len(epochs) >= 2:
first_epoch = fee_analysis[epochs[0]]
last_epoch = fee_analysis[epochs[-1]]
trends = {
"fee_trend": (last_epoch["average_fee"] - first_epoch["average_fee"]) / first_epoch["average_fee"] * 100,
"volume_trend": (last_epoch["transaction_count"] - first_epoch["transaction_count"]) / first_epoch["transaction_count"] * 100,
"efficiency_trend": (last_epoch["avg_fee_per_byte"] - first_epoch["avg_fee_per_byte"]) / first_epoch["avg_fee_per_byte"] * 100
}
else:
trends = {"fee_trend": 0, "volume_trend": 0, "efficiency_trend": 0}
return {
"epoch_range": epoch_range,
"by_epoch": fee_analysis,
"trends": trends,
"summary": {
"total_transactions": sum(stats["transaction_count"] for stats in fee_analysis.values()),
"total_fees_collected": sum(stats["total_fees"] for stats in fee_analysis.values()),
"overall_avg_fee": sum(stats["total_fees"] for stats in fee_analysis.values()) / sum(stats["transaction_count"] for stats in fee_analysis.values()) if sum(stats["transaction_count"] for stats in fee_analysis.values()) > 0 else 0
}
}
Transaction Size and Complexity Analysis¶
def analyze_transaction_complexity(session, epoch_no: int):
"""Analyze transaction complexity patterns in an epoch."""
# Get all transactions in the epoch
transactions = session.query(
Transaction.id_,
Transaction.size,
Transaction.fee,
Transaction.script_size,
func.count(TransactionInput.id_).label('input_count'),
func.count(TransactionOutput.id_).label('output_count')
).join(Block, Transaction.block_id == Block.id_).outerjoin(
TransactionInput, Transaction.id_ == TransactionInput.tx_id
).outerjoin(
TransactionOutput, Transaction.id_ == TransactionOutput.tx_id
).filter(
Block.epoch_no == epoch_no
).group_by(
Transaction.id_, Transaction.size, Transaction.fee, Transaction.script_size
).all()
# Categorize transactions
categories = {
"simple": [], # 1-2 inputs, 1-2 outputs, no scripts
"moderate": [], # 3-10 inputs/outputs, may have scripts
"complex": [], # 10+ inputs/outputs or large scripts
"script_heavy": [] # Significant script component
}
for tx in transactions:
total_ios = tx.input_count + tx.output_count
script_ratio = (tx.script_size or 0) / tx.size if tx.size > 0 else 0
if script_ratio > 0.3:
categories["script_heavy"].append(tx)
elif total_ios >= 20 or tx.size > 10000:
categories["complex"].append(tx)
elif total_ios >= 6 or (tx.script_size or 0) > 0:
categories["moderate"].append(tx)
else:
categories["simple"].append(tx)
# Calculate statistics for each category
complexity_stats = {}
for category, txs in categories.items():
if txs:
sizes = [tx.size for tx in txs]
fees = [tx.fee for tx in txs]
complexity_stats[category] = {
"count": len(txs),
"avg_size": sum(sizes) / len(sizes),
"avg_fee": sum(fees) / len(fees),
"avg_fee_per_byte": sum(f/s for f, s in zip(fees, sizes)) / len(fees),
"total_size": sum(sizes),
"total_fees": sum(fees)
}
else:
complexity_stats[category] = {
"count": 0, "avg_size": 0, "avg_fee": 0,
"avg_fee_per_byte": 0, "total_size": 0, "total_fees": 0
}
return {
"epoch": epoch_no,
"total_transactions": len(transactions),
"complexity_breakdown": complexity_stats,
"efficiency_metrics": {
"avg_transaction_size": sum(tx.size for tx in transactions) / len(transactions) if transactions else 0,
"script_adoption_rate": len(categories["script_heavy"]) / len(transactions) * 100 if transactions else 0,
"complex_transaction_rate": len(categories["complex"]) / len(transactions) * 100 if transactions else 0
}
}
# Usage examples
session = create_session()
# Analyze a specific transaction
tx_analysis = analyze_transaction(session, "a1b2c3d4e5f6...")
print(f"Transaction has {tx_analysis['inputs']['count']} inputs and {tx_analysis['outputs']['count']} outputs")
# Analyze native token transaction
token_analysis = analyze_native_token_transaction(session, "a1b2c3d4e5f6...")
print(f"Assets minted: {token_analysis['minting_burning']['total_assets_minted']}")
# Get address history
history = get_address_transaction_history(session, "addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3jcu5d8ps7zex2k2xt3uqxgjqnnj83ws8lhrn493txdh6gx34hs")
print(f"Address has {history['transaction_count']} transactions")
# Analyze fee trends
fee_trends = analyze_transaction_fees(session, (300, 310))
print(f"Average fee trend: {fee_trends['trends']['fee_trend']:.2f}%")
# Analyze transaction complexity
complexity = analyze_transaction_complexity(session, 400)
print(f"Script adoption rate: {complexity['efficiency_metrics']['script_adoption_rate']:.1f}%")
This comprehensive transaction analysis guide provides tools for understanding all aspects of Cardano transactions, from basic structure and native assets to smart contract execution and ecosystem-wide patterns.