Skip to content

Best Practices Guide

This guide outlines best practices for developing applications with dbsync-py, covering performance, security, maintainability, and production deployment considerations.

Application Architecture

Layered Architecture Pattern

# data_layer.py - Database access layer
from dbsync.session import create_session
from dbsync.models import Transaction, Block
from contextlib import contextmanager

class DataAccess:
    """Data access layer for database operations.

    Each query method opens its own session and closes it before returning,
    so callers receive ORM objects that are no longer attached to a session.
    """

    def __init__(self, database_url: str):
        """Store the connection URL used to create sessions."""
        self.database_url = database_url

    @contextmanager
    def get_session(self):
        """Yield a session from ``create_session`` and always close it."""
        session = create_session(self.database_url)
        try:
            yield session
        finally:
            session.close()

    def get_transaction_by_hash(self, tx_hash: str):
        """Return the transaction with the given hex hash, or ``None``.

        Args:
            tx_hash: Hex-encoded transaction hash.

        Raises:
            ValueError: If ``tx_hash`` is not valid hexadecimal
                (raised by ``bytes.fromhex``).
        """
        # Local import keeps this snippet self-contained.
        from sqlalchemy.orm import joinedload

        with self.get_session() as session:
            # The session is closed before the caller sees the result, so the
            # ``block`` relationship is loaded here; a lazy load on a detached
            # instance would fail when the caller reads ``tx.block``.
            return (
                session.query(Transaction)
                .options(joinedload(Transaction.block))
                .filter(Transaction.hash == bytes.fromhex(tx_hash))
                .first()
            )

# business_layer.py - Business logic layer
from typing import Optional, Dict, Any

class TransactionService:
    """Business logic for transaction operations."""

    def __init__(
        self,
        data_access: "DataAccess",
        cache_service: "Optional[CacheService]" = None,
        cache_ttl: int = 300,
    ):
        """Create the service.

        Args:
            data_access: Object providing ``get_transaction_by_hash``.
            cache_service: Optional cache exposing ``get(key)`` and
                ``set(key, value, ttl)``. When omitted, nothing is cached.
            cache_ttl: TTL passed to ``cache_service.set`` for summaries.
        """
        self.data_access = data_access
        self.cache_service = cache_service
        self.cache_ttl = cache_ttl

    def get_transaction_summary(self, tx_hash: str) -> Optional[Dict[str, Any]]:
        """Get a summary of transaction information.

        Returns:
            A dict with ``hash``, ``fee_ada``, ``size_kb`` and
            ``block_height``, or ``None`` when the transaction is unknown.
        """
        cache_key = f"tx_summary:{tx_hash}"

        if self.cache_service is not None:
            cached = self.cache_service.get(cache_key)
            if cached is not None:
                return cached

        tx = self.data_access.get_transaction_by_hash(tx_hash)
        if tx is None:
            return None

        summary = {
            "hash": tx_hash,
            "fee_ada": tx.fee / 1_000_000,
            "size_kb": tx.size / 1024,
            "block_height": tx.block.block_no if tx.block else None
        }

        # Only found transactions are cached; a miss is looked up again.
        if self.cache_service is not None:
            self.cache_service.set(cache_key, summary, self.cache_ttl)

        return summary

# api_layer.py - API/presentation layer
from fastapi import FastAPI, HTTPException

app = FastAPI()
data_access = DataAccess("postgresql://...")
tx_service = TransactionService(data_access)

@app.get("/transaction/{tx_hash}")
async def get_transaction(tx_hash: str):
    """Return the summary for ``tx_hash``.

    Responds with 400 when the hash is not valid hexadecimal and 404 when
    no such transaction exists.
    """
    # The data layer converts the hash with bytes.fromhex(); reject malformed
    # input here so it becomes a client error rather than an unhandled
    # ValueError (HTTP 500).
    try:
        bytes.fromhex(tx_hash)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid transaction hash") from None

    # NOTE(review): this call performs blocking database I/O inside an
    # ``async def`` handler; consider declaring the handler with plain
    # ``def`` so the framework can run it off the event loop.
    summary = tx_service.get_transaction_summary(tx_hash)
    if summary is None:
        raise HTTPException(status_code=404, detail="Transaction not found")
    return summary

Dependency Injection

# config.py
from dataclasses import dataclass
from typing import Optional

@dataclass
class DatabaseConfig:
    """Database connection settings."""

    # Connection URL (required).
    url: str
    # Pool sizing; presumably forwarded to the engine's pool options.
    pool_size: int = 20
    max_overflow: int = 30
    # When true, SQL statements are presumably echoed for debugging.
    echo: bool = False

@dataclass
class AppConfig:
    """Top-level application settings."""

    # Database connection settings (required).
    database: DatabaseConfig
    # Cache time-to-live; NOTE(review): unit looks like seconds — confirm.
    cache_ttl: int = 300
    # API rate limit; NOTE(review): the time window is not shown here.
    api_rate_limit: int = 100

# services.py
from abc import ABC, abstractmethod

class CacheService(ABC):
    """Abstract interface every cache backend must implement."""

    @abstractmethod
    def get(self, key: str) -> Optional[Any]:
        """Return the value stored under ``key``, or ``None``."""

    @abstractmethod
    def set(self, key: str, value: Any, ttl: int) -> None:
        """Store ``value`` under ``key`` with the given ``ttl``."""

class RedisCacheService(CacheService):
    """Cache backend that stores JSON-encoded values in Redis."""

    def __init__(self, redis_url: str):
        """Create the Redis client from ``redis_url``."""
        import redis
        self.redis = redis.from_url(redis_url)

    def get(self, key: str) -> Optional[Any]:
        """Return the decoded value for ``key``, or ``None`` if absent/empty."""
        import json
        raw = self.redis.get(key)
        if not raw:
            return None
        return json.loads(raw)

    def set(self, key: str, value: Any, ttl: int) -> None:
        """JSON-encode ``value`` and store it under ``key`` with expiry ``ttl``."""
        import json
        payload = json.dumps(value)
        self.redis.setex(key, ttl, payload)

# dependency_container.py
class Container:
    """Wires configuration, data access, cache and services together."""

    def __init__(self, config: AppConfig, redis_url: str = "redis://localhost"):
        """Build the application's object graph.

        Args:
            config: Application configuration; ``config.database.url`` is
                used for the data access layer.
            redis_url: URL of the Redis instance backing the cache.
                Defaults to the local instance.
        """
        self.config = config
        self.data_access = DataAccess(config.database.url)
        self.cache_service = RedisCacheService(redis_url)
        # TransactionService must accept (data_access, cache_service) for
        # this wiring to work.
        self.transaction_service = TransactionService(
            self.data_access,
            self.cache_service
        )

Performance Optimization

Connection Pooling

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
from sqlalchemy.orm import sessionmaker

class OptimizedDataAccess:
    """Data access with optimized connection pooling."""

    def __init__(
        self,
        database_url: str,
        *,
        pool_size: int = 20,
        max_overflow: int = 30,
        pool_recycle: int = 3600,
        echo: bool = False,
    ):
        """Create the pooled engine and session factory.

        Args:
            database_url: Connection URL passed to ``create_engine``.
            pool_size: Number of connections to maintain.
            max_overflow: Additional connections allowed when needed.
            pool_recycle: Recycle connections after this many seconds.
            echo: Set to True for SQL debugging.
        """
        # Pool settings are parameters (with the previous values as
        # defaults) so they can be driven from configuration.
        self.engine = create_engine(
            database_url,
            poolclass=QueuePool,
            pool_size=pool_size,
            max_overflow=max_overflow,
            pool_pre_ping=True,    # Validate connections before use
            pool_recycle=pool_recycle,
            echo=echo
        )

        self.SessionLocal = sessionmaker(bind=self.engine)

    @contextmanager
    def get_session(self):
        """Yield a session from the factory and always close it."""
        session = self.SessionLocal()
        try:
            yield session
        finally:
            session.close()

    def close_all_connections(self):
        """Close all connections in the pool."""
        self.engine.dispose()

Query Optimization Patterns

from sqlalchemy.orm import joinedload, selectinload
from sqlalchemy import and_, or_, func

class OptimizedQueryService:
    """Service with optimized query patterns."""

    def __init__(self, data_access: OptimizedDataAccess):
        """Store the data access object used to open sessions."""
        self.data_access = data_access

    def get_transactions_with_details(self, block_id: int, limit: int = 100):
        """Get transactions with related data using eager loading.

        Args:
            block_id: Value matched against ``Transaction.block_id``.
            limit: Maximum number of transactions returned.
        """

        with self.data_access.get_session() as session:
            return session.query(Transaction).options(
                joinedload(Transaction.block),  # Many-to-one: each tx has one block
                selectinload(Transaction.inputs),  # One-to-many relationship
                selectinload(Transaction.outputs)
            ).filter(
                Transaction.block_id == block_id
            ).limit(limit).all()

    def get_address_utxos_optimized(self, address_ids: list[int]):
        """Optimized UTXO query for multiple addresses.

        Returns an empty list without touching the database when
        ``address_ids`` is empty.
        """

        # An empty IN () can never match; skip the query entirely.
        if not address_ids:
            return []

        with self.data_access.get_session() as session:
            # Use a single query instead of multiple
            # NOTE(review): this compares TransactionInput.tx_out_id with
            # TransactionOutput.id_; confirm against the schema that
            # tx_out_id references the output row rather than the producing
            # transaction (in which case the output index must be matched too).
            spent_output_ids = session.query(TransactionInput.tx_out_id).distinct()

            return session.query(TransactionOutput).filter(
                and_(
                    TransactionOutput.address_id.in_(address_ids),
                    ~TransactionOutput.id_.in_(spent_output_ids)
                )
            ).all()

    def get_epoch_statistics_batch(self, epoch_range: tuple):
        """Get statistics for multiple epochs efficiently.

        Args:
            epoch_range: ``(start_epoch, end_epoch)``; both bounds are
                inclusive because the filter uses ``between``.

        Returns:
            One row per epoch with ``epoch_no``, ``tx_count``,
            ``total_fees`` and ``avg_size``.
        """

        start_epoch, end_epoch = epoch_range

        with self.data_access.get_session() as session:
            # Single query for all epochs
            return session.query(
                Block.epoch_no,
                func.count(Transaction.id_).label('tx_count'),
                func.sum(Transaction.fee).label('total_fees'),
                func.avg(Transaction.size).label('avg_size')
            ).join(Transaction).filter(
                Block.epoch_no.between(start_epoch, end_epoch)
            ).group_by(Block.epoch_no).all()

Caching Strategies

from functools import wraps
import hashlib
import json

def cache_result(cache_service: "CacheService", ttl: int = 300):
    """Decorator for caching function results.

    Args:
        cache_service: Cache exposing ``get(key)`` and ``set(key, value, ttl)``.
        ttl: TTL passed to ``cache_service.set`` for each stored result.

    The cache key is derived from the function's module-qualified name and
    its arguments. A ``None`` result is treated as a miss on the next call.
    """

    def _key_fallback(obj):
        # Called by json.dumps for arguments it cannot encode. Objects with
        # the default repr (typically the ``self`` of a decorated method)
        # are keyed by type only, because their repr embeds a memory address
        # and would never match again. Everything else (bytes, Decimal,
        # datetime, ...) is keyed by its repr so distinct values stay distinct.
        if type(obj).__repr__ is object.__repr__:
            return f"{type(obj).__module__}.{type(obj).__qualname__}"
        return repr(obj)

    def decorator(func):
        # Module + qualname keeps same-named methods of different classes apart.
        qualified_name = f"{func.__module__}.{func.__qualname__}"

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Create cache key from function name and arguments
            key_data = {
                'func': qualified_name,
                'args': args,
                'kwargs': kwargs
            }
            cache_key = hashlib.md5(
                json.dumps(key_data, sort_keys=True, default=_key_fallback).encode()
            ).hexdigest()

            # Try to get from cache
            result = cache_service.get(cache_key)
            if result is not None:
                return result

            # Execute function and cache result
            result = func(*args, **kwargs)
            cache_service.set(cache_key, result, ttl)
            return result

        return wrapper
    return decorator

class CachedTransactionService(TransactionService):
    """Transaction service with caching.

    Caching uses the instance's own ``cache_service``. A class-level
    decorator cannot be used here because the cache only exists once an
    instance has been constructed.
    """

    def __init__(self, data_access: DataAccess, cache_service: CacheService):
        """Store the data access object and the cache used for results."""
        super().__init__(data_access)
        self.data_access = data_access
        self.cache_service = cache_service

    def _get_or_compute(self, cache_key: str, ttl: int, compute):
        """Return the cached value for ``cache_key`` or compute and store it."""
        cached = self.cache_service.get(cache_key)
        if cached is not None:
            return cached

        result = compute()
        if result is not None:
            self.cache_service.set(cache_key, result, ttl)
        return result

    def get_transaction_summary(self, tx_hash: str):
        """Cached transaction summary."""
        # Bind the parent method first: zero-argument super() is not
        # available inside the lambda.
        load_summary = super().get_transaction_summary
        return self._get_or_compute(
            f"tx_summary:{tx_hash}",
            600,  # Cache for 10 minutes
            lambda: load_summary(tx_hash),
        )

    def get_epoch_statistics(self, epoch_no: int):
        """Cached epoch statistics.

        Returns:
            A dict with ``tx_count`` and ``total_fees``. A plain dict is
            returned instead of the query row so the cached and uncached
            results have the same JSON-friendly shape.
        """

        def compute():
            with self.data_access.get_session() as session:
                tx_count, total_fees = session.query(
                    func.count(Transaction.id_),
                    func.sum(Transaction.fee)
                ).join(Block).filter(
                    Block.epoch_no == epoch_no
                ).first()
            # SUM over no rows yields NULL; report it as 0.
            # NOTE(review): assumes fees are whole-number amounts — confirm.
            return {
                "tx_count": int(tx_count),
                "total_fees": int(total_fees or 0),
            }

        return self._get_or_compute(
            f"epoch_stats:{epoch_no}",
            3600,  # Cache for 1 hour
            compute,
        )

Error Handling and Resilience

Comprehensive Error Handling

from sqlalchemy.exc import (
    OperationalError, IntegrityError, DataError,
    DatabaseError, TimeoutError
)
from typing import Optional, TypeVar, Callable
import logging
import time

T = TypeVar('T')

class DatabaseService:
    """Service with comprehensive error handling."""

    def __init__(self, data_access: DataAccess):
        """Store the data access object and create a module-named logger."""
        self.data_access = data_access
        self.logger = logging.getLogger(__name__)

    def execute_with_retry(
        self,
        operation: Callable[[], T],
        max_retries: int = 3,
        base_delay: float = 1.0
    ) -> Optional[T]:
        """Execute database operation with retry logic.

        Connection-related ``OperationalError`` and ``TimeoutError`` are
        retried with exponential backoff. Every other error is logged and
        re-raised immediately.

        Args:
            operation: Zero-argument callable performing the work.
            max_retries: Number of retries after the first attempt.
            base_delay: Delay in seconds before the first retry; doubled
                for each subsequent retry.

        Returns:
            Whatever ``operation`` returns.

        Raises:
            The original exception for non-retryable errors, or the last
            retryable exception once all attempts are exhausted.
        """

        last_exception = None

        for attempt in range(max_retries + 1):
            try:
                return operation()

            except OperationalError as e:
                if "connection" not in str(e).lower():
                    # Not a connection problem: retrying will not help.
                    self.logger.error(f"Operational error: {e}")
                    raise
                last_exception = e
                self.logger.warning(f"Connection error on attempt {attempt + 1}: {e}")

            except (IntegrityError, DataError) as e:
                # Don't retry on data integrity issues
                self.logger.error(f"Data integrity error: {e}")
                raise

            except TimeoutError as e:
                last_exception = e
                self.logger.warning(f"Timeout on attempt {attempt + 1}: {e}")

            except Exception as e:
                # Re-raise the real error rather than a stale earlier one or
                # a generic placeholder.
                self.logger.error(f"Unexpected error: {e}")
                raise

            if attempt < max_retries:
                delay = base_delay * (2 ** attempt)  # Exponential backoff
                self.logger.info(f"Retrying in {delay} seconds...")
                time.sleep(delay)

        self.logger.error(f"Operation failed after {max_retries + 1} attempts")
        # last_exception is only None when no attempt ran (negative max_retries).
        raise last_exception if last_exception else Exception("Operation failed")

    def safe_get_transaction(self, tx_hash: str) -> Optional[Transaction]:
        """Safely get transaction with error handling.

        Returns ``None`` both when the transaction does not exist and when
        the lookup fails; failures are logged.
        """

        def operation():
            with self.data_access.get_session() as session:
                return session.query(Transaction).filter(
                    Transaction.hash == bytes.fromhex(tx_hash)
                ).first()

        try:
            return self.execute_with_retry(operation)
        except Exception as e:
            self.logger.error(f"Failed to get transaction {tx_hash}: {e}")
            return None

Circuit Breaker Pattern

from enum import Enum
import time

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, blocking requests
    HALF_OPEN = "half_open"  # Testing if service recovered


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Circuit breaker for database operations."""

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        expected_exception: type = Exception
    ):
        """Create a closed breaker.

        Args:
            failure_threshold: Consecutive failures that open the circuit.
            timeout: Seconds the circuit stays open before a trial call.
            expected_exception: Exception type counted as a failure; other
                exceptions pass through without affecting the state.
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        # Monotonic timestamp of the most recent failure (not wall-clock).
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection.

        Raises:
            CircuitBreakerOpenError: If the circuit is open and the timeout
                has not elapsed.
            Exception: Whatever ``func`` raises.
        """

        if self.state == CircuitState.OPEN:
            # A monotonic clock cannot jump when the system time is
            # adjusted, so the open period cannot be cut short or extended.
            if time.monotonic() - self.last_failure_time < self.timeout:
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")
            self.state = CircuitState.HALF_OPEN

        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self.on_failure()
            raise  # bare raise keeps the original traceback

        self.on_success()
        return result

    def on_success(self):
        """Handle successful operation."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        """Handle failed operation."""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()

        # A failed trial call re-opens the circuit immediately.
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failure_count >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN

class ResilientDatabaseService:
    """Database service whose lookups go through a circuit breaker."""

    def __init__(self, data_access: DataAccess):
        """Create the service with a breaker that counts ``OperationalError``."""
        breaker = CircuitBreaker(
            failure_threshold=3,
            timeout=30,
            expected_exception=OperationalError
        )
        self.data_access = data_access
        self.circuit_breaker = breaker

    def get_transaction_with_protection(self, tx_hash: str):
        """Look up a transaction by hex hash via the circuit breaker."""

        def fetch():
            with self.data_access.get_session() as session:
                hash_filter = Transaction.hash == bytes.fromhex(tx_hash)
                query = session.query(Transaction).filter(hash_filter)
                return query.first()

        return self.circuit_breaker.call(fetch)

Security Best Practices

Input Validation

import re
from typing import Union

class ValidationError(Exception):
    """Custom validation error."""


class InputValidator:
    """Input validation utilities.

    Every validator raises ``ValidationError`` on bad input and returns the
    normalized value otherwise.
    """

    # Accepted address prefixes, including the testnet variants.
    ADDRESS_PREFIXES = (
        'addr1', 'stake1', 'pool1', 'drep1',
        'addr_test1', 'stake_test1',
    )

    @staticmethod
    def validate_tx_hash(tx_hash: str) -> str:
        """Validate transaction hash format.

        Returns:
            The hash stripped of surrounding whitespace and lower-cased.
        """
        if not tx_hash:
            raise ValidationError("Transaction hash cannot be empty")

        # Non-string input must fail validation, not crash on .strip().
        if not isinstance(tx_hash, str):
            raise ValidationError("Transaction hash must be a string")

        # Remove any whitespace
        tx_hash = tx_hash.strip()

        # Check length (64 hex characters)
        if len(tx_hash) != 64:
            raise ValidationError("Transaction hash must be 64 characters long")

        # Check hex format
        if not re.fullmatch(r'[0-9a-fA-F]{64}', tx_hash):
            raise ValidationError("Transaction hash must contain only hexadecimal characters")

        return tx_hash.lower()

    @staticmethod
    def validate_epoch_number(epoch_no: Union[int, str]) -> int:
        """Validate epoch number.

        Returns:
            The epoch as an ``int`` in the range 0..1000000.
        """
        # int() would silently truncate 3.7 to 3, and raises OverflowError
        # for infinity; reject non-integral floats explicitly.
        if isinstance(epoch_no, float) and not epoch_no.is_integer():
            raise ValidationError("Epoch number must be a valid integer")

        try:
            epoch_int = int(epoch_no)
        except (ValueError, TypeError, OverflowError) as err:
            raise ValidationError("Epoch number must be a valid integer") from err

        if epoch_int < 0:
            raise ValidationError("Epoch number cannot be negative")

        if epoch_int > 1000000:  # Reasonable upper bound
            raise ValidationError("Epoch number seems unreasonably large")

        return epoch_int

    @staticmethod
    def validate_address(address: str) -> str:
        """Validate Cardano address format.

        Accepts strings starting with one of ``ADDRESS_PREFIXES`` or
        consisting only of hexadecimal characters. This is a basic prefix
        and charset check; no checksum is verified.

        Returns:
            The address stripped of surrounding whitespace.
        """
        if not address:
            raise ValidationError("Address cannot be empty")

        if not isinstance(address, str):
            raise ValidationError("Invalid address format")

        address = address.strip()

        # Basic format checks for bech32 addresses
        if not (address.startswith(InputValidator.ADDRESS_PREFIXES) or
                re.fullmatch(r'[0-9a-fA-F]+', address)):
            raise ValidationError("Invalid address format")

        return address

class SecureTransactionService:
    """Transaction lookups that validate the hash before querying."""

    def __init__(self, data_access: DataAccess):
        """Store the data access object and create the validator."""
        self.validator = InputValidator()
        self.data_access = data_access

    def get_transaction(self, tx_hash: str):
        """Validate ``tx_hash`` and return the matching transaction, if any."""
        clean_hash = self.validator.validate_tx_hash(tx_hash)

        with self.data_access.get_session() as session:
            hash_filter = Transaction.hash == bytes.fromhex(clean_hash)
            query = session.query(Transaction).filter(hash_filter)
            return query.first()

SQL Injection Prevention

# ALWAYS use parameterized queries through SQLAlchemy ORM or text() with bound parameters

# ✅ GOOD: Using ORM (automatically parameterized)
def get_transactions_by_epoch_safe(session, epoch_no: int):
    """Return all transactions in ``epoch_no`` using an ORM query."""
    # The comparison builds a bound parameter; epoch_no never becomes part
    # of the SQL text.
    query = session.query(Transaction).join(Block)
    in_epoch = query.filter(Block.epoch_no == epoch_no)
    return in_epoch.all()

# ✅ GOOD: Using text() with bound parameters
from sqlalchemy import text

def get_custom_query_safe(session, epoch_no: int):
    """Run hand-written SQL with ``epoch_no`` passed as a bound parameter."""
    statement = text("""
        SELECT t.id, t.hash, t.fee
        FROM tx t
        JOIN block b ON t.block_id = b.id
        WHERE b.epoch_no = :epoch_no
    """)
    # The value travels separately from the SQL text via the :epoch_no placeholder.
    result = session.execute(statement, {"epoch_no": epoch_no})
    return result.fetchall()

# ❌ BAD: String interpolation (vulnerable to SQL injection)
def get_transactions_unsafe(session, epoch_no):
    """Anti-example: build SQL by interpolating ``epoch_no`` into the text.

    Shown only to illustrate what NOT to do; do not copy this pattern.
    """
    # NEVER DO THIS: the f-string makes epoch_no part of the SQL statement
    # itself, so a caller-controlled value can change the query.
    sql = f"SELECT * FROM tx WHERE epoch_no = {epoch_no}"
    return session.execute(sql).fetchall()

Production Deployment

Configuration Management

# config.py
import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class DatabaseConfig:
    """Database connection settings, optionally read from the environment."""

    url: str
    pool_size: int = 20
    max_overflow: int = 30
    echo: bool = False

    @classmethod
    def from_env(cls) -> 'DatabaseConfig':
        """Build the config from DATABASE_URL, DB_POOL_SIZE, DB_MAX_OVERFLOW, DB_ECHO.

        Unset variables fall back to the defaults below. DB_ECHO is true
        only for the value ``true`` (case-insensitive).
        """
        env = os.environ
        echo_flag = env.get('DB_ECHO', 'false').lower()
        return cls(
            url=env.get('DATABASE_URL', 'postgresql://localhost/cexplorer'),
            pool_size=int(env.get('DB_POOL_SIZE', '20')),
            max_overflow=int(env.get('DB_MAX_OVERFLOW', '30')),
            echo=(echo_flag == 'true'),
        )

@dataclass
class LoggingConfig:
    """Logging level and record format."""

    level: str = "INFO"
    format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

    @classmethod
    def from_env(cls) -> 'LoggingConfig':
        """Build the config from LOG_LEVEL and LOG_FORMAT.

        An unset LOG_FORMAT falls back to the class-level default format.
        """
        default_format = cls.format
        env = os.environ
        chosen_level = env.get('LOG_LEVEL', 'INFO')
        chosen_format = env.get('LOG_FORMAT', default_format)
        return cls(level=chosen_level, format=chosen_format)

@dataclass
class AppConfig:
    """Application settings composed of database and logging sections."""

    database: DatabaseConfig
    logging: LoggingConfig
    debug: bool = False

    @classmethod
    def from_env(cls) -> 'AppConfig':
        """Build the full config from the environment.

        DEBUG is true only for the value ``true`` (case-insensitive).
        """
        database_section = DatabaseConfig.from_env()
        logging_section = LoggingConfig.from_env()
        debug_flag = os.getenv('DEBUG', 'false').lower()
        return cls(
            database=database_section,
            logging=logging_section,
            debug=(debug_flag == 'true'),
        )

Monitoring and Logging

import logging
import time
from functools import wraps
from typing import Any, Callable

def setup_logging(config: "LoggingConfig", log_file: "Optional[str]" = 'app.log'):
    """Configure application logging.

    Args:
        config: Provides ``level`` (a ``logging`` level name, any case)
            and ``format``.
        log_file: Path of the log file. Pass ``None`` to log to the
            stream handler only.

    Raises:
        AttributeError: If ``config.level`` is not a name defined by the
            ``logging`` module.
    """
    # Resolve the level first so an invalid name fails before any log file
    # is created.
    level = getattr(logging, config.level.upper())

    handlers = [logging.StreamHandler()]
    if log_file is not None:
        handlers.append(logging.FileHandler(log_file))

    logging.basicConfig(
        level=level,
        format=config.format,
        handlers=handlers
    )

def log_performance(logger: logging.Logger):
    """Decorator to log function performance.

    Logs the wrapped function's duration at INFO on success and at ERROR
    (with the exception text) on failure; the exception is re-raised.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            # perf_counter is monotonic and high-resolution; wall-clock time
            # can jump and produce wrong or negative durations.
            start_time = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                duration = time.perf_counter() - start_time
                logger.error(f"{func.__name__} failed after {duration:.3f}s: {e}")
                raise
            duration = time.perf_counter() - start_time
            logger.info(f"{func.__name__} completed in {duration:.3f}s")
            return result
        return wrapper
    return decorator

class MetricsCollector:
    """In-memory counters for queries and errors."""

    def __init__(self):
        """Start with all counters at zero."""
        self.error_count = 0
        self.query_count = 0
        self.total_query_time = 0.0

    def record_query(self, duration: float):
        """Count one query and add its duration to the running total."""
        self.total_query_time += duration
        self.query_count += 1

    def record_error(self):
        """Count one error."""
        self.error_count += 1

    def get_stats(self) -> dict:
        """Return the counters plus the average query time (0 if no queries)."""
        if self.query_count > 0:
            average = self.total_query_time / self.query_count
        else:
            average = 0

        stats = {}
        stats["query_count"] = self.query_count
        stats["total_query_time"] = self.total_query_time
        stats["avg_query_time"] = average
        stats["error_count"] = self.error_count
        return stats

class MonitoredDataAccess(OptimizedDataAccess):
    """Pooled data access that records per-session metrics."""

    def __init__(self, config: DatabaseConfig):
        """Create the engine from ``config.url`` and attach logger and metrics."""
        super().__init__(config.url)
        self.metrics = MetricsCollector()
        self.logger = logging.getLogger(__name__)

    @contextmanager
    def get_session(self):
        """Yield a session, recording its duration or its failure.

        The measured time covers session creation and the caller's whole
        ``with`` body. Failures are counted, logged and re-raised.
        """
        opened_at = time.time()
        session = self.SessionLocal()
        try:
            yield session
            elapsed = time.time() - opened_at
            self.metrics.record_query(elapsed)
            self.logger.debug(f"Session completed in {elapsed:.3f}s")
        except Exception as exc:
            self.metrics.record_error()
            self.logger.error(f"Session failed: {exc}")
            raise
        finally:
            session.close()

Health Checks

from dataclasses import dataclass
from typing import Dict, Any

@dataclass
class HealthStatus:
    """Outcome of a single health check."""

    # True when the check passed.
    healthy: bool
    # Check-specific information such as status, timings or error text.
    details: Dict[str, Any]

class HealthChecker:
    """Application health checker."""

    def __init__(self, data_access: MonitoredDataAccess, slow_threshold: float = 5.0):
        """Create the checker.

        Args:
            data_access: Source of sessions and of the ``metrics`` object
                read by ``check_overall_health``.
            slow_threshold: Response time in seconds above which the
                database is reported as unhealthy.
        """
        self.data_access = data_access
        self.slow_threshold = slow_threshold

    def check_database(self) -> HealthStatus:
        """Check database connectivity and performance.

        Never raises for database failures: they are reported as an
        unhealthy status carrying the error text.
        """
        # Textual SQL must be wrapped in text(); SQLAlchemy 2.0 rejects
        # plain strings passed to execute().
        from sqlalchemy import text

        try:
            start_time = time.perf_counter()

            with self.data_access.get_session() as session:
                # Simple query to test connectivity
                session.execute(text("SELECT 1")).scalar()

            duration = time.perf_counter() - start_time

            if duration > self.slow_threshold:  # Slow response
                return HealthStatus(
                    healthy=False,
                    details={
                        "status": "slow",
                        "response_time": duration,
                        "message": "Database response time is too slow"
                    }
                )

            return HealthStatus(
                healthy=True,
                details={
                    "status": "ok",
                    "response_time": duration
                }
            )

        except Exception as e:
            return HealthStatus(
                healthy=False,
                details={
                    "status": "error",
                    "error": str(e)
                }
            )

    def check_overall_health(self) -> Dict[str, Any]:
        """Comprehensive health check.

        Returns:
            A dict with the overall ``healthy`` flag, a ``timestamp``, the
            per-check details and the data access metrics.
        """
        db_health = self.check_database()
        metrics = self.data_access.metrics.get_stats()

        overall_healthy = db_health.healthy

        return {
            "healthy": overall_healthy,
            "timestamp": time.time(),
            "checks": {
                "database": db_health.details
            },
            "metrics": metrics
        }

# Health check endpoint
from fastapi import FastAPI

app = FastAPI()
# NOTE: data_access must expose ``metrics`` (as MonitoredDataAccess does),
# because check_overall_health() reads data_access.metrics.
health_checker = HealthChecker(data_access)

@app.get("/health")
async def health_check():
    """Report application health.

    Responds with 200 when healthy and 503 otherwise, so probes and load
    balancers that only inspect the status code see the failure. The body
    is the same report in both cases.
    """
    from fastapi.responses import JSONResponse

    report = health_checker.check_overall_health()
    if report["healthy"]:
        return report
    return JSONResponse(status_code=503, content=report)

Testing Best Practices

Unit Testing

import pytest
from unittest.mock import Mock, patch
from sqlalchemy.orm import Session

class TestTransactionService:
    """Unit tests for TransactionService."""

    @pytest.fixture
    def mock_data_access(self):
        """DataAccess double restricted to the real class's attributes."""
        return Mock(spec=DataAccess)

    @pytest.fixture
    def transaction_service(self, mock_data_access):
        """Service under test, wired to the mocked data access."""
        return TransactionService(mock_data_access)

    def test_get_transaction_summary_success(self, transaction_service, mock_data_access):
        # Arrange
        mock_tx = Mock()
        mock_tx.fee = 150000
        mock_tx.size = 300
        mock_tx.block.block_no = 12345

        # The service calls get_transaction_by_hash(), not get_session(),
        # so that is the method to stub.
        mock_data_access.get_transaction_by_hash.return_value = mock_tx

        # Act
        result = transaction_service.get_transaction_summary("abc123")

        # Assert
        mock_data_access.get_transaction_by_hash.assert_called_once_with("abc123")
        assert result is not None
        assert result["hash"] == "abc123"
        assert result["fee_ada"] == pytest.approx(0.15)
        assert result["size_kb"] == pytest.approx(300 / 1024)  # size is divided by 1024
        assert result["block_height"] == 12345

    def test_get_transaction_summary_not_found(self, transaction_service, mock_data_access):
        # Arrange
        mock_data_access.get_transaction_by_hash.return_value = None

        # Act
        result = transaction_service.get_transaction_summary("nonexistent")

        # Assert
        assert result is None

Integration Testing

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from testcontainers.postgres import PostgresContainer

class TestDatabaseIntegration:
    """Integration tests with real database."""

    @pytest.fixture(scope="class")
    def postgres_container(self):
        """Start one PostgreSQL container shared by the tests in this class."""
        with PostgresContainer("postgres:13") as postgres:
            yield postgres

    @pytest.fixture
    def test_session(self, postgres_container):
        """Yield a session on a fresh engine; clean up both afterwards."""
        engine = create_engine(postgres_container.get_connection_url())
        SessionLocal = sessionmaker(bind=engine)

        # Create tables (in real tests, you'd use migrations)
        # Base.metadata.create_all(bind=engine)

        session = SessionLocal()
        try:
            yield session
        finally:
            session.close()
            # A new engine is created per test; dispose it so its pooled
            # connections do not accumulate across tests.
            engine.dispose()

    def test_transaction_crud(self, test_session):
        # Test actual database operations
        pass

This comprehensive best practices guide provides a foundation for building robust, maintainable, and scalable applications with dbsync-py.