Skip to content

Database Operations

Overview

The application uses PostgreSQL with async SQLModel/SQLAlchemy for database operations.

Configuration

Database configuration in app/settings.py:

POSTGRES_SERVER: str = "localhost"
POSTGRES_PORT: int = 5432
POSTGRES_USER: str = "postgres"
POSTGRES_PASSWORD: str = "postgres"
POSTGRES_DB: str = "app_db"

# Connection pool
POOL_SIZE: int = 5
MAX_OVERFLOW: int = 10

Session Management

Getting a Session

Use the async context manager:

from app.storage.db import async_session

async with async_session() as session:
    async with session.begin():
        # Database operations here
        result = await session.execute(select(Author))

Dependency Injection

For HTTP endpoints, use SessionDep:

from app.dependencies import SessionDep

@router.get("/authors")
async def get_authors(session: SessionDep) -> list[Author]:
    """Session is automatically provided and cleaned up."""
    result = await session.execute(select(Author))
    return list(result.scalars().all())

Models

Defining Models

from sqlmodel import Field, SQLModel

class Author(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str = Field(index=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)

Async Relationships

For models with relationships, inherit from BaseModel:

from app.models.base import BaseModel
from sqlmodel import Relationship

class Author(BaseModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str
    books: list["Book"] = Relationship(back_populates="author")

class Book(BaseModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    title: str
    author_id: int = Field(foreign_key="author.id")
    author: Author = Relationship(back_populates="books")

Eager Loading

from sqlalchemy.orm import selectinload

# Load author with all books
stmt = select(Author).options(selectinload(Author.books))
result = await session.execute(stmt)
author = result.scalar_one()

# Relationship already loaded
books = author.books  # No await needed!

CRUD Operations

Create

from app.storage.db import async_session

async with async_session() as session:
    async with session.begin():
        author = Author(name="John Doe")
        session.add(author)
        await session.flush()  # Get ID
        await session.refresh(author)
        return author

Read

from sqlmodel import select

# Get by ID
async with async_session() as session:
    author = await session.get(Author, 1)

# Query with filters
async with async_session() as session:
    stmt = select(Author).where(Author.name == "John Doe")
    result = await session.execute(stmt)
    author = result.scalar_one_or_none()

# Get all
async with async_session() as session:
    result = await session.execute(select(Author))
    authors = list(result.scalars().all())

Update

async with async_session() as session:
    async with session.begin():
        author = await session.get(Author, 1)
        if author:
            author.name = "Jane Doe"
            session.add(author)
            await session.flush()
            await session.refresh(author)

Delete

async with async_session() as session:
    async with session.begin():
        author = await session.get(Author, 1)
        if author:
            await session.delete(author)

Repository Pattern

Using Repositories

Encapsulate database logic in repositories:

from app.repositories.author_repository import AuthorRepository

async with async_session() as session:
    repo = AuthorRepository(session)

    # Create
    author = await repo.create(Author(name="John Doe"))

    # Read
    author = await repo.get_by_id(1)
    authors = await repo.get_all()
    john = await repo.get_by_name("John")

    # Update
    author.name = "Jane Doe"
    await repo.update(author)

    # Delete
    await repo.delete(author)

Custom Queries

Add repository methods for complex queries:

class AuthorRepository(BaseRepository[Author]):
    async def search_by_name(self, pattern: str) -> list[Author]:
        """Search authors by name pattern."""
        stmt = select(Author).where(Author.name.ilike(f"%{pattern}%"))
        result = await self.session.execute(stmt)
        return list(result.scalars().all())

Pagination

Using get_paginated_results

from app.storage.db import get_paginated_results

results, meta = await get_paginated_results(
    Author,
    page=1,
    per_page=20,
    filters={"name": "John"}
)

# meta contains: page, per_page, total, pages

Custom Pagination

from sqlmodel import select, func

async def get_paginated_authors(page: int, per_page: int):
    async with async_session() as session:
        # Count total
        count_stmt = select(func.count(Author.id))
        total = (await session.execute(count_stmt)).scalar_one()

        # Get page
        offset = (page - 1) * per_page
        stmt = select(Author).offset(offset).limit(per_page)
        result = await session.execute(stmt)
        items = list(result.scalars().all())

        return items, {
            "page": page,
            "per_page": per_page,
            "total": total,
            "pages": (total + per_page - 1) // per_page
        }

Transactions

Automatic Transactions

Using session.begin():

async with async_session() as session:
    async with session.begin():
        # All operations in one transaction
        author = Author(name="John")
        session.add(author)
        await session.flush()

        book = Book(title="Book", author_id=author.id)
        session.add(book)
        # Commits automatically on exit

Manual Commit/Rollback

async with async_session() as session:
    try:
        author = Author(name="John")
        session.add(author)
        await session.commit()
    except Exception:
        await session.rollback()
        raise

Migrations

Create Migration

make migration msg="Add email to Author"

Apply Migrations

make migrate

Rollback

make rollback

See Database Migrations for full guide.

Error Handling

IntegrityError

from sqlalchemy.exc import IntegrityError

try:
    author = Author(name="John")
    session.add(author)
    await session.commit()
except IntegrityError as e:
    await session.rollback()
    logger.error(f"Constraint violation: {e}")
    raise HTTPException(status_code=400, detail="Duplicate entry")

NoResultFound

from sqlalchemy.exc import NoResultFound

try:
    stmt = select(Author).where(Author.id == 999)
    author = (await session.execute(stmt)).scalar_one()
except NoResultFound:
    raise HTTPException(status_code=404, detail="Author not found")

Best Practices

1. Always Use Sessions as Context Managers

# ✅ Good
async with async_session() as session:
    async with session.begin():
        # operations

# ❌ Bad
session = async_session()
# operations
await session.close()  # Easy to forget!

2. Use Repositories

# ✅ Good - testable, reusable
repo = AuthorRepository(session)
authors = await repo.get_all()

# ❌ Bad - logic in handlers
result = await session.execute(select(Author))
authors = list(result.scalars().all())

3. Eager Load Relationships

# ✅ Good - one query
stmt = select(Author).options(selectinload(Author.books))
authors = (await session.execute(stmt)).scalars().all()

# ❌ Bad - N+1 queries
authors = (await session.execute(select(Author))).scalars().all()
for author in authors:
    books = await author.awaitable_attrs.books  # Separate query!

4. Use Transactions

# ✅ Good - atomic operation
async with session.begin():
    author = Author(name="John")
    session.add(author)
    await session.flush()

    book = Book(author_id=author.id)
    session.add(book)

# ❌ Bad - partial commits possible
author = Author(name="John")
session.add(author)
await session.commit()

book = Book(author_id=author.id)
session.add(book)
await session.commit()  # If this fails, author still created!

Performance

Connection Pooling

Adjust pool size in settings:

POOL_SIZE: int = 10
MAX_OVERFLOW: int = 20

Query Optimization

# Use select() for better performance
stmt = select(Author).where(Author.name == "John")

# Use indexes
class Author(SQLModel, table=True):
    name: str = Field(index=True)  # Indexed!

# Limit results
stmt = select(Author).limit(100)

Testing

Test with In-Memory Database

import pytest
from sqlmodel import create_engine, Session

@pytest.fixture
def session():
    engine = create_engine("sqlite:///:memory:")
    SQLModel.metadata.create_all(engine)
    with Session(engine) as session:
        yield session

Mock Repository

from unittest.mock import AsyncMock

@pytest.fixture
def mock_repo():
    repo = AsyncMock()
    repo.get_by_id.return_value = Author(id=1, name="Test")
    return repo