Conway Era Governance Examples¶
This section provides comprehensive examples for working with Conway era governance features in Cardano, including DReps, constitutional committees, governance actions, and voting procedures.
DRep (Delegated Representatives) Operations¶
DRep Registration and Management¶
from dbsync.session import create_session
from dbsync.models import (
DrepHash, DrepRegistration, DrepUpdate, DrepDeregistration,
VotingAnchor, Transaction, Block
)
from sqlalchemy import desc, and_
def get_drep_info(session, drep_id: str):
"""Get comprehensive DRep information."""
# Get DRep hash record
drep = session.query(DrepHash).filter(DrepHash.view == drep_id).first()
if not drep:
return {"error": "DRep not found"}
# Get latest registration
latest_registration = session.query(DrepRegistration).filter(
DrepRegistration.drep_hash_id == drep.id_
).order_by(desc(DrepRegistration.active_epoch_no)).first()
# Get latest update (if any)
latest_update = session.query(DrepUpdate).filter(
DrepUpdate.drep_hash_id == drep.id_
).order_by(desc(DrepUpdate.active_epoch_no)).first()
# Check for deregistration
deregistration = session.query(DrepDeregistration).filter(
DrepDeregistration.drep_hash_id == drep.id_
).first()
# Get voting anchor information
anchor_info = None
if latest_update and latest_update.voting_anchor_id:
anchor = session.query(VotingAnchor).filter(
VotingAnchor.id_ == latest_update.voting_anchor_id
).first()
if anchor:
anchor_info = {
"url": anchor.url,
"data_hash": anchor.data_hash.hex()
}
elif latest_registration and latest_registration.voting_anchor_id:
anchor = session.query(VotingAnchor).filter(
VotingAnchor.id_ == latest_registration.voting_anchor_id
).first()
if anchor:
anchor_info = {
"url": anchor.url,
"data_hash": anchor.data_hash.hex()
}
return {
"drep_id": drep_id,
"drep_hash": drep.raw.hex(),
"registration": {
"epoch": latest_registration.active_epoch_no if latest_registration else None,
"deposit": latest_registration.deposit if latest_registration else None,
"voting_anchor": anchor_info
},
"latest_update_epoch": latest_update.active_epoch_no if latest_update else None,
"is_active": deregistration is None,
"deregistration_epoch": deregistration.active_epoch_no if deregistration else None
}
# Usage
session = create_session()
drep_info = get_drep_info(session, "drep1...")
print(f"DRep registered in epoch {drep_info['registration']['epoch']}")
DRep Voting History¶
from dbsync.models import VotingProcedure, GovActionProposal
def get_drep_voting_history(session, drep_id: str, limit: int = 50):
"""Get voting history for a DRep."""
drep = session.query(DrepHash).filter(DrepHash.view == drep_id).first()
if not drep:
return []
# Get voting procedures for this DRep
votes = session.query(
VotingProcedure, GovActionProposal, Transaction, Block
).join(
GovActionProposal, VotingProcedure.gov_action_proposal_id == GovActionProposal.id_
).join(
Transaction, VotingProcedure.tx_id == Transaction.id_
).join(
Block, Transaction.block_id == Block.id_
).filter(
VotingProcedure.drep_voter == drep.id_
).order_by(desc(Block.time)).limit(limit).all()
voting_history = []
for vote, proposal, tx, block in votes:
voting_history.append({
"proposal_id": proposal.id_,
"proposal_type": proposal.type_,
"vote": vote.vote,
"epoch": block.epoch_no,
"timestamp": block.time,
"tx_hash": tx.hash.hex()
})
return voting_history
# Usage
voting_history = get_drep_voting_history(session, "drep1...")
for vote in voting_history:
print(f"Voted {vote['vote']} on {vote['proposal_type']} proposal in epoch {vote['epoch']}")
Constitutional Committee Operations¶
Committee Member Information¶
from dbsync.models import (
CommitteeHash, CommitteeRegistration, CommitteeDeregistration,
CommitteeMember
)
def get_committee_info(session, committee_hash: str):
"""Get constitutional committee member information."""
committee = session.query(CommitteeHash).filter(
CommitteeHash.view == committee_hash
).first()
if not committee:
return {"error": "Committee member not found"}
# Get registration info
registration = session.query(CommitteeRegistration).filter(
CommitteeRegistration.committee_hash_id == committee.id_
).first()
# Check for deregistration
deregistration = session.query(CommitteeDeregistration).filter(
CommitteeDeregistration.committee_hash_id == committee.id_
).first()
# Get current committee membership status
current_member = session.query(CommitteeMember).filter(
CommitteeMember.committee_hash_id == committee.id_
).first()
return {
"committee_hash": committee_hash,
"raw_hash": committee.raw.hex(),
"is_registered": registration is not None,
"registration_epoch": registration.active_epoch_no if registration else None,
"is_deregistered": deregistration is not None,
"deregistration_epoch": deregistration.active_epoch_no if deregistration else None,
"is_current_member": current_member is not None,
"term_start_epoch": current_member.epoch_no if current_member else None
}
def get_committee_voting_record(session, committee_hash: str):
"""Get voting record for a committee member."""
committee = session.query(CommitteeHash).filter(
CommitteeHash.view == committee_hash
).first()
if not committee:
return []
votes = session.query(
VotingProcedure, GovActionProposal, Block
).join(
GovActionProposal, VotingProcedure.gov_action_proposal_id == GovActionProposal.id_
).join(
Transaction, VotingProcedure.tx_id == Transaction.id_
).join(
Block, Transaction.block_id == Block.id_
).filter(
VotingProcedure.committee_voter == committee.id_
).order_by(desc(Block.time)).all()
return [
{
"proposal_id": proposal.id_,
"proposal_type": proposal.type_,
"vote": vote.vote,
"epoch": block.epoch_no,
"timestamp": block.time
}
for vote, proposal, block in votes
]
Governance Actions and Proposals¶
Active Governance Proposals¶
from dbsync.models import GovActionProposal, VotingProcedure
def get_active_governance_proposals(session):
"""Get all currently active governance proposals."""
active_proposals = session.query(GovActionProposal).filter(
and_(
GovActionProposal.ratified_epoch.is_(None),
GovActionProposal.expired_epoch.is_(None),
GovActionProposal.dropped_epoch.is_(None)
)
).all()
proposals_with_votes = []
for proposal in active_proposals:
# Get vote counts
vote_counts = session.query(
VotingProcedure.vote,
func.count(VotingProcedure.id_).label('count')
).filter(
VotingProcedure.gov_action_proposal_id == proposal.id_
).group_by(VotingProcedure.vote).all()
# Get submission transaction info
tx = session.query(Transaction).filter(
Transaction.id_ == proposal.tx_id
).first()
block = session.query(Block).filter(
Block.id_ == tx.block_id
).first() if tx else None
proposals_with_votes.append({
"id": proposal.id_,
"type": proposal.type_,
"description": proposal.description,
"deposit": proposal.deposit,
"return_address": proposal.return_address,
"expiration": proposal.expiration,
"submitted_epoch": block.epoch_no if block else None,
"vote_counts": {vote: count for vote, count in vote_counts},
"tx_hash": tx.hash.hex() if tx else None
})
return proposals_with_votes
# Usage
active_proposals = get_active_governance_proposals(session)
for proposal in active_proposals:
votes = proposal['vote_counts']
print(f"{proposal['type']} proposal: {votes.get('Yes', 0)} Yes, {votes.get('No', 0)} No, {votes.get('Abstain', 0)} Abstain")
Proposal Lifecycle Analysis¶
def analyze_proposal_lifecycle(session, proposal_id: int):
"""Analyze the complete lifecycle of a governance proposal."""
proposal = session.query(GovActionProposal).filter(
GovActionProposal.id_ == proposal_id
).first()
if not proposal:
return {"error": "Proposal not found"}
# Get submission info
submission_tx = session.query(Transaction).filter(
Transaction.id_ == proposal.tx_id
).first()
submission_block = session.query(Block).filter(
Block.id_ == submission_tx.block_id
).first() if submission_tx else None
# Get all votes
votes = session.query(VotingProcedure, Block).join(
Transaction, VotingProcedure.tx_id == Transaction.id_
).join(
Block, Transaction.block_id == Block.id_
).filter(
VotingProcedure.gov_action_proposal_id == proposal_id
).order_by(Block.time).all()
# Analyze voting patterns over time
voting_timeline = []
vote_totals = {"Yes": 0, "No": 0, "Abstain": 0}
for vote_proc, block in votes:
vote_totals[vote_proc.vote] += 1
voting_timeline.append({
"epoch": block.epoch_no,
"timestamp": block.time,
"vote": vote_proc.vote,
"voter_type": "DRep" if vote_proc.drep_voter else
"Committee" if vote_proc.committee_voter else
"Pool" if vote_proc.pool_voter else "Unknown",
"cumulative_totals": vote_totals.copy()
})
# Determine final status
status = "Active"
if proposal.ratified_epoch:
status = "Ratified"
elif proposal.expired_epoch:
status = "Expired"
elif proposal.dropped_epoch:
status = "Dropped"
return {
"proposal_id": proposal_id,
"type": proposal.type_,
"description": proposal.description,
"deposit": proposal.deposit,
"submission": {
"epoch": submission_block.epoch_no if submission_block else None,
"timestamp": submission_block.time if submission_block else None,
"tx_hash": submission_tx.hash.hex() if submission_tx else None
},
"status": status,
"final_epoch": proposal.ratified_epoch or proposal.expired_epoch or proposal.dropped_epoch,
"total_votes": len(votes),
"vote_breakdown": vote_totals,
"voting_timeline": voting_timeline
}
Governance Action Types Analysis¶
def analyze_governance_action_types(session, epoch_range: tuple):
"""Analyze governance action types in a given epoch range."""
start_epoch, end_epoch = epoch_range
proposals = session.query(GovActionProposal).join(
Transaction
).join(Block).filter(
Block.epoch_no.between(start_epoch, end_epoch)
).all()
type_analysis = {}
for proposal in proposals:
prop_type = proposal.type_
if prop_type not in type_analysis:
type_analysis[prop_type] = {
"count": 0,
"total_deposit": 0,
"ratified": 0,
"expired": 0,
"dropped": 0,
"active": 0,
"avg_deposit": 0
}
stats = type_analysis[prop_type]
stats["count"] += 1
stats["total_deposit"] += proposal.deposit
if proposal.ratified_epoch:
stats["ratified"] += 1
elif proposal.expired_epoch:
stats["expired"] += 1
elif proposal.dropped_epoch:
stats["dropped"] += 1
else:
stats["active"] += 1
# Calculate averages
for stats in type_analysis.values():
if stats["count"] > 0:
stats["avg_deposit"] = stats["total_deposit"] / stats["count"]
return {
"epoch_range": epoch_range,
"by_type": type_analysis,
"summary": {
"total_proposals": sum(stats["count"] for stats in type_analysis.values()),
"total_deposit": sum(stats["total_deposit"] for stats in type_analysis.values())
}
}
Stake Pool Governance Participation¶
Pool Voting Behavior¶
from dbsync.models import PoolHash, PoolRegistration
def analyze_pool_governance_participation(session, pool_bech32: str):
"""Analyze a stake pool's governance participation."""
pool = session.query(PoolHash).filter(PoolHash.view == pool_bech32).first()
if not pool:
return {"error": "Pool not found"}
# Get pool voting history
votes = session.query(VotingProcedure, GovActionProposal, Block).join(
GovActionProposal, VotingProcedure.gov_action_proposal_id == GovActionProposal.id_
).join(
Transaction, VotingProcedure.tx_id == Transaction.id_
).join(
Block, Transaction.block_id == Block.id_
).filter(
VotingProcedure.pool_voter == pool.id_
).order_by(desc(Block.time)).all()
# Analyze voting patterns
vote_patterns = {
"total_votes": len(votes),
"by_outcome": {"Yes": 0, "No": 0, "Abstain": 0},
"by_proposal_type": {},
"participation_epochs": set()
}
for vote, proposal, block in votes:
vote_patterns["by_outcome"][vote.vote] += 1
vote_patterns["participation_epochs"].add(block.epoch_no)
prop_type = proposal.type_
if prop_type not in vote_patterns["by_proposal_type"]:
vote_patterns["by_proposal_type"][prop_type] = {"Yes": 0, "No": 0, "Abstain": 0}
vote_patterns["by_proposal_type"][prop_type][vote.vote] += 1
vote_patterns["epochs_participated"] = len(vote_patterns["participation_epochs"])
del vote_patterns["participation_epochs"] # Remove set for JSON serialization
return {
"pool_id": pool_bech32,
"voting_patterns": vote_patterns,
"recent_votes": [
{
"proposal_type": proposal.type_,
"vote": vote.vote,
"epoch": block.epoch_no,
"timestamp": block.time
}
for vote, proposal, block in votes[:10] # Last 10 votes
]
}
Governance Analytics¶
Voting Power Distribution¶
def analyze_voting_power_distribution(session, epoch_no: int):
"""Analyze voting power distribution across different voter types."""
# Get all votes in the specified epoch
votes_in_epoch = session.query(VotingProcedure).join(
Transaction
).join(Block).filter(
Block.epoch_no == epoch_no
).all()
voting_power = {
"drep_votes": 0,
"committee_votes": 0,
"pool_votes": 0,
"total_unique_voters": {
"dreps": set(),
"committee": set(),
"pools": set()
}
}
for vote in votes_in_epoch:
if vote.drep_voter:
voting_power["drep_votes"] += 1
voting_power["total_unique_voters"]["dreps"].add(vote.drep_voter)
elif vote.committee_voter:
voting_power["committee_votes"] += 1
voting_power["total_unique_voters"]["committee"].add(vote.committee_voter)
elif vote.pool_voter:
voting_power["pool_votes"] += 1
voting_power["total_unique_voters"]["pools"].add(vote.pool_voter)
return {
"epoch": epoch_no,
"vote_counts": {
"drep_votes": voting_power["drep_votes"],
"committee_votes": voting_power["committee_votes"],
"pool_votes": voting_power["pool_votes"]
},
"unique_voters": {
"dreps": len(voting_power["total_unique_voters"]["dreps"]),
"committee": len(voting_power["total_unique_voters"]["committee"]),
"pools": len(voting_power["total_unique_voters"]["pools"])
},
"total_votes": sum([
voting_power["drep_votes"],
voting_power["committee_votes"],
voting_power["pool_votes"]
])
}
Proposal Success Rate Analysis¶
def analyze_proposal_success_rates(session, start_epoch: int, end_epoch: int):
"""Analyze proposal success rates by type over time."""
proposals = session.query(GovActionProposal).join(
Transaction
).join(Block).filter(
Block.epoch_no.between(start_epoch, end_epoch)
).all()
success_analysis = {}
for proposal in proposals:
prop_type = proposal.type_
if prop_type not in success_analysis:
success_analysis[prop_type] = {
"total": 0,
"ratified": 0,
"expired": 0,
"dropped": 0,
"still_active": 0,
"success_rate": 0.0
}
stats = success_analysis[prop_type]
stats["total"] += 1
if proposal.ratified_epoch:
stats["ratified"] += 1
elif proposal.expired_epoch:
stats["expired"] += 1
elif proposal.dropped_epoch:
stats["dropped"] += 1
else:
stats["still_active"] += 1
# Calculate success rates
for prop_type, stats in success_analysis.items():
concluded = stats["ratified"] + stats["expired"] + stats["dropped"]
if concluded > 0:
stats["success_rate"] = stats["ratified"] / concluded * 100
return {
"epoch_range": (start_epoch, end_epoch),
"by_type": success_analysis,
"overall": {
"total_proposals": sum(stats["total"] for stats in success_analysis.values()),
"total_ratified": sum(stats["ratified"] for stats in success_analysis.values()),
"overall_success_rate": (
sum(stats["ratified"] for stats in success_analysis.values()) /
max(1, sum(stats["ratified"] + stats["expired"] + stats["dropped"]
for stats in success_analysis.values())) * 100
)
}
}
# Usage examples
session = create_session()
# Analyze recent governance activity
recent_activity = analyze_governance_action_types(session, (450, 460))
print(f"Found {recent_activity['summary']['total_proposals']} proposals in epochs 450-460")
# Check voting power distribution
voting_power = analyze_voting_power_distribution(session, 455)
print(f"Epoch 455 had {voting_power['total_votes']} total votes from {voting_power['unique_voters']['dreps']} DReps")
# Analyze proposal success rates
success_rates = analyze_proposal_success_rates(session, 400, 500)
print(f"Overall success rate: {success_rates['overall']['overall_success_rate']:.1f}%")
This comprehensive governance guide provides practical examples for analyzing all aspects of Conway era governance, from individual DRep activity to ecosystem-wide governance analytics.