SQLAlchemy 2.0 in Production - Full Guide
Introduction
This document describes how SQLAlchemy 2.0 operates in production systems. It assumes you understand Python, SQL, relational database design, and concurrency primitives. This is not a tutorial.
The example throughout is a multi-tenant document management SaaS with these entities: Organization, User, Folder, Document, DocumentVersion. We’ll implement tenant isolation, handle schema boundaries, and show where SQLAlchemy helps and where it doesn’t.
Installation and Setup
Core Installation
pip install "sqlalchemy>=2.0,<2.1" # Latest 2.0.x release
Database Driver Installation
SQLAlchemy requires a DBAPI driver for each database. Install only what you use in production.
PostgreSQL:
# psycopg2 (stable, widely used, C extension)
pip install psycopg2-binary
# Or build from source (recommended for production)
pip install psycopg2
# psycopg3 / psycopg (modern, async support)
pip install psycopg[binary] # With C speedups
pip install psycopg # Pure Python fallback
# asyncpg (async-only, high performance for asyncio)
pip install asyncpg
MySQL:
# mysqlclient (C extension, fastest, requires libmysqlclient)
pip install mysqlclient
# PyMySQL (pure Python, no dependencies)
pip install pymysql
# mysql-connector-python (official MySQL driver)
pip install mysql-connector-python
SQLite:
# Built into Python standard library, no installation needed
# Just use: engine = create_engine("sqlite:////path/to/database.db")
Async Support
For async applications, install additional dependencies:
# For async PostgreSQL
pip install sqlalchemy[asyncio] # Optional dependencies for async
pip install asyncpg # Required for async PostgreSQL
# For async MySQL (limited support)
pip install aiomysql
Development and Migration Tools
# Alembic for database migrations (required in production)
pip install alembic
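# Type hints: 2.0 models annotated with Mapped[...] type-check without a plugin;
# the sqlalchemy[mypy] plugin is deprecated and only relevant to legacy 1.4-style models
pip install mypy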
# Development/testing
pip install pytest pytest-asyncio
Typical Production Setup
# PostgreSQL + Alembic minimal setup
pip install sqlalchemy psycopg[binary] alembic
# Complete multi-database setup
pip install sqlalchemy psycopg[binary] pymysql alembic
Verify Installation
import sqlalchemy
print(f"SQLAlchemy version: {sqlalchemy.__version__}")
# Test engine creation (no connection yet)
from sqlalchemy import create_engine
engine = create_engine("postgresql+psycopg://user:password@localhost/test", echo=False)
print(f"Engine created: {engine}")
Dependency Summary
| Component | For | Installation |
|---|---|---|
| SQLAlchemy | ORM/Core | pip install sqlalchemy |
| psycopg | PostgreSQL sync | pip install psycopg[binary] |
| psycopg2 | PostgreSQL sync (legacy) | pip install psycopg2-binary |
| asyncpg | PostgreSQL async | pip install asyncpg |
| mysqlclient | MySQL (fast) | pip install mysqlclient |
| PyMySQL | MySQL (pure Python) | pip install pymysql |
| Alembic | Database migrations | pip install alembic |
1. SQLAlchemy Architecture
Core vs ORM
Core provides:
- SQL expression construction
- Connection pooling
- Transaction management
- Type system
- Schema introspection
ORM adds:
- Object-relational mapping
- Identity map (per-session object cache)
- Unit of work (automatic INSERT/UPDATE/DELETE batching)
- Lazy/eager loading strategies
- Relationship traversal
Capability boundaries:
- Core cannot track object state or relationships.
- Some SQL (window functions with complex partitioning, recursive CTEs with multiple anchors) is awkward to express through ORM entities; write it with Core constructs or raw SQL.
- Mixing both is normal and recommended.
Engine, Connection, Session
Engine:
- Owns connection pool
- Thread-safe
- Create once per database, reuse globally
- Lazy: opens no connection until first use; after that the pool keeps up to pool_size connections open for reuse
from sqlalchemy import create_engine
# Create once at module level
engine = create_engine(
"postgresql+psycopg2://user:pass@host/dbname",
pool_size=10, # Max persistent connections in pool
max_overflow=20, # Additional connections beyond pool_size when needed, dropped after usage (Use carefully).
pool_pre_ping=True, # Test connection validity before using (prevents stale connections)
echo=False, # Don't log SQL statements (set True for debugging)
)
Connection:
- Represents a single DBAPI connection
- Not thread-safe
- Acquired from engine pool
- Must be explicitly closed or used as context manager
from sqlalchemy import text

# Explicit connection management
with engine.connect() as conn:  # Check out a connection from the pool
    result = conn.execute(text("SELECT 1"))
    conn.commit()  # Writes need an explicit commit in SQLAlchemy 2.0; harmless for this SELECT
# Connection automatically returned to pool on exit
Session:
- ORM’s unit of work
- Wraps a connection
- Not thread-safe
- Tracks object state (identity map)
- Batches changes; flushes on commit, on explicit flush(), and (with autoflush, the default) before queries
from sqlalchemy.orm import sessionmaker, Session
SessionLocal = sessionmaker(
bind=engine, # Engine to use for connections
expire_on_commit=False # Don't expire objects after commit (optional)
)
# Per-request or per-task
def process_request():
session = SessionLocal() # Create new session instance
try:
# work
session.commit() # Flush changes and commit transaction
except:
session.rollback() # Undo changes on error
raise
finally:
session.close() # Release connection back to pool
Thread safety:
- Engine: safe to share
- Connection: never share across threads
- Session: never share across threads or async tasks
Identity Map and Unit of Work
Identity map:
- Session-local cache keyed by (class, primary_key)
- Same database row = same Python object within a session
- Prevents duplicate SELECT queries for already-loaded objects
- Cleared on session.close() or session.expunge_all()
session = SessionLocal()
user1 = session.get(User, 123) # Fetch from database, store in identity map
user2 = session.get(User, 123) # Return cached object from identity map (no SQL)
assert user1 is user2 # True - same object instance
Unit of work:
- Session tracks: new, dirty, deleted
- Flush batches all changes into SQL
- Commit flushes then commits transaction
- Rollback clears pending changes and reverts transaction
When identity map causes issues:
- Long-lived sessions see stale data
- High-concurrency writes cause serialization conflicts
- Memory bloat if loading thousands of objects
Solution: Short-lived sessions, explicit expunge, or use Core for bulk reads.
2. Database and Schema Setup
Creating Database Programmatically
SQLAlchemy does not create databases. You must do this externally or via raw SQL.
from sqlalchemy import create_engine, text
from sqlalchemy.exc import ProgrammingError
def create_database_if_not_exists(admin_url: str, db_name: str):
    """
    PostgreSQL only (the existence check reads pg_database).
    admin_url: connection to the 'postgres' maintenance database
    db_name: target database name
    """
    engine = create_engine(
        admin_url,
        isolation_level="AUTOCOMMIT"  # Required for CREATE DATABASE (can't run in transaction)
    )
    with engine.connect() as conn:
        # Check existence
        result = conn.execute(
            text("SELECT 1 FROM pg_database WHERE datname = :db"),
            {"db": db_name}  # Safe parameter binding
        )
        if not result.fetchone():  # Database doesn't exist
            # PostgreSQL does not allow parameterized CREATE DATABASE, so quote the
            # identifier with the dialect's preparer; never interpolate untrusted input raw
            quoted = engine.dialect.identifier_preparer.quote(db_name)
            conn.execute(text(f"CREATE DATABASE {quoted}"))
    engine.dispose()  # Close all connections
# Usage
create_database_if_not_exists(
"postgresql+psycopg2://admin:pass@localhost/postgres",
"docmanager"
)
For MySQL:
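def create_mysql_database(admin_url: str, db_name: str):
    engine = create_engine(admin_url)
    with engine.connect() as conn:
        # Identifiers cannot be bound parameters; quote the name with the dialect's preparer
        quoted = engine.dialect.identifier_preparer.quote(db_name)
        conn.execute(text(f"CREATE DATABASE IF NOT EXISTS {quoted}"))
        conn.commit()
    engine.dispose()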
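Because CREATE DATABASE cannot take bound parameters, the database name ends up inside the SQL string. One defensive option is to reject anything that is not a plain identifier before building the statement. The sketch below is an assumption, not part of SQLAlchemy: the helper name and the allowed pattern (lowercase letters, digits, underscores, at most 63 characters to fit PostgreSQL's default identifier limit) are illustrative choices.

```python
import re

# Assumed policy: lowercase identifier, starts with a letter or underscore,
# at most 63 characters in total.
_DB_NAME_RE = re.compile(r"[a-z_][a-z0-9_]{0,62}")


def validate_database_name(db_name: str) -> str:
    """Return db_name unchanged if it is a plain identifier, else raise."""
    if not _DB_NAME_RE.fullmatch(db_name):
        raise ValueError(f"Unsafe database name: {db_name!r}")
    return db_name


accepted = validate_database_name("docmanager")

try:
    validate_database_name("docs; DROP DATABASE prod")
    rejected = False
except ValueError:
    rejected = True
```

Call the validator first and pass only its return value into the CREATE DATABASE string.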
Dedicated Schemas
PostgreSQL supports schemas (namespaces within a database). MySQL does not (database = schema).
from sqlalchemy import MetaData, Table, Column, Integer, String
# Create schemas
def setup_schemas(engine):
with engine.connect() as conn:
conn.execute(text("CREATE SCHEMA IF NOT EXISTS app"))
conn.execute(text("CREATE SCHEMA IF NOT EXISTS auth"))
conn.commit()
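# Option 1: default schema for every table attached to a MetaData
metadata_app = MetaData(schema="app")
metadata_auth = MetaData(schema="auth")
# Option 2: per-model schema via __table_args__ (Base is the declarative base from section 4)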
class User(Base):
__tablename__ = "users"
__table_args__ = {"schema": "auth"}
id = Column(Integer, primary_key=True)
email = Column(String(255), nullable=False, unique=True)
class Document(Base):
__tablename__ = "documents"
__table_args__ = {"schema": "app"}
id = Column(Integer, primary_key=True)
title = Column(String(500), nullable=False)
Foreign keys across schemas:
from sqlalchemy import ForeignKey
class Document(Base):
__tablename__ = "documents"
__table_args__ = {"schema": "app"}
id = Column(Integer, primary_key=True)
owner_id = Column(Integer, ForeignKey("auth.users.id"), nullable=False)
Search path:
Default search path is "$user", public. If you use a custom schema, either:
- Fully qualify all table names (schema.table)
- Set search path per connection:
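from sqlalchemy import event

@event.listens_for(engine, "connect", insert=True)
def set_search_path(dbapi_conn, connection_record):
    # Run SET in autocommit mode; inside a transaction, a later ROLLBACK would revert it
    existing_autocommit = dbapi_conn.autocommit
    dbapi_conn.autocommit = True
    cursor = dbapi_conn.cursor()
    cursor.execute("SET search_path TO app, auth, public")
    cursor.close()
    dbapi_conn.autocommit = existing_autocommit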
3. Engine Configuration
Connection URLs
SQLite:
# In-memory
engine = create_engine("sqlite:///:memory:")
# File-based
engine = create_engine("sqlite:///./dev.db")
# Enable foreign keys (disabled by default)
from sqlalchemy import event
@event.listens_for(engine, "connect")
def set_sqlite_pragma(dbapi_conn, connection_record):
cursor = dbapi_conn.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
PostgreSQL:
# psycopg2 (sync)
engine = create_engine(
"postgresql+psycopg2://user:password@host:5432/database"
)
# asyncpg (async)
from sqlalchemy.ext.asyncio import create_async_engine
async_engine = create_async_engine(
"postgresql+asyncpg://user:password@host:5432/database"
)
MySQL:
# mysqlclient
engine = create_engine(
"mysql+mysqldb://user:password@host:3306/database?charset=utf8mb4"
)
# PyMySQL
engine = create_engine(
"mysql+pymysql://user:password@host:3306/database?charset=utf8mb4"
)
Pooling Configuration
engine = create_engine(
url,
pool_size=10, # Core pool of persistent connections
max_overflow=20, # Extra connections allowed when pool exhausted (total max = 30)
pool_recycle=3600, # Close and replace connections after 1 hour
pool_pre_ping=True, # Test connection with SELECT 1 before use (adds 1-2ms)
pool_timeout=30, # Seconds to wait for connection before raising TimeoutError
)
pool_pre_ping:
- Adds 1-2ms per checkout
- Prevents errors when database closes idle connections
- Required if wait_timeout < pool_recycle
pool_recycle:
- Must be less than database’s connection timeout
- PostgreSQL: default unlimited, but cloud providers enforce limits
- MySQL: default 8 hours (wait_timeout)
pool_size + max_overflow:
- Total max connections = pool_size + max_overflow
- Set based on max_connections on database server
- Leave headroom for admin connections and other services
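Each process that creates an engine gets its own pool, so the limit applies per process. A rough sizing check, as plain arithmetic: the function names, the worker count, and the reserved headroom below are illustrative assumptions.

```python
def max_app_connections(pool_size: int, max_overflow: int, processes: int) -> int:
    """Worst-case connections opened by all worker processes of one service."""
    return (pool_size + max_overflow) * processes


def fits_server_limit(total: int, max_connections: int, reserved: int) -> bool:
    """True if the service fits under the server limit with headroom left over."""
    return total <= max_connections - reserved


# 4 worker processes, each with pool_size=10 and max_overflow=20
total = max_app_connections(pool_size=10, max_overflow=20, processes=4)

# Server allows 100 connections; keep 10 for admin access and other services
ok = fits_server_limit(total, max_connections=100, reserved=10)
```

Here the worst case is 120 connections against a budget of 90, so either the pool settings or the worker count has to come down.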
Echo and Isolation Levels
# Log all SQL (dev only)
engine = create_engine(url, echo=True)
# Set isolation level
engine = create_engine(
url,
isolation_level="REPEATABLE READ" # or READ COMMITTED, SERIALIZABLE
)
# Per-connection isolation
with engine.connect().execution_options(
isolation_level="SERIALIZABLE"
) as conn:
# work
pass
Autocommit:
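SQLAlchemy 2.0 removed the library-level autocommit of 1.x: a connection or session begins a transaction on first use, and nothing persists until you commit. Driver-level autocommit is still available through isolation_level="AUTOCOMMIT".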
# Explicit commit required
with engine.connect() as conn:
conn.execute(text("INSERT INTO logs (msg) VALUES ('event')"))
conn.commit()
Multiple Engines (Read/Write Split)
write_engine = create_engine("postgresql://primary/db")
read_engine = create_engine("postgresql://replica/db")
# Route queries explicitly
def get_user_readonly(user_id: int):
with Session(bind=read_engine) as session:
return session.get(User, user_id)
def update_user(user_id: int, email: str):
with Session(bind=write_engine) as session:
user = session.get(User, user_id)
user.email = email
session.commit()
Routing via session bind:
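from sqlalchemy.orm import Session

class RoutingSession(Session):
    # 2.0 passes extra keyword arguments to get_bind(), so accept **kw
    def get_bind(self, mapper=None, clause=None, **kw):
        if self._flushing:  # Writes issued during flush() go to the primary
            return write_engine
        else:
            return read_engine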
Limitations:
- ORM relationships span binds awkwardly
- Replication lag causes read-after-write inconsistencies
- Often better handled at infrastructure level (HAProxy, Pgpool-II)
4. Declarative ORM (Production Patterns)
Declarative Base and Naming Conventions
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import MetaData, Integer, String, Text, DateTime, ForeignKey
from datetime import datetime
# Naming convention for constraints (critical for Alembic)
convention = {
"ix": "ix_%(column_0_label)s", # Index names
"uq": "uq_%(table_name)s_%(column_0_name)s", # Unique constraint names
"ck": "ck_%(table_name)s_%(constraint_name)s", # Check constraint names
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", # Foreign key names
"pk": "pk_%(table_name)s" # Primary key names
}
class Base(DeclarativeBase):
metadata = MetaData(naming_convention=convention) # Apply conventions to all models
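To see what the convention produces, the sketch below applies the same dictionary to two small Core tables and inspects the generated DDL. The tables are illustrative stand-ins, not the guide's models, and the DDL is compiled with the default dialect without connecting to a database.

```python
from sqlalchemy import Column, ForeignKey, Integer, MetaData, String, Table
from sqlalchemy.schema import CreateTable

convention = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(constraint_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s",
}
metadata = MetaData(naming_convention=convention)

users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("email", String(255), unique=True),
)
documents = Table(
    "documents",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("owner_id", Integer, ForeignKey("users.id"), index=True),
)

# Render CREATE TABLE statements as strings
users_ddl = str(CreateTable(users))
documents_ddl = str(CreateTable(documents))
index_names = sorted(str(ix.name) for ix in documents.indexes)

print(users_ddl)
print(documents_ddl)
print(index_names)
```

Every constraint gets a predictable name, which lets Alembic emit matching drop_constraint() calls later.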
Models with Explicit Configuration
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy.orm import relationship

class Organization(Base):
    __tablename__ = "organizations"  # Explicit table name
    __table_args__ = {"schema": "app"}  # PostgreSQL schema (namespace)
    id: Mapped[int] = mapped_column(primary_key=True)  # Auto-increment by default
    name: Mapped[str] = mapped_column(String(255), nullable=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),  # Store timezone-aware timestamps
        nullable=False,
        # Python-side default, set at insert time. datetime.utcnow() returns a naive
        # value and is deprecated since Python 3.12, so build an aware timestamp.
        default=lambda: datetime.now(timezone.utc)
    )
    # Relationships (not columns in database, Python-only)
    users: Mapped[list["User"]] = relationship(back_populates="organization")
    folders: Mapped[list["Folder"]] = relationship(back_populates="organization")
class User(Base):
__tablename__ = "users"
__table_args__ = (
{"schema": "auth"} # Different schema from Organization
)
id: Mapped[int] = mapped_column(primary_key=True)
organization_id: Mapped[int] = mapped_column(
ForeignKey("app.organizations.id", ondelete="CASCADE"), # FK across schemas
nullable=False,
index=True # Index foreign keys for join performance
)
email: Mapped[str] = mapped_column(String(255), nullable=False, unique=True)
full_name: Mapped[str] = mapped_column(String(255), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
# Relationships enable traversal: user.organization, user.documents
organization: Mapped["Organization"] = relationship(back_populates="users")
documents: Mapped[list["Document"]] = relationship(back_populates="owner")
class Folder(Base):
__tablename__ = "folders"
__table_args__ = (
{"schema": "app"}
)
id: Mapped[int] = mapped_column(primary_key=True)
organization_id: Mapped[int] = mapped_column(
ForeignKey("app.organizations.id", ondelete="CASCADE"), # Delete cascade from org
nullable=False,
index=True
)
parent_id: Mapped[Optional[int]] = mapped_column(
ForeignKey("app.folders.id", ondelete="CASCADE"), # Self-referential FK
nullable=True, # NULL for root folders
index=True
)
name: Mapped[str] = mapped_column(String(255), nullable=False)
organization: Mapped["Organization"] = relationship(back_populates="folders")
parent: Mapped[Optional["Folder"]] = relationship(
remote_side=[id], # Specify "one" side of one-to-many (which column is remote)
back_populates="children"
)
children: Mapped[list["Folder"]] = relationship(back_populates="parent") # One-to-many
documents: Mapped[list["Document"]] = relationship(back_populates="folder")
class Document(Base):
__tablename__ = "documents"
__table_args__ = (
{"schema": "app"}
)
id: Mapped[int] = mapped_column(primary_key=True)
organization_id: Mapped[int] = mapped_column(
ForeignKey("app.organizations.id", ondelete="CASCADE"), # Tenant isolation column
nullable=False,
index=True # Critical for tenant-filtered queries
)
folder_id: Mapped[int] = mapped_column(
ForeignKey("app.folders.id", ondelete="CASCADE"),
nullable=False,
index=True
)
owner_id: Mapped[int] = mapped_column(
ForeignKey("auth.users.id", ondelete="RESTRICT"), # Prevent deletion if user owns docs
nullable=False,
index=True
)
title: Mapped[str] = mapped_column(String(500), nullable=False)
content: Mapped[Optional[str]] = mapped_column(Text, nullable=True) # Large text field
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
onupdate=datetime.utcnow # Auto-update timestamp on UPDATE (Python-side)
)
organization: Mapped["Organization"] = relationship()
folder: Mapped["Folder"] = relationship(back_populates="documents")
owner: Mapped["User"] = relationship(back_populates="documents")
versions: Mapped[list["DocumentVersion"]] = relationship(
back_populates="document",
order_by="DocumentVersion.version_number.desc()"
)
class DocumentVersion(Base):
__tablename__ = "document_versions"
__table_args__ = (
{"schema": "app"}
)
id: Mapped[int] = mapped_column(primary_key=True)
document_id: Mapped[int] = mapped_column(
ForeignKey("app.documents.id", ondelete="CASCADE"),
nullable=False,
index=True
)
version_number: Mapped[int] = mapped_column(Integer, nullable=False)
content: Mapped[str] = mapped_column(Text, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
document: Mapped["Document"] = relationship(back_populates="versions")
Indexes and Unique Constraints
from sqlalchemy import Index, UniqueConstraint, text

class Document(Base):
    __tablename__ = "documents"
    __table_args__ = (
        # Composite index for queries like: WHERE organization_id=X AND folder_id=Y
        Index("ix_documents_org_folder", "organization_id", "folder_id"),
        # Partial index (PostgreSQL only) - smaller, faster for filtered queries.
        # Assumes a soft-delete column deleted_at, which the model above does not define.
        Index(
            "ix_documents_active_title",
            "title",
            postgresql_where=text("deleted_at IS NULL")  # Only index non-deleted docs
        ),
        {"schema": "app"}
    )

class DocumentVersion(Base):
    __tablename__ = "document_versions"
    __table_args__ = (
        # Prevent duplicate version numbers within same document
        UniqueConstraint("document_id", "version_number", name="uq_document_version"),
        {"schema": "app"}
    )
Why explicit naming:
- Alembic generates consistent migration diffs
- Database error messages reference known constraint names
- Avoids auto-generated names like documents_organization_id_folder_id_key_1
5. Table Creation and Lifecycle
metadata.create_all()
# Create all tables
Base.metadata.create_all(bind=engine)
# Create specific tables
Base.metadata.create_all(bind=engine, tables=[User.__table__, Document.__table__])
# Drop all tables
Base.metadata.drop_all(bind=engine)
When acceptable:
- Local development setup
- Ephemeral test databases in CI
- Initial schema bootstrap in greenfield projects
When forbidden:
- Production deployments
- Any environment with existing data
- Schema changes (use migrations)
Controlled Table Creation in CI
# tests/conftest.py
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
@pytest.fixture(scope="session")
def engine():
engine = create_engine("postgresql://test:test@localhost/test_db")
Base.metadata.create_all(bind=engine)
yield engine
Base.metadata.drop_all(bind=engine)
engine.dispose()
@pytest.fixture
def session(engine):
connection = engine.connect() # Get connection
transaction = connection.begin() # Start transaction
session = sessionmaker(bind=connection)() # Create session using this connection
yield session # Test runs here
session.close() # Close session
transaction.rollback() # Rollback all test changes
connection.close() # Return connection to pool
Why Migrations Are Mandatory in Production
Problems with create_all():
- No versioning or rollback
- Cannot handle data migrations
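- Skips tables that already exist, so column, index, and constraint changes are never applied
- Cannot alter tables; the only way to pick up a change is drop and recreate, which loses data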
- No audit trail
Use Alembic or equivalent. See section 10.
6. Sessions and Transactions
Session Scope
Per-request (web applications):
from contextlib import contextmanager
@contextmanager
def get_session():
session = SessionLocal()
try:
yield session
session.commit()
except:
session.rollback()
raise
finally:
session.close()
# FastAPI example
from fastapi import Depends
def get_db():
db = SessionLocal() # Create session for this request
try:
yield db # Inject into route handler
finally:
db.close() # Always close after request
@app.post("/documents")
def create_document(doc: DocCreate, db: Session = Depends(get_db)):
document = Document(**doc.dict()) # Create model instance
db.add(document) # Stage for insert
db.commit() # Execute INSERT, commit transaction
db.refresh(document) # Reload from DB (get auto-generated ID, defaults)
return document
Per-job (background workers):
def process_document_job(document_id: int):
    with get_session() as session:
        document = session.get(Document, document_id)
        # process
        # No explicit commit: get_session() commits on successful exit
Unit of work (manual control):
from sqlalchemy import select

def transfer_documents(source_folder_id: int, target_folder_id: int):
    session = SessionLocal()
    try:
        # 2.0-style select() instead of the legacy session.query() API
        documents = session.scalars(
            select(Document).where(Document.folder_id == source_folder_id)
        ).all()
        for doc in documents:
            doc.folder_id = target_folder_id
        session.commit()
    except:
        session.rollback()
        raise
    finally:
        session.close()
Explicit Transaction Boundaries
from datetime import datetime, timezone
from sqlalchemy import select
from sqlalchemy.orm import Session

def update_document_with_version(session: Session, doc_id: int, new_content: str):
    """
    Transaction boundary defined by caller.
    """
    document = session.get(Document, doc_id)
    if not document:
        raise ValueError("Document not found")
    # Find the latest version (2.0-style select() instead of legacy session.query())
    latest = session.scalars(
        select(DocumentVersion)
        .where(DocumentVersion.document_id == doc_id)
        .order_by(DocumentVersion.version_number.desc())
        .limit(1)
    ).first()
    next_version = (latest.version_number + 1) if latest else 1
    now = datetime.now(timezone.utc)  # Timezone-aware; utcnow() is naive and deprecated
    version = DocumentVersion(
        document_id=doc_id,
        version_number=next_version,
        content=new_content,
        created_at=now
    )
    session.add(version)
    document.content = new_content
    document.updated_at = now
    # Caller commits or rolls back
# Usage
with get_session() as session:
update_document_with_version(session, 123, "new content")
# implicit commit on exit
Nested Transactions (Savepoints)
from sqlalchemy.orm import Session
def create_folder_with_documents(session: Session, org_id: int, folder_name: str, doc_titles: list[str]):
# Outer transaction started by caller
folder = Folder(organization_id=org_id, name=folder_name)
session.add(folder) # Stage folder for insert
session.flush() # Execute INSERT immediately, get folder.id without committing
for title in doc_titles:
try:
# Nested transaction (SAVEPOINT in SQL)
with session.begin_nested(): # Creates SAVEPOINT
doc = Document(
organization_id=org_id,
folder_id=folder.id, # Now available from flush
owner_id=1, # placeholder
title=title,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
session.add(doc)
except Exception as e:
# ROLLBACK TO SAVEPOINT - folder insert still valid
print(f"Failed to create document {title}: {e}")
continue
# Outer transaction commits all successful documents
# Usage
with get_session() as session:
create_folder_with_documents(session, 1, "Reports", ["Q1", "Q2", "invalid!!!"])
Rollback Semantics
session = SessionLocal()
user = session.get(User, 1)
user.email = "new@example.com"
session.rollback() # Changes discarded
print(user.email) # Old value, reloaded from the database on access
Object state after rollback:
- Attributes are expired and reload the last committed state on next access
- Pending INSERTs removed from session
- Objects remain in session (identity map)
Full reset:
session.rollback()
session.expunge_all() # Clear identity map
Patterns That Prevent Session Leaks
Incorrect:
# Session never closed if exception raised
session = SessionLocal()
user = session.get(User, 1)
user.email = "new@example.com"
session.commit()
session.close() # Not reached if commit() fails
Correct:
session = SessionLocal()
try:
    user = session.get(User, 1)
    user.email = "new@example.com"
    session.commit()
finally:
    session.close()
Best:
with SessionLocal() as session:
    user = session.get(User, 1)
    user.email = "new@example.com"
    session.commit()
# Auto-closed on exit
Async:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

async_session = async_sessionmaker(async_engine, expire_on_commit=False)

async def update_user_email():  # "async with" is only valid inside a coroutine
    async with async_session() as session:
        async with session.begin():
            user = await session.get(User, 1)
            user.email = "new@example.com"
        # session.begin() commits on success; the outer block closes the session
7. Querying (ORM)
Deterministic Querying Patterns
SQLAlchemy 2.0 uses the select() construct for all ORM queries.
from sqlalchemy import select, and_, or_, func
# Get by primary key (checks identity map first, then SELECT)
session.get(User, 123)
# Simple filter
stmt = select(User).where(User.email == "user@example.com") # Build SELECT statement
user = session.execute(stmt).scalar_one_or_none() # Execute, return single object or None
# Multiple conditions (AND)
stmt = select(Document).where(
and_( # Explicit AND (can also chain .where() calls)
Document.organization_id == org_id,
Document.folder_id == folder_id,
Document.created_at >= start_date
)
)
documents = session.execute(stmt).scalars().all() # scalars() unwraps tuples, .all() fetches list
# OR conditions
stmt = select(User).where(
or_(
User.email == email,
User.full_name.ilike(f"%{query}%")
)
)
users = session.execute(stmt).scalars().all()
# Ordering and limiting
stmt = select(Document).where(
Document.organization_id == org_id
).order_by(Document.created_at.desc()).limit(10)
recent_docs = session.execute(stmt).scalars().all()
# Aggregation
stmt = select(func.count(Document.id)).where(Document.organization_id == org_id)
count = session.execute(stmt).scalar()
Joins and Subqueries
# Explicit join
stmt = select(Document, User).join(User, Document.owner_id == User.id).where(
Document.organization_id == org_id
)
results = session.execute(stmt).all()
for doc, user in results:
print(doc.title, user.full_name)
# Join via relationship
stmt = select(Document).join(Document.owner).where(User.email.like("%@example.com"))
docs = session.execute(stmt).scalars().all()
# Subquery for counts
from sqlalchemy import select, func
subq = select(
Document.folder_id,
func.count(Document.id).label("doc_count")
).where(
Document.organization_id == org_id
).group_by(Document.folder_id).subquery()
stmt = select(Folder, subq.c.doc_count).outerjoin(
subq, Folder.id == subq.c.folder_id
).where(Folder.organization_id == org_id)
results = session.execute(stmt).all()
for folder, count in results:
print(folder.name, count or 0)
Relationship Loading Strategies
Lazy loading (default):
document = session.get(Document, 1) # SELECT documents WHERE id=1
print(document.owner.email) # Triggers separate SELECT users WHERE id=document.owner_id
Eager loading (joinedload):
from sqlalchemy.orm import joinedload
stmt = select(Document).options(
joinedload(Document.owner) # Use LEFT OUTER JOIN to load owner in same query
).where(Document.id == 1)
document = session.execute(stmt).scalar_one()
print(document.owner.email) # Already loaded, no additional SELECT
Subquery load (mostly legacy; selectinload is generally preferred in 2.0):
from sqlalchemy.orm import subqueryload
stmt = select(Folder).options(
subqueryload(Folder.documents).subqueryload(Document.versions)
).where(Folder.organization_id == org_id)
folders = session.execute(stmt).scalars().all()
for folder in folders:
for doc in folder.documents:
print(doc.versions) # Already loaded
Select in load:
from sqlalchemy.orm import selectinload
stmt = select(Organization).options(
selectinload(Organization.folders) # Separate SELECT for all folders (WHERE org_id IN (...))
.selectinload(Folder.documents) # Then SELECT for all documents (WHERE folder_id IN (...))
).where(Organization.id == 1)
org = session.execute(stmt).scalar_one() # 3 queries total, regardless of collection size
for folder in org.folders:
for doc in folder.documents:
print(doc.title) # No N+1, all data pre-loaded
Avoiding N+1 Issues
Problem:
folders = session.execute(select(Folder).where(Folder.organization_id == org_id)).scalars().all()
for folder in folders:
print(folder.documents) # N queries
Solution 1 - selectinload:
stmt = select(Folder).options(selectinload(Folder.documents)).where(
Folder.organization_id == org_id
)
folders = session.execute(stmt).scalars().all()
for folder in folders:
print(folder.documents) # 2 queries total
Solution 2 - manual join:
stmt = select(Folder, Document).outerjoin(Document).where(Folder.organization_id == org_id)
results = session.execute(stmt).all()
folders_dict = {}
for folder, doc in results:
if folder.id not in folders_dict:
folders_dict[folder.id] = {"folder": folder, "documents": []}
if doc:
folders_dict[folder.id]["documents"].append(doc)
Bulk Operations and Caveats
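Bulk insert (bypasses unit of work):
from datetime import datetime, timezone
from sqlalchemy import insert

# 2.0 style: insert() plus a list of dictionaries runs as one executemany batch.
# session.bulk_insert_mappings() still works but is a legacy API in 2.0.
now = datetime.now(timezone.utc)
session.execute(
    insert(Document),  # Model class
    [  # List of dictionaries (not model instances)
        {"organization_id": 1, "folder_id": 1, "owner_id": 1, "title": f"Doc {i}",
         "created_at": now, "updated_at": now}
        for i in range(1000)
    ]
)
session.commit()  # The INSERT already ran in execute(); this commits it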
Bulk update:
session.bulk_update_mappings(
Document,
[
{"id": 1, "title": "Updated 1"},
{"id": 2, "title": "Updated 2"},
]
)
session.commit()
Caveats:
- No validation or default values
- No relationship handling
- No ORM events fired
- Objects not added to session
- Faster but less safe
Batch update (ORM-aware):
from sqlalchemy import update
stmt = update(Document).where(
Document.folder_id == old_folder_id # Filter which rows to update
).values(folder_id=new_folder_id) # Set new value
session.execute(stmt) # Executes: UPDATE documents SET folder_id=X WHERE folder_id=Y
session.commit() # Commits transaction
When ORM Queries Become a Liability
Complex aggregations:
# ORM: awkward and slow
from sqlalchemy import func, case
stmt = select(
Folder.name,
func.count(Document.id).label("total_docs"),
func.sum(case((Document.content.isnot(None), 1), else_=0)).label("with_content")
).join(Document).group_by(Folder.id, Folder.name).where(
Folder.organization_id == org_id
)
# Better: raw SQL (see section 8)
Large scans:
# ORM: loads all rows into memory
stmt = select(Document).where(Document.organization_id == org_id)
documents = session.execute(stmt).scalars().all() # 100k objects
# Better: Core with streaming or pagination
Use Core or raw SQL when:
- Aggregating thousands of rows
- Returning denormalized reports
- Write-heavy operations with no read-back
- Query complexity exceeds ORM expressiveness
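For large scans, one option is to read through Core in fixed-size chunks instead of materializing every row. A minimal sketch, assuming in-memory SQLite and an illustrative docs table; on drivers that support server-side cursors, yield_per also enables streaming.

```python
from sqlalchemy import (
    Column, Integer, MetaData, String, Table, create_engine, insert, select,
)

metadata = MetaData()
docs = Table(  # illustrative table, not the guide's Document model
    "docs",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("title", String(100)),
)

engine = create_engine("sqlite:///:memory:")
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(insert(docs), [{"title": f"Doc {i}"} for i in range(250)])

chunk_sizes = []
with engine.connect() as conn:
    result = conn.execution_options(yield_per=100).execute(
        select(docs.c.id, docs.c.title).order_by(docs.c.id)
    )
    # Each partition is a list of at most 100 Row objects
    for chunk in result.partitions(100):
        chunk_sizes.append(len(chunk))
```

Memory use is bounded by the chunk size, at the cost of keeping the connection checked out for the whole scan.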
8. Raw SQL Execution
Executing Raw SQL via text()
from sqlalchemy import text
# Simple query
result = session.execute(text("SELECT id, title FROM app.documents WHERE organization_id = 1"))
for row in result:
print(row.id, row.title)
# With connection (no session)
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM app.documents"))
    count = result.scalar()
    # No commit needed for a read; the transaction is rolled back when the block exits
Binding Parameters Safely
Never use string formatting:
# WRONG - SQL injection vulnerability
org_id = request.args.get("org_id") # User input could be "1 OR 1=1"
stmt = text(f"SELECT * FROM app.documents WHERE organization_id = {org_id}")
Correct - bound parameters:
stmt = text("SELECT * FROM app.documents WHERE organization_id = :org_id") # Named parameter
result = session.execute(stmt, {"org_id": org_id}) # Safely bound, SQL injection protected
Multiple parameters:
stmt = text("""
SELECT * FROM app.documents
WHERE organization_id = :org_id
AND created_at >= :start_date
AND title ILIKE :title_pattern
""")
result = session.execute(stmt, {
"org_id": org_id,
"start_date": start_date,
"title_pattern": f"%{query}%"
})
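Two details worth seeing in action: a hostile value passed as a bound parameter is compared as data, not parsed as SQL, and an IN list needs an expanding bind parameter. A sketch assuming in-memory SQLite and an illustrative, unqualified documents table.

```python
from sqlalchemy import bindparam, create_engine, text

engine = create_engine("sqlite:///:memory:")

with engine.begin() as conn:
    conn.execute(text(
        "CREATE TABLE documents "
        "(id INTEGER PRIMARY KEY, organization_id INTEGER, title TEXT)"
    ))
    conn.execute(
        text("INSERT INTO documents (id, organization_id, title) "
             "VALUES (:id, :org, :title)"),
        [
            {"id": 1, "org": 1, "title": "Q1"},
            {"id": 2, "org": 1, "title": "Q2"},
            {"id": 3, "org": 2, "title": "Plan"},
        ],
    )

by_org = text(
    "SELECT id FROM documents WHERE organization_id = :org_id ORDER BY id"
)

# expanding=True renders one placeholder per list element at execution time
by_ids = text(
    "SELECT title FROM documents WHERE id IN :ids ORDER BY id"
).bindparams(bindparam("ids", expanding=True))

with engine.connect() as conn:
    normal = conn.execute(by_org, {"org_id": 1}).scalars().all()
    # The injection attempt is just a string value that matches no row
    hostile = conn.execute(by_org, {"org_id": "1 OR 1=1"}).scalars().all()
    titles = conn.execute(by_ids, {"ids": [1, 3]}).scalars().all()
```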
Returning Scalar vs Row Mappings
Fetch all rows:
result = session.execute(text("SELECT id, title FROM app.documents"))
rows = result.fetchall() # List of Row objects
for row in rows:
print(row.id, row.title) # Access by attribute or index: row.id, row[0]; by key via row._mapping["id"]
Fetch single row:
result = session.execute(text("SELECT id, title FROM app.documents WHERE id = :id"), {"id": 1})
row = result.fetchone() # Single Row object or None
if row:
print(row.id, row.title)
Scalar (single value):
count = session.execute(text("SELECT COUNT(*) FROM app.documents")).scalar() # Return first column of first row
Map to dictionary:
result = session.execute(text("SELECT id, title, created_at FROM app.documents"))
docs = [{"id": row.id, "title": row.title, "created_at": row.created_at} for row in result]
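Building each dictionary by hand works, but Result.mappings() gives the same outcome without naming every column. A small sketch, again assuming an in-memory SQLite table as a stand-in for app.documents:

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")  # in-memory stand-in for the real database

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE documents (id INTEGER PRIMARY KEY, title TEXT)"))
    conn.execute(text("INSERT INTO documents (id, title) VALUES (1, 'Spec'), (2, 'Plan')"))

with engine.connect() as conn:
    result = conn.execute(text("SELECT id, title FROM documents ORDER BY id"))
    # .mappings() yields read-only RowMapping objects keyed by column name;
    # dict() copies each one into a plain, mutable dictionary.
    docs = [dict(row) for row in result.mappings()]

print(docs)
```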
Mixing ORM-Managed Entities with Raw SQL
Load ORM objects from raw SQL:
from sqlalchemy import select, text
# from_statement() maps the rows of a textual SELECT onto ORM entities.
# The result columns must match the columns mapped on Document.
raw = text("SELECT * FROM app.documents WHERE organization_id = :org_id")
stmt = select(Document).from_statement(raw)
documents = session.execute(stmt, {"org_id": org_id}).scalars().all()
# When the query can be expressed with select(), prefer it over text()
stmt = select(Document).where(Document.organization_id == org_id)
documents = session.execute(stmt).scalars().all()
Execute raw SQL, then refresh ORM objects:
session.execute(text("UPDATE app.documents SET title = :title WHERE id = :id"), {
"id": doc_id,
"title": new_title
})
session.commit()
# Refresh ORM object
document = session.get(Document, doc_id)
session.refresh(document)
print(document.title) # Reflects database state
Using Raw SQL for Performance-Critical Paths
Example: Bulk reporting query
def get_organization_stats(org_id: int) -> dict:
    query = text("""
        SELECT
            (SELECT COUNT(*) FROM app.documents WHERE organization_id = :org_id) AS total_docs,
            (SELECT COUNT(*) FROM app.folders WHERE organization_id = :org_id) AS total_folders,
            (SELECT COUNT(*) FROM auth.users WHERE organization_id = :org_id) AS total_users,
            (SELECT MAX(created_at) FROM app.documents WHERE organization_id = :org_id) AS latest_doc_date
    """)
    with engine.connect() as conn:
        result = conn.execute(query, {"org_id": org_id})
        row = result.fetchone()
        return {
            "total_docs": row.total_docs,
            "total_folders": row.total_folders,
            "total_users": row.total_users,
            "latest_doc_date": row.latest_doc_date
        }
Example: Complex join with window functions
def get_latest_document_per_folder(org_id: int):
    query = text("""
        WITH ranked_docs AS (
            SELECT
                d.id, d.folder_id, d.title, d.created_at,
                ROW_NUMBER() OVER (PARTITION BY d.folder_id ORDER BY d.created_at DESC) AS rn
            FROM app.documents d
            WHERE d.organization_id = :org_id
        )
        SELECT rd.folder_id, rd.title, rd.created_at, f.name AS folder_name
        FROM ranked_docs rd
        JOIN app.folders f ON rd.folder_id = f.id
        WHERE rd.rn = 1
    """)
    result = session.execute(query, {"org_id": org_id})
    return [
        {
            "folder_id": row.folder_id,
            "folder_name": row.folder_name,
            "document_title": row.title,
            "created_at": row.created_at
        }
        for row in result
    ]
9. Multi-Tenancy Strategies
A. Shared Database, Shared Schema
All tenants share the same tables. Tenant isolation is enforced by filtering every query on an organization_id column.
Advantages:
- Simple schema management
- Easy to query across tenants (admin/analytics)
- Single migration path
Disadvantages:
- Risk of data leakage if query filters are missed
- Cannot easily set per-tenant resource limits
- Index bloat if tenant sizes vary significantly
Implementation:
# Every query must filter by organization_id
def get_documents(session: Session, org_id: int):
    stmt = select(Document).where(Document.organization_id == org_id)
    return session.execute(stmt).scalars().all()
# Incorrect - missing tenant filter
def get_document_by_id(session: Session, doc_id: int):
    return session.get(Document, doc_id) # DANGER: No tenant check
# Correct
def get_document_by_id(session: Session, org_id: int, doc_id: int):
    stmt = select(Document).where(
        and_(Document.id == doc_id, Document.organization_id == org_id)
    )
    return session.execute(stmt).scalar_one_or_none()
Session-level filters (automatic tenant scoping):
from sqlalchemy import event
from sqlalchemy.orm import Session
class TenantSession(Session):
    def __init__(self, organization_id: int, **kwargs):
        super().__init__(**kwargs)
        self.organization_id = organization_id
    def execute(self, statement, *args, **kwargs):
        # Intercept and inject tenant filter (complex, brittle)
        # Not recommended for production
        return super().execute(statement, *args, **kwargs)
# Better: Use explicit helper
def with_tenant_filter(stmt, org_id: int, model):
    return stmt.where(model.organization_id == org_id)
# Usage
stmt = select(Document)
stmt = with_tenant_filter(stmt, org_id, Document)
documents = session.execute(stmt).scalars().all()
Query helper pattern:
from typing import Type, TypeVar
from sqlalchemy import Select
T = TypeVar("T")
def tenant_query(session: Session, model: Type[T], org_id: int) -> Select[tuple[T]]:
    """Returns a select statement pre-filtered by organization_id."""
    return select(model).where(model.organization_id == org_id) # Always inject tenant filter
# Usage
def get_all_documents(session: Session, org_id: int):
    stmt = tenant_query(session, Document, org_id) # Get pre-filtered statement
    return session.execute(stmt).scalars().all() # SELECT ... FROM documents WHERE organization_id = :org_id
def get_folder_with_documents(session: Session, org_id: int, folder_id: int):
    stmt = tenant_query(session, Folder, org_id).where(
        Folder.id == folder_id
    ).options(selectinload(Folder.documents))
    return session.execute(stmt).scalar_one_or_none()
Risks and failure modes:
- Forgotten filter:
# Developer forgets to add org_id filter
stmt = select(Document).where(Document.title.ilike(f"%{query}%"))
documents = session.execute(stmt).scalars().all() # Returns all tenants' docs
Mitigation:
- Code review checklist
- Automated linting (detect queries without tenant filter)
- Integration tests with multiple tenants
- Joins across tenants:
# Incorrect: joins user from different org
stmt = select(Document).join(User).where(Document.id == doc_id)
doc = session.execute(stmt).scalar_one() # User might be from different org
# Correct: verify both sides
stmt = select(Document).join(User).where(
    and_(
        Document.id == doc_id,
        Document.organization_id == org_id,
        User.organization_id == org_id
    )
)
B. Shared Database, Separate Schemas
Each tenant gets a dedicated PostgreSQL schema. Tables have identical structure across schemas.
Advantages:
- Stronger isolation (with the correct search_path, unqualified queries cannot reach another tenant's tables)
- Tenant-level backups/restores
- Per-tenant indexing strategies
Disadvantages:
- Complex migration management (must apply to all schemas)
- Cross-tenant queries require explicit UNION or schema-qualified joins
- Schema count limits (PostgreSQL handles thousands, but management overhead)
Implementation:
1. Dynamic schema binding per request:
from sqlalchemy import event, select, text
from sqlalchemy.orm import Session
# Store tenant schema in session.info (session-local storage)
class TenantSession(Session):
    def __init__(self, tenant_schema: str, **kwargs):
        super().__init__(**kwargs)
        self.info["tenant_schema"] = tenant_schema # Store schema name for this session
# Set search_path each time the session begins a transaction on a connection.
# Listen on TenantSession itself: a listener registered on a sessionmaker fires
# only for sessions that sessionmaker creates, not for TenantSession(...) built directly.
@event.listens_for(TenantSession, "after_begin")
def set_tenant_schema(session, transaction, connection):
    tenant_schema = session.info.get("tenant_schema") # Retrieve stored schema
    if tenant_schema:
        # SET LOCAL lasts only until the transaction ends, so the setting cannot
        # follow a pooled connection into another tenant's request.
        # The schema name is interpolated into SQL: build it only from trusted values.
        connection.execute(text(f"SET LOCAL search_path TO {tenant_schema}, public"))
# Usage
def get_tenant_session(org_id: int) -> Session:
    tenant_schema = f"tenant_{org_id}" # e.g., tenant_1, tenant_2
    return TenantSession(tenant_schema=tenant_schema, bind=engine)
# Request handler
def handle_request(org_id: int):
    session = get_tenant_session(org_id)
    try:
        # All queries automatically use tenant schema
        documents = session.execute(select(Document)).scalars().all()
        session.commit()
    finally:
        session.close()
2. Creating tenant schemas programmatically:
from sqlalchemy import Column, Integer, MetaData, String, Table, text
def create_tenant_schema(org_id: int):
    schema_name = f"tenant_{org_id}" # e.g., tenant_42
    with engine.connect() as conn:
        # Create PostgreSQL schema (namespace)
        conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
        conn.commit()
    # Create tables in tenant schema
    tenant_metadata = MetaData(schema=schema_name) # Target this schema
    # Redefine tables with tenant schema (must mirror production models)
    Table(
        "documents", tenant_metadata,
        Column("id", Integer, primary_key=True),
        Column("title", String(500), nullable=False),
        # ... all columns (must match exactly)
    )
    Table("folders", tenant_metadata, ...)
    # etc.
    tenant_metadata.create_all(bind=engine) # CREATE TABLE tenant_42.documents ...
# Bootstrap new tenant
def onboard_organization(org_id: int, name: str):
    # Create schema and tables
    create_tenant_schema(org_id)
    # Insert initial data
    session = get_tenant_session(org_id)
    try:
        root_folder = Folder(name="Root", parent_id=None)
        session.add(root_folder)
        session.commit()
    finally:
        session.close()
3. Migration complexity:
Each tenant schema must be migrated independently.
# Alembic migration runner
from alembic import command
from alembic.config import Config
def migrate_all_tenants():
    org_ids = get_all_organization_ids()
    for org_id in org_ids:
        schema_name = f"tenant_{org_id}"
        print(f"Migrating {schema_name}...")
        # Run Alembic with target schema
        # Requires env.py to read the option, e.g. config.get_main_option("schema")
        alembic_cfg = Config("alembic.ini")
        alembic_cfg.set_main_option("sqlalchemy.url", str(engine.url))
        alembic_cfg.set_section_option("alembic", "schema", schema_name)
        command.upgrade(alembic_cfg, "head")
4. Cross-tenant queries (admin/reporting):
def get_document_count_all_tenants():
    org_ids = get_all_organization_ids()
    results = []
    for org_id in org_ids:
        session = get_tenant_session(org_id)
        try:
            count = session.execute(
                select(func.count(Document.id))
            ).scalar()
            results.append({"org_id": org_id, "doc_count": count})
        finally:
            session.close()
    return results
# Alternatively: raw SQL with schema-qualified names
def get_document_count_all_tenants_raw():
    org_ids = get_all_organization_ids()
    # int() guards the interpolation: schema names cannot be bound parameters
    union_parts = [
        f"SELECT {int(org_id)} as org_id, COUNT(*) as doc_count FROM tenant_{int(org_id)}.documents"
        for org_id in org_ids
    ]
    query = " UNION ALL ".join(union_parts)
    with engine.connect() as conn:
        result = conn.execute(text(query))
        return [{"org_id": row.org_id, "doc_count": row.doc_count} for row in result]
Risks and failure modes:
- Schema not set correctly:
session = SessionLocal() # Forgot to use get_tenant_session
documents = session.execute(select(Document)).scalars().all() # Uses public or default schema
Mitigation:
- Always use factory function (get_tenant_session)
- Middleware that sets schema automatically
- Migration failures:
# Migration fails for tenant_42 due to unique constraint violation
# Rollback strategy needed for partial failures
Mitigation:
- Test migrations on copy of production data
- Idempotent migrations where possible
- Monitor migration logs per tenant
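Per-tenant monitoring implies that one failing schema should not hide the state of the others. A minimal sketch of that bookkeeping, using only the standard library: migrate_tenants() and fake_migration() are hypothetical names, and the real migration call (for example an Alembic upgrade) would be passed in as migrate_one.

```python
from typing import Callable, Iterable


def migrate_tenants(
    schema_names: Iterable[str],
    migrate_one: Callable[[str], None],
) -> dict[str, str]:
    """Run migrate_one for every schema and return {schema: error} for failures."""
    failures: dict[str, str] = {}
    for schema_name in schema_names:
        try:
            migrate_one(schema_name)
        except Exception as exc:
            # Record the failure and keep going, so one bad tenant
            # does not block the remaining schemas.
            failures[schema_name] = f"{type(exc).__name__}: {exc}"
    return failures


# Stand-in for a real migration call (hypothetical): tenant_42 fails, the others succeed.
def fake_migration(schema_name: str) -> None:
    if schema_name == "tenant_42":
        raise ValueError("duplicate key violates unique constraint")


failed = migrate_tenants(["tenant_1", "tenant_42", "tenant_43"], fake_migration)
print(failed)
```

The returned mapping can be logged or used to retry only the failed schemas after the underlying data problem is fixed.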
Optional: Database-per-Tenant
Each tenant has a separate database. Maximum isolation.
Tradeoffs:
- Connection pool per database (resource overhead)
- Extremely complex cross-tenant queries
- Simple migration per tenant (run Alembic once per database)
- Highest security/compliance level
Not covered in detail. Use only if regulatory requirements demand physical isolation.
10. Migrations (Alembic - High Level)
Why SQLAlchemy Alone Is Insufficient
metadata.create_all() cannot:
- Modify existing tables (ADD/DROP/ALTER COLUMN)
- Track which migrations have been applied
- Roll back changes
- Handle data transformations during schema changes
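The first limitation is easy to miss because create_all() fails silently. A short demonstration, assuming an in-memory SQLite database and a simplified documents table: the second create_all() sees that the table already exists and emits nothing, so the new column never reaches the database.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, inspect

engine = create_engine("sqlite://")

# Version 1 of the schema
v1 = MetaData()
Table("documents", v1, Column("id", Integer, primary_key=True))
v1.create_all(engine)

# Version 2 of the model adds a column, but the table already exists
v2 = MetaData()
Table(
    "documents",
    v2,
    Column("id", Integer, primary_key=True),
    Column("status", String(50)),
)
v2.create_all(engine)  # checkfirst=True by default: table found, no DDL emitted

column_names = [col["name"] for col in inspect(engine).get_columns("documents")]
print(column_names)
```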
Schema Naming and Migration Safety
Alembic generates migration scripts based on model metadata. Explicit constraint names prevent conflicts.
# alembic/env.py
from alembic import context
from sqlalchemy import engine_from_config, pool
from myapp.models import Base
config = context.config
target_metadata = Base.metadata
def run_migrations_online():
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            # Include schemas
            include_schemas=True,
            version_table_schema="public" # Store alembic_version in public schema
        )
        with context.begin_transaction():
            context.run_migrations()
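Explicit constraint names are usually produced by a naming convention on the MetaData object rather than by naming every constraint by hand. The sketch below assumes a simplified documents table; the convention dictionary follows the pattern shown in the SQLAlchemy documentation, and the exact templates are a choice, not a requirement.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, UniqueConstraint
from sqlalchemy.schema import CreateTable

naming_convention = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(constraint_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s",
}
metadata = MetaData(naming_convention=naming_convention)

documents = Table(
    "documents",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("title", String(500), nullable=False),
    UniqueConstraint("title"),  # no name given; the convention supplies one
)

# Render the CREATE TABLE statement to see the generated constraint names.
ddl = str(CreateTable(documents))
print(ddl)
```

With deterministic names, a later migration can drop or alter a constraint by name on every database, instead of relying on whatever name the server generated. With declarative models, the same MetaData can be assigned as the metadata attribute of the Base class.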
Multi-Tenant Migration Considerations
Shared schema approach:
- Single Alembic version table
- Apply migration once, affects all tenants
Separate schema approach:
- Apply migration to each tenant schema
- Track alembic_version per schema or globally
# Custom Alembic env.py for multi-schema
from alembic import context
from sqlalchemy import create_engine, text
def run_migrations_for_tenant(schema_name: str):
    connectable = create_engine(DATABASE_URL)
    with connectable.connect() as connection:
        # Set search path
        connection.execute(text(f"SET search_path TO {schema_name}, public"))
        # Commit the implicit transaction opened by execute(); otherwise Alembic
        # treats it as an external transaction and never commits the migration
        connection.commit()
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            version_table_schema=schema_name # Per-tenant version tracking
        )
        with context.begin_transaction():
            context.run_migrations()
# CLI wrapper
def migrate_all():
    org_ids = [1, 2, 3, ...] # Load from database
    for org_id in org_ids:
        schema_name = f"tenant_{org_id}"
        print(f"Migrating {schema_name}")
        run_migrations_for_tenant(schema_name)
Operational Reality
Test migrations on a production replica:
- Never test in production first
- Measure migration time (a long-running ALTER TABLE locks the table)
Backward compatibility:
- Add columns as nullable, backfill data, then set NOT NULL
- Avoid renaming columns (add new, copy data, drop old)
Zero-downtime migrations:
- Expand-contract pattern
- Deploy code compatible with old and new schema
- Run migration after deployment
Rollback strategy:
- Always write downgrade() in migrations
- Test rollback before applying to production
Example migration (add column):
# alembic/versions/001_add_document_status.py
import sqlalchemy as sa
from alembic import op
def upgrade():
    op.add_column(
        "documents",
        sa.Column("status", sa.String(50), nullable=True),
        schema="app"
    )
    # Backfill default value
    op.execute("UPDATE app.documents SET status = 'active' WHERE status IS NULL")
    # Set NOT NULL constraint
    op.alter_column("documents", "status", nullable=False, schema="app")
def downgrade():
    op.drop_column("documents", "status", schema="app")
11. Failure Modes and Footguns
Connection Exhaustion
Symptom:
sqlalchemy.exc.TimeoutError: QueuePool limit of size 10 overflow 20 reached, connection timed out
Causes:
- Sessions not closed (leaked)
- Long-running transactions holding connections
- Pool size too small for concurrency
Debug:
# Check pool status
print(engine.pool.status()) # "Pool size: 10 Connections in pool: 0 Current Overflow: 20 Current Checked out connections: 30"
# Enable pool logging
import logging
logging.getLogger('sqlalchemy.pool').setLevel(logging.DEBUG)
Solutions:
- Always close sessions (use context managers)
- Set pool_pre_ping=True to detect stale connections
- Increase pool size if legitimate concurrency exceeds limit
- Use connection poolers (PgBouncer) for high-concurrency services
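The exhaustion symptom can be reproduced on a small scale. This is a sketch under assumptions: in-memory SQLite with an explicit QueuePool stands in for the production engine, and the pool is shrunk to one connection with a short pool_timeout so the error appears quickly.

```python
from sqlalchemy import create_engine
from sqlalchemy.exc import TimeoutError as PoolTimeoutError
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "sqlite://",
    poolclass=QueuePool,
    pool_size=1,       # one pooled connection
    max_overflow=0,    # no extra connections beyond pool_size
    pool_timeout=0.1,  # seconds to wait for a free connection
)

leaked = engine.connect()  # checked out and never closed: a leak
timed_out = False
try:
    engine.connect()       # waits pool_timeout seconds, then raises
except PoolTimeoutError:
    timed_out = True

leaked.close()             # returning the connection frees the slot

with engine.connect() as conn:
    checked_out_inside = engine.pool.checkedout()
# The context manager returned the connection on exit.
checked_out_after = engine.pool.checkedout()

print(timed_out, checked_out_inside, checked_out_after)
```

The same counter, engine.pool.checkedout(), is a cheap metric to export: a value that climbs and never falls points to leaked sessions rather than genuine load.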
Transaction Leaks
Symptom:
- Uncommitted transactions hold locks
- Reads see stale data (isolation level issues)
- Connections never returned to pool
Example:
session = SessionLocal()
user = session.get(User, 1)
user.email = "new.address@example.com"
# Missing session.commit() or session.rollback()
# Transaction open indefinitely
Solution:
with SessionLocal() as session:
    with session.begin():
        user = session.get(User, 1)
        user.email = "new.address@example.com"
    # session.begin() commits on normal exit and rolls back if the block raises
ORM State Desynchronization
Symptom:
- Object attribute doesn’t match database value
- Stale reads after external updates
Example:
session = SessionLocal()
document = session.get(Document, 1)
print(document.title) # "Old Title"
# Another process updates the database
# UPDATE app.documents SET title = 'New Title' WHERE id = 1
print(document.title) # Still "Old Title" (identity map)
Solution:
session.expire(document) # Mark object as stale
print(document.title) # Triggers SELECT, returns "New Title"
# Or refresh explicitly
session.refresh(document)
Expire on commit:
SessionLocal = sessionmaker(bind=engine, expire_on_commit=True) # Default
After commit, all objects marked stale. Next access triggers SELECT.
Footgun: SQLite foreign keys
# SQLite
engine = create_engine("sqlite:///dev.db")
# Foreign key violations NOT enforced by default
# Must enable per connection
@event.listens_for(engine, "connect")
def set_sqlite_pragma(dbapi_conn, connection_record):
    cursor = dbapi_conn.cursor()
    cursor.execute("PRAGMA foreign_keys=ON")
    cursor.close()
Footgun: SQLite concurrent writes
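# SQLite allows only one writer at a time per database file
session.add(document)
session.commit() # Takes the write lock; other writers wait or fail with "database is locked"
# In the default rollback-journal mode readers are blocked during the commit too; WAL mode lets readers continue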
PostgreSQL: Multiple writers and readers can operate concurrently.
Concurrency and Isolation Issues
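Lost update problem:
# Both sessions read the row before either one commits
doc1 = session1.get(Document, 1) # views = 10
doc2 = session2.get(Document, 1) # views = 10
# Session 1
doc1.views = doc1.views + 1
session1.commit() # Writes views = 11
# Session 2 (still holding the value it read earlier)
doc2.views = doc2.views + 1
session2.commit() # Also writes views = 11
# Final value: +1 instead of +2 (last write wins)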
Solutions:
- Optimistic locking (version column):
class Document(Base):
    __tablename__ = "documents"
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(500))
    version: Mapped[int] = mapped_column(Integer, nullable=False, default=0) # Version counter
    __mapper_args__ = {"version_id_col": version} # Enable optimistic locking on this column
# Usage
doc = session.get(Document, 1) # SELECT ... (version=5)
doc.title = "Updated"
session.commit() # UPDATE documents SET title=..., version=6 WHERE id=1 AND version=5
# If concurrent update changed version, WHERE clause matches 0 rows -> StaleDataError
- Pessimistic locking:
from sqlalchemy import select
stmt = select(Document).where(Document.id == 1).with_for_update() # Add FOR UPDATE clause
doc = session.execute(stmt).scalar_one() # SELECT ... FOR UPDATE (other transactions that try to lock or modify this row wait)
doc.views += 1
session.commit() # UPDATE, release lock
- Atomic increment:
from sqlalchemy import update
stmt = update(Document).where(Document.id == 1).values(
views=Document.views + 1 # Database-side increment (views = views + 1)
)
session.execute(stmt) # UPDATE documents SET views = views + 1 WHERE id = 1
session.commit() # Atomic, no race condition
Summary: When to Use What
Use ORM When:
- Modeling domain entities with relationships
- Object lifecycle matches request/transaction boundaries
- Benefits from identity map and change tracking
- Relationships are primary access pattern
- Typical CRUD operations dominate
Use Core When:
- Bulk operations (insert/update/delete thousands of rows)
- Complex aggregations, window functions, CTEs
- Performance-critical paths requiring SQL control
- Dynamic query construction
- Returning non-entity data (reports, analytics)
Bypass SQLAlchemy When:
- Stored procedures or database-specific extensions (PostGIS, full-text search)
- Extremely high throughput (use asyncpg directly)
- Schema introspection or admin tasks (use psycopg2 directly)
- Database-native features not exposed by SQLAlchemy