Hexagonal Architecture with FastAPI: Database, Valkey Cache, Messaging
Hexagonal Architecture for FastAPI Document Systems
1. TL;DR
- Hexagonal Architecture (ports & adapters) keeps the domain independent: business rules import nothing from FastAPI, SQLAlchemy, or Valkey.
- Outbound adapters (database, cache, messaging) implement thin interfaces (DocumentRepository, DocumentCache, EventBus), so swapping Postgres for Dynamo happens in the adapter only.
- Inbound adapters (FastAPI controllers, CLI, workers) depend on the application layer (UseCase classes) instead of touching repositories directly.
- Valkey sits in front of the database with cache-aside reads and event-driven invalidation; messaging publishes DocumentShared events so downstream analytics or audit services stay decoupled.
- Tests supply in-memory adapters, so no infrastructure mocks leak into the domain.
2. Domain to Model: Document Management
Core nouns
- Document: immutable metadata + version, blob pointer.
- Folder: tree structure scoped to Organization.
- User: belongs to one or more organizations, holds roles.
- AccessPolicy: expresses capabilities (viewer, editor, owner).
Capabilities we will build
- Upload a document with metadata (Persistence + Valkey invalidation).
- Share a document with a user (Access control + Messaging event).
- List folder contents (Cache first, DB fallback).
3. Layered Layout
src/
├── app/
│   ├── main.py                  # FastAPI entry point (inbound adapter)
│   ├── routes/
│   │   └── documents.py         # Uses use cases only
│   └── container.py             # Wires dependencies
├── domain/
│   ├── entities.py              # Document, Folder, User
│   ├── value_objects.py         # IDs, enums
│   ├── policies.py              # Permission logic
│   └── events.py                # Domain events
├── application/
│   ├── commands.py              # DTOs for input data
│   ├── use_cases.py             # UploadDocument, ShareDocument
│   └── ports.py                 # Repository, Cache, EventBus
└── infrastructure/
    ├── persistence/
    │   ├── models.py            # SQLAlchemy tables
    │   ├── repositories.py      # Implements DocumentRepository
    │   └── database.py          # Async engine/session
    ├── cache/
    │   └── valkey_cache.py      # Valkey adapter
    ├── messaging/
    │   └── bus.py               # RabbitMQ/NATS adapter
    └── auth/
        └── identity_provider.py
domain has zero imports pointing back into application/infrastructure. Ports live with the application layer because they are defined by use cases.
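The dependency rule above can be checked mechanically. The following is a minimal sketch, assuming the package names from the layout; the function name and the set of forbidden roots are illustrative and not part of the original project. It parses a module's source with the standard ast module and reports any import that reaches outside the domain.

```python
import ast

# Hypothetical list of packages the domain layer must never import.
FORBIDDEN_ROOTS = frozenset(
    {"application", "infrastructure", "app", "fastapi", "sqlalchemy", "valkey"}
)


def forbidden_imports(source: str, forbidden: frozenset[str] = FORBIDDEN_ROOTS) -> list[str]:
    """Return the forbidden top-level packages imported by a module's source."""
    found: list[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.level == 0:
            # level > 0 is a relative import (from .value_objects import ...),
            # which stays inside the domain package and is always allowed.
            names = [node.module or ""]
        else:
            continue
        for name in names:
            root = name.split(".")[0]
            if root in forbidden:
                found.append(root)
    return found
```

A test suite could run this over every file under src/domain and fail the build when the returned list is not empty.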
4. Domain Layer Code
# src/domain/entities.py
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime

from .value_objects import DocumentId, UserId, OrganizationId, FolderId


@dataclass(frozen=True, slots=True)
class Document:
    id: DocumentId
    org_id: OrganizationId
    folder_id: FolderId
    owner_id: UserId
    name: str
    mime_type: str
    version: int
    blob_uri: str
    created_at: datetime
    updated_at: datetime
    tags: tuple[str, ...] = field(default_factory=tuple)

    def next_version(self, new_blob_uri: str) -> "Document":
        # Entities are frozen, so a new version is a new instance.
        return Document(
            id=self.id,
            org_id=self.org_id,
            folder_id=self.folder_id,
            owner_id=self.owner_id,
            name=self.name,
            mime_type=self.mime_type,
            version=self.version + 1,
            blob_uri=new_blob_uri,
            created_at=self.created_at,
            updated_at=datetime.utcnow(),
            tags=self.tags,
        )


@dataclass(frozen=True, slots=True)
class Folder:
    id: FolderId
    org_id: OrganizationId
    parent_id: FolderId | None
    name: str
    path: str  # /org/{org_id}/folder/... for fast lookups


@dataclass(frozen=True, slots=True)
class AccessPolicy:
    document_id: DocumentId
    user_id: UserId
    capabilities: frozenset[str]

    def can(self, action: str) -> bool:
        return action in self.capabilities


def require_capability(policy: AccessPolicy, action: str) -> None:
    if not policy.can(action):
        # The use cases and repositories pass plain string IDs, so format the ID
        # directly instead of reaching for a .value attribute.
        raise PermissionError(f"User lacks {action} on {policy.document_id}")
# src/domain/events.py
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True, slots=True)
class DocumentShared:
    document_id: str
    shared_with: str
    shared_by: str
    role: str
    at: datetime
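The entities import DocumentId, UserId, OrganizationId, and FolderId from value_objects.py, which the article does not show. A minimal sketch of that module follows, assuming the IDs are plain strings at runtime (which matches how the use cases and repositories pass them around). The Capability enum, its members, and parse_capabilities are hypothetical names added for illustration.

```python
from enum import Enum
from typing import NewType

# NewType gives each ID a distinct static type while staying a str at runtime,
# so type checkers catch a FolderId passed where a DocumentId is expected.
DocumentId = NewType("DocumentId", str)
UserId = NewType("UserId", str)
OrganizationId = NewType("OrganizationId", str)
FolderId = NewType("FolderId", str)


class Capability(str, Enum):
    """Hypothetical capability names; adjust to the real permission model."""

    READ = "read"
    WRITE = "write"
    SHARE = "share"


def parse_capabilities(raw: list[str]) -> frozenset[str]:
    """Validate raw capability strings and return them as an immutable set."""
    # Capability(item) raises ValueError for names outside the enum.
    return frozenset(Capability(item).value for item in raw)
```

Keeping IDs as NewType aliases avoids wrapper objects in serialization code, at the cost of no runtime validation; a frozen dataclass wrapper is the usual alternative when validation matters.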
5. Ports and Use Cases
# src/application/ports.py
from abc import ABC, abstractmethod
from typing import Iterable, Protocol

from domain.entities import Document, Folder, AccessPolicy
from domain.events import DocumentShared


class DocumentRepository(ABC):
    @abstractmethod
    async def save(self, document: Document) -> None: ...

    @abstractmethod
    async def get(self, doc_id: str) -> Document | None: ...

    @abstractmethod
    async def list_by_folder(self, folder_id: str) -> Iterable[Document]: ...


class AccessPolicyRepository(ABC):
    @abstractmethod
    async def get_policy(self, doc_id: str, user_id: str) -> AccessPolicy | None: ...

    @abstractmethod
    async def upsert(self, policy: AccessPolicy) -> None: ...


class DocumentCache(Protocol):
    async def get_document(self, doc_id: str) -> Document | None: ...
    async def put_document(self, document: Document) -> None: ...
    async def delete_document(self, doc_id: str) -> None: ...
    async def get_folder_listing(self, folder_id: str) -> list[Document] | None: ...
    async def put_folder_listing(self, folder_id: str, docs: list[Document]) -> None: ...


class EventBus(Protocol):
    async def publish(self, event: DocumentShared) -> None: ...
# src/application/commands.py
from dataclasses import dataclass
from typing import Sequence


@dataclass(slots=True)
class UploadDocumentCommand:
    doc_id: str
    org_id: str
    folder_id: str
    owner_id: str
    name: str
    mime_type: str
    blob_uri: str
    tags: Sequence[str]


@dataclass(slots=True)
class ShareDocumentCommand:
    doc_id: str
    shared_by: str
    shared_with: str
    role: str
# src/application/use_cases.py
from datetime import datetime

from domain.entities import Document, AccessPolicy, require_capability
from domain.events import DocumentShared
from .commands import UploadDocumentCommand, ShareDocumentCommand
from .ports import DocumentRepository, DocumentCache, AccessPolicyRepository, EventBus


class UploadDocument:
    def __init__(self, repo: DocumentRepository, cache: DocumentCache):
        self.repo = repo
        self.cache = cache

    async def execute(self, cmd: UploadDocumentCommand) -> Document:
        now = datetime.utcnow()
        document = Document(
            id=cmd.doc_id,
            org_id=cmd.org_id,
            folder_id=cmd.folder_id,
            owner_id=cmd.owner_id,
            name=cmd.name,
            mime_type=cmd.mime_type,
            version=1,
            blob_uri=cmd.blob_uri,
            created_at=now,
            updated_at=now,
            tags=tuple(cmd.tags),
        )
        await self.repo.save(document)
        await self.cache.delete_document(document.id)
        # delete_document() only targets single-document entries, so it cannot
        # drop the folder listing. The port has no folder-level delete, so the
        # listing is overwritten with an empty list instead; ListFolderDocuments
        # treats an empty cached listing as a miss and reloads from the database.
        # A dedicated delete method on the port would be the cleaner long-term fix.
        await self.cache.put_folder_listing(document.folder_id, [])
        return document


class ShareDocument:
    def __init__(
        self,
        repo: DocumentRepository,
        policy_repo: AccessPolicyRepository,
        cache: DocumentCache,
        bus: EventBus,
    ) -> None:
        self.repo = repo
        self.policy_repo = policy_repo
        self.cache = cache
        self.bus = bus

    async def execute(self, cmd: ShareDocumentCommand) -> None:
        # Cache-aside read: try the cache, fall back to the repository.
        document = await self.cache.get_document(cmd.doc_id)
        if not document:
            document = await self.repo.get(cmd.doc_id)
            if not document:
                raise ValueError("Document not found")
            await self.cache.put_document(document)

        policy = await self.policy_repo.get_policy(cmd.doc_id, cmd.shared_by)
        if not policy:
            raise PermissionError("Sharer has no policy")
        require_capability(policy, "share")

        new_policy = AccessPolicy(
            document_id=policy.document_id,
            user_id=cmd.shared_with,
            capabilities=frozenset({cmd.role}),
        )
        await self.policy_repo.upsert(new_policy)
        await self.bus.publish(
            DocumentShared(
                document_id=cmd.doc_id,
                shared_with=cmd.shared_with,
                shared_by=cmd.shared_by,
                role=cmd.role,
                at=datetime.utcnow(),
            )
        )


class ListFolderDocuments:
    def __init__(self, repo: DocumentRepository, cache: DocumentCache):
        self.repo = repo
        self.cache = cache

    async def execute(self, folder_id: str) -> list[Document]:
        cached = await self.cache.get_folder_listing(folder_id)
        # Both None and an empty list count as a miss.
        if cached:
            return cached
        docs = list(await self.repo.list_by_folder(folder_id))
        await self.cache.put_folder_listing(folder_id, docs)
        return docs
6. Persistence Adapter (SQLAlchemy + Postgres)
# src/infrastructure/persistence/database.py
from typing import AsyncGenerator

from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
    AsyncEngine,
)
from sqlalchemy.orm import DeclarativeBase


# Base class for all ORM models. SQLAlchemy 2.0 uses DeclarativeBase
# instead of the old declarative_base() factory function.
class Base(DeclarativeBase):
    pass


# Create async engine with the asyncpg driver.
# pool_size: number of connections kept open in the pool
# max_overflow: additional connections created under load (total = pool_size + max_overflow)
engine: AsyncEngine = create_async_engine(
    "postgresql+asyncpg://user:pass@db:5432/documents",
    pool_size=10,        # Tune based on expected concurrency
    max_overflow=20,     # Burst capacity for traffic spikes
    pool_pre_ping=True,  # Verify connections before using (handles stale connections)
    echo=False,          # Set to True in dev to see all SQL queries
)

# Session factory. expire_on_commit=False keeps loaded attributes usable after
# commit; otherwise touching them would trigger an implicit reload, which the
# async session cannot perform outside an awaited call.
SessionLocal = async_sessionmaker(
    engine,
    expire_on_commit=False,
    class_=AsyncSession,
)


# Dependency injection function for FastAPI.
# Yields a session and closes it on exit (even if an exception occurs).
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with SessionLocal() as session:
        yield session
        # Closing does not commit. Repositories commit explicitly, and any
        # uncommitted work is rolled back when the session closes.
# src/infrastructure/persistence/models.py
from datetime import datetime
from typing import Optional

from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.dialects.postgresql import ARRAY

from .database import Base


class DocumentModel(Base):
    __tablename__ = "documents"

    # Primary key
    id: Mapped[str] = mapped_column(String, primary_key=True)

    # Ownership columns. Only folder_id is a real foreign key here.
    # Non-Optional annotations make the columns NOT NULL by default.
    org_id: Mapped[str] = mapped_column(String, index=True)  # Index for filtering by org
    folder_id: Mapped[str] = mapped_column(ForeignKey("folders.id"), index=True)
    owner_id: Mapped[str] = mapped_column(String, index=True)  # Index for "my documents" queries

    # Document metadata
    name: Mapped[str] = mapped_column(String(255))  # Explicit length for VARCHAR
    mime_type: Mapped[str] = mapped_column(String(100))
    version: Mapped[int]  # Incremented per document by application logic
    blob_uri: Mapped[str] = mapped_column(String(512))  # S3/blob storage path

    # Timestamps: Mapped[datetime] maps to a naive TIMESTAMP by default, which
    # matches the naive datetime.utcnow() values the domain produces. Pass
    # DateTime(timezone=True) to mapped_column() if you want TIMESTAMPTZ.
    created_at: Mapped[datetime] = mapped_column(index=True)  # Index for sorting
    updated_at: Mapped[datetime] = mapped_column(index=True)

    # Tags stored as Postgres ARRAY type (simpler than JSON for flat lists)
    # Optional[list[str]] allows NULL in the database
    tags: Mapped[Optional[list[str]]] = mapped_column(ARRAY(String), default=None)


class AccessPolicyModel(Base):
    __tablename__ = "access_policies"

    # Composite primary key (document_id, user_id)
    # This enforces "one policy per user per document" at the database level
    document_id: Mapped[str] = mapped_column(
        ForeignKey("documents.id", ondelete="CASCADE"),  # Delete policies when document deleted
        primary_key=True,
    )
    user_id: Mapped[str] = mapped_column(String, primary_key=True)

    # Capabilities stored as array: ["read", "write", "share", "admin"]
    # Using ARRAY instead of JSON because we only need a simple list
    capabilities: Mapped[list[str]] = mapped_column(ARRAY(String))


# Optional: Add indexes for common queries
# from sqlalchemy import Index
# Index("idx_documents_org_folder", DocumentModel.org_id, DocumentModel.folder_id)
# Index("idx_documents_created", DocumentModel.created_at.desc())
# src/infrastructure/persistence/repositories.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from application.ports import DocumentRepository, AccessPolicyRepository
from domain.entities import Document, AccessPolicy
from .models import DocumentModel, AccessPolicyModel


# Mapper function converts ORM model to domain entity.
# This is the anti-corruption layer: domain entities stay pure,
# unaware of SQLAlchemy or database concerns.
def map_document(model: DocumentModel) -> Document:
    return Document(
        id=model.id,
        org_id=model.org_id,
        folder_id=model.folder_id,
        owner_id=model.owner_id,
        name=model.name,
        mime_type=model.mime_type,
        version=model.version,
        blob_uri=model.blob_uri,
        created_at=model.created_at,
        updated_at=model.updated_at,
        tags=tuple(model.tags or ()),  # Convert list to tuple (domain uses immutable tuple)
    )


class SqlAlchemyDocumentRepository(DocumentRepository):
    """Postgres implementation of DocumentRepository port.

    Session is injected per-request via FastAPI dependency injection,
    ensuring each request gets its own transaction boundary.
    """

    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def save(self, document: Document) -> None:
        """Insert or update a document.

        merge() looks the row up by primary key and updates it if present,
        otherwise inserts. The commit here makes each save its own unit of work.
        """
        # Convert domain entity to ORM model
        model = DocumentModel(
            id=document.id,
            org_id=document.org_id,
            folder_id=document.folder_id,
            owner_id=document.owner_id,
            name=document.name,
            mime_type=document.mime_type,
            version=document.version,
            blob_uri=document.blob_uri,
            created_at=document.created_at,
            updated_at=document.updated_at,
            tags=list(document.tags) if document.tags else None,
        )
        # add() would always INSERT and fail on an existing id;
        # merge() handles both insert and update.
        await self.session.merge(model)
        await self.session.commit()
        # Note: flush() would write to DB but keep the transaction open,
        # commit() writes and ends the transaction

    async def get(self, doc_id: str) -> Document | None:
        """Fetch single document by ID.

        SQLAlchemy 2.0 style: build statement with select(), execute, then extract result.
        """
        stmt = select(DocumentModel).where(DocumentModel.id == doc_id)
        result = await self.session.execute(stmt)
        # scalar_one_or_none() returns the model or None (raises if multiple found)
        model = result.scalar_one_or_none()
        return map_document(model) if model else None

    async def list_by_folder(self, folder_id: str) -> list[Document]:
        """Return the documents in a folder, newest first.

        Returns a list rather than an async generator so callers can simply
        await it, as the DocumentRepository port and ListFolderDocuments expect.
        For folders with thousands of documents, add pagination to the query.
        """
        stmt = (
            select(DocumentModel)
            .where(DocumentModel.folder_id == folder_id)
            .order_by(DocumentModel.created_at.desc())  # Newest first
        )
        result = await self.session.execute(stmt)
        # scalars() returns iterator of models (not Row tuples)
        return [map_document(model) for model in result.scalars()]
class SqlAlchemyAccessPolicyRepository(AccessPolicyRepository):
    """Postgres implementation for access control policies."""

    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def get_policy(self, doc_id: str, user_id: str) -> AccessPolicy | None:
        """Fetch user's capabilities for a document."""
        stmt = select(AccessPolicyModel).where(
            AccessPolicyModel.document_id == doc_id,
            AccessPolicyModel.user_id == user_id,
        )
        result = await self.session.execute(stmt)
        model = result.scalar_one_or_none()
        if not model:
            return None
        # Convert array to frozenset for immutability (domain requirement)
        return AccessPolicy(
            document_id=model.document_id,
            user_id=model.user_id,
            capabilities=frozenset(model.capabilities or []),
        )

    async def upsert(self, policy: AccessPolicy) -> None:
        """Insert new policy or update existing one.

        merge() loads the row by its composite primary key (document_id, user_id),
        then issues an UPDATE if it exists and an INSERT otherwise. This is not an
        atomic INSERT ... ON CONFLICT, so concurrent upserts for the same key can race.
        """
        model = AccessPolicyModel(
            document_id=policy.document_id,
            user_id=policy.user_id,
            capabilities=list(policy.capabilities),  # frozenset -> list for Postgres ARRAY
        )
        await self.session.merge(model)
        await self.session.commit()
7. Valkey Cache Adapter (Async)
# src/infrastructure/cache/valkey_cache.py
import json
from dataclasses import asdict
from datetime import datetime
from typing import Any

from valkey.asyncio import Valkey

from application.ports import DocumentCache
from domain.entities import Document


class ValkeyDocumentCache(DocumentCache):
    def __init__(self, client: Valkey, ttl_seconds: int = 300) -> None:
        self.client = client
        self.ttl = ttl_seconds

    async def get_document(self, doc_id: str) -> Document | None:
        raw = await self.client.get(self._key(doc_id))
        return self._decode(raw) if raw else None

    async def put_document(self, document: Document) -> None:
        await self.client.setex(self._key(document.id), self.ttl, self._encode(document))

    async def delete_document(self, doc_id: str) -> None:
        await self.client.delete(self._key(doc_id))

    async def get_folder_listing(self, folder_id: str) -> list[Document] | None:
        raw = await self.client.get(self._folder_key(folder_id))
        if not raw:
            return None
        return [self._from_dict(item) for item in json.loads(raw)]

    async def put_folder_listing(self, folder_id: str, docs: list[Document]) -> None:
        payload = json.dumps([self._to_dict(doc) for doc in docs])
        await self.client.setex(self._folder_key(folder_id), self.ttl, payload)

    def _key(self, doc_id: str) -> str:
        return f"doc:{doc_id}"

    def _folder_key(self, folder_id: str) -> str:
        return f"folder:{folder_id}:docs"

    def _to_dict(self, document: Document) -> dict[str, Any]:
        # Document is declared with slots=True, so it has no __dict__;
        # asdict() reads the dataclass fields instead.
        data = asdict(document)
        data["created_at"] = document.created_at.isoformat()
        data["updated_at"] = document.updated_at.isoformat()
        return data

    def _from_dict(self, data: dict[str, Any]) -> Document:
        # JSON has no datetime or tuple types, so restore them explicitly.
        return Document(
            **{
                **data,
                "created_at": datetime.fromisoformat(data["created_at"]),
                "updated_at": datetime.fromisoformat(data["updated_at"]),
                "tags": tuple(data["tags"]),
            }
        )

    def _encode(self, document: Document) -> str:
        return json.dumps(self._to_dict(document))

    def _decode(self, raw: str) -> Document:
        return self._from_dict(json.loads(raw))
Cache strategy: reads follow cache-aside; writes invalidate the relevant keys, optionally followed by an asynchronous refresh from a background job.
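Stripped of Valkey and JSON details, the read and invalidate paths reduce to a few lines. The following is a minimal in-process sketch of cache-aside with a TTL; the CacheAside class, its loader callback, and the injectable clock are illustrative names, not part of the adapter above.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable


class CacheAside:
    """Read-through on miss, explicit invalidation on write, TTL as a safety net."""

    def __init__(
        self,
        loader: Callable[[str], Awaitable[Any]],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._loader = loader
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: dict[str, tuple[float, Any]] = {}  # key -> (expires_at, value)

    async def get(self, key: str) -> Any:
        entry = self._entries.get(key)
        if entry is not None and entry[0] > self._clock():
            return entry[1]  # fresh hit
        value = await self._loader(key)  # miss or expired: go to the source of truth
        self._entries[key] = (self._clock() + self._ttl, value)
        return value

    def invalidate(self, key: str) -> None:
        # Called by write paths so the next read reloads from the source.
        self._entries.pop(key, None)


async def demo() -> int:
    loads: list[str] = []

    async def load_folder(key: str) -> str:
        loads.append(key)
        return f"listing for {key}"

    cache = CacheAside(load_folder)
    await cache.get("folder-1")    # miss: loads from the source
    await cache.get("folder-1")    # hit: served from memory
    cache.invalidate("folder-1")   # a write happened
    await cache.get("folder-1")    # miss again
    return len(loads)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The TTL bounds how long a missed invalidation can serve stale data, which is why the adapter sets one on every key even though writes invalidate explicitly.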
8. Messaging Adapter (RabbitMQ + aio-pika)
# src/infrastructure/messaging/bus.py
import json
from dataclasses import asdict

from aio_pika import connect_robust, Message, ExchangeType

from application.ports import EventBus
from domain.events import DocumentShared


class RabbitEventBus(EventBus):
    def __init__(self, url: str, exchange_name: str = "doc.events") -> None:
        self.url = url
        self.exchange_name = exchange_name
        self._connection = None
        self._exchange = None

    async def start(self) -> None:
        self._connection = await connect_robust(self.url)
        channel = await self._connection.channel()
        self._exchange = await channel.declare_exchange(self.exchange_name, ExchangeType.TOPIC)

    async def stop(self) -> None:
        if self._connection:
            await self._connection.close()

    async def publish(self, event: DocumentShared) -> None:
        if not self._exchange:
            raise RuntimeError("Bus not started")
        # DocumentShared uses slots=True and has no __dict__, so serialize via asdict().
        # default=str turns the datetime field into a string.
        payload = json.dumps(asdict(event), default=str)
        message = Message(body=payload.encode(), content_type="application/json")
        routing_key = f"document.shared.{event.role}"
        await self._exchange.publish(message, routing_key=routing_key)
Downstream consumers can listen on document.shared.* to trigger audit trails, notifications, etc., without the FastAPI app knowing they exist.
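The binding pattern document.shared.* relies on topic-exchange matching, where routing keys are dot-separated words, * stands for exactly one word, and # stands for zero or more words. The following is a small pure-Python sketch of that rule, handy for unit-testing routing decisions without a broker. The function name is an assumption; the broker performs the real matching.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic-exchange binding pattern matches a routing key."""

    def match(p: list[str], k: list[str]) -> bool:
        if not p:
            return not k  # pattern exhausted: match only if the key is too
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero or more words; try every split point.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            # '*' consumes exactly one word; a literal must equal the word.
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))
```

With the routing keys produced by RabbitEventBus, document.shared.* receives document.shared.viewer and document.shared.editor, while a binding such as document.# would also receive any future document.* event types.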
9. FastAPI Adapter (Inbound Port)
# src/app/routes/documents.py
from dataclasses import replace

from fastapi import APIRouter, Depends, status

from application.commands import UploadDocumentCommand, ShareDocumentCommand
from application.use_cases import UploadDocument, ShareDocument, ListFolderDocuments
from .deps import get_upload_uc, get_share_uc, get_list_uc

router = APIRouter(prefix="/documents", tags=["documents"])


@router.post("/", status_code=status.HTTP_201_CREATED)
async def upload_document(
    payload: UploadDocumentCommand,
    use_case: UploadDocument = Depends(get_upload_uc),
):
    document = await use_case.execute(payload)
    return document


@router.post("/{doc_id}/share", status_code=status.HTTP_204_NO_CONTENT)
async def share_document(
    doc_id: str,
    payload: ShareDocumentCommand,
    use_case: ShareDocument = Depends(get_share_uc),
):
    # The path parameter wins over any doc_id in the body. replace() copies the
    # command with that one field overridden; unpacking payload.__dict__ would
    # fail because the command uses slots=True and would pass doc_id twice.
    await use_case.execute(replace(payload, doc_id=doc_id))


@router.get("/folders/{folder_id}")
async def list_folder(folder_id: str, use_case: ListFolderDocuments = Depends(get_list_uc)):
    return await use_case.execute(folder_id)
deps.py wires use cases from a lightweight container (see below). The router never sees SQLAlchemy or Valkey directly; it only talks to use cases.
10. Wiring Everything (Bootstrap)
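# src/app/container.py
from functools import lru_cache

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from valkey.asyncio import Valkey

from application.use_cases import UploadDocument, ShareDocument, ListFolderDocuments
from infrastructure.cache.valkey_cache import ValkeyDocumentCache
from infrastructure.messaging.bus import RabbitEventBus
from infrastructure.persistence.database import get_session
from infrastructure.persistence.repositories import (
    SqlAlchemyDocumentRepository,
    SqlAlchemyAccessPolicyRepository,
)


@lru_cache
def valkey_client() -> Valkey:
    return Valkey(host="valkey", port=6379, decode_responses=True)


@lru_cache
def event_bus() -> RabbitEventBus:
    # One bus per process: connecting on every request would leak AMQP connections.
    return RabbitEventBus("amqp://guest:guest@rabbitmq/")


# Depends(get_session) lets FastAPI drive the session generator, so the session
# is closed after the response. Returning from inside "async for" would abandon
# the generator and leave the session open.
async def get_upload_use_case(session: AsyncSession = Depends(get_session)) -> UploadDocument:
    repo = SqlAlchemyDocumentRepository(session)
    cache = ValkeyDocumentCache(valkey_client())
    return UploadDocument(repo, cache)


async def get_share_use_case(session: AsyncSession = Depends(get_session)) -> ShareDocument:
    repo = SqlAlchemyDocumentRepository(session)
    policy_repo = SqlAlchemyAccessPolicyRepository(session)
    cache = ValkeyDocumentCache(valkey_client())
    bus = event_bus()
    if bus._exchange is None:
        # Lazy connect on first use keeps the example short; starting and
        # stopping the bus from the application lifespan is the tidier option.
        await bus.start()
    return ShareDocument(repo, policy_repo, cache, bus)


async def get_list_use_case(session: AsyncSession = Depends(get_session)) -> ListFolderDocuments:
    repo = SqlAlchemyDocumentRepository(session)
    cache = ValkeyDocumentCache(valkey_client())
    return ListFolderDocuments(repo, cache)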
Finally wire the router in main.py.
# src/app/main.py
from fastapi import FastAPI

from app.routes import documents


def create_app() -> FastAPI:
    app = FastAPI(title="Doc Hex Service")
    app.include_router(documents.router)
    return app


app = create_app()
Production deployments would use a DI container (punq, lagom, wired) or manual wiring with explicit lifetime scopes. The point: the composition root lives at the edge, not inside the domain.
11. Cache + Message Coordination
- The Upload use case invalidates the affected cache entries synchronously (safe); the next list query repopulates them.
- If a background worker consumes DocumentShared events, it can hydrate denormalized projections (e.g., a shared_documents table) without touching FastAPI code.
- For high-volume folders, consider storing folder listings in Valkey as sorted sets scored by updated timestamp for fast pagination.
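The sorted-set idea can be sketched as follows: each folder gets one sorted set whose members are document IDs and whose scores are update timestamps. The key format and function names are assumptions. The client is expected to expose async zadd(name, mapping) and zrevrange(name, start, end) in the style of redis-py compatible clients, and FakeSortedSetClient is an in-memory stand-in so the example runs without a server.

```python
import asyncio
from datetime import datetime, timedelta, timezone


def folder_index_key(folder_id: str) -> str:
    # Hypothetical key format, separate from the JSON listing key.
    return f"folder:{folder_id}:by_updated"


async def index_document(client, folder_id: str, doc_id: str, updated_at: datetime) -> None:
    # Score = update time, so the set stays ordered by recency.
    await client.zadd(folder_index_key(folder_id), {doc_id: updated_at.timestamp()})


async def list_page(client, folder_id: str, page: int, page_size: int) -> list[str]:
    start = page * page_size
    # zrevrange uses inclusive indexes and returns highest scores first.
    return await client.zrevrange(folder_index_key(folder_id), start, start + page_size - 1)


class FakeSortedSetClient:
    """In-memory stand-in that mimics zadd/zrevrange for this demo only."""

    def __init__(self) -> None:
        self._sets: dict[str, dict[str, float]] = {}

    async def zadd(self, name: str, mapping: dict[str, float]) -> int:
        self._sets.setdefault(name, {}).update(mapping)
        return len(mapping)

    async def zrevrange(self, name: str, start: int, end: int) -> list[str]:
        ordered = sorted(self._sets.get(name, {}).items(), key=lambda kv: kv[1], reverse=True)
        return [member for member, _ in ordered][start : end + 1]


async def demo() -> tuple[list[str], list[str]]:
    client = FakeSortedSetClient()
    base = datetime(2024, 1, 1, tzinfo=timezone.utc)
    for offset, doc_id in enumerate(["old", "mid", "new"]):
        await index_document(client, "f1", doc_id, base + timedelta(days=offset))
    return await list_page(client, "f1", 0, 2), await list_page(client, "f1", 1, 2)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The sorted set holds only IDs; the page of documents would still be fetched by ID from the per-document cache or the repository.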
12. Testing with In-Memory Adapters
# tests/fakes.py
from application.ports import DocumentRepository, DocumentCache, EventBus


class InMemoryDocumentRepository(DocumentRepository):
    def __init__(self):
        self.docs = {}

    async def save(self, document):
        self.docs[document.id] = document

    async def get(self, doc_id):
        return self.docs.get(doc_id)

    async def list_by_folder(self, folder_id):
        # Return a list (not an async generator) so the use case can await it.
        return [doc for doc in self.docs.values() if doc.folder_id == folder_id]


class InMemoryCache(DocumentCache):
    def __init__(self):
        self.docs = {}
        self.folders = {}

    async def get_document(self, doc_id):
        return self.docs.get(doc_id)

    async def put_document(self, document):
        self.docs[document.id] = document

    async def delete_document(self, doc_id):
        self.docs.pop(doc_id, None)

    async def get_folder_listing(self, folder_id):
        return self.folders.get(folder_id)

    async def put_folder_listing(self, folder_id, docs):
        self.folders[folder_id] = docs


class DummyBus(EventBus):
    def __init__(self):
        self.events = []

    async def publish(self, event):
        self.events.append(event)
Tests import the exact use case class and swap the adapters above to keep unit tests pure while integration tests verify wiring against real Postgres/Valkey containers.
13. Observability + Ops
- Metrics: Expose counters for cache hits/misses, DB latency, and message publish failures via Prometheus (e.g., the prometheus-fastapi-instrumentator package).
- Tracing: Use OpenTelemetry instrumentation on FastAPI, SQLAlchemy, and aio-pika; spans show whether the cache served the request.
- Migrations: Because the domain is persistence-agnostic, you can run Alembic migrations in a dedicated command module (another adapter) without dragging FastAPI along.
- Resiliency: Wrap outbound adapters in circuit breakers (aiobreaker) so a dead Valkey does not kill uploads; degrade gracefully by skipping caching or queuing events locally.
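Degrading gracefully can be done with a decorator around the cache port. The following sketch hand-rolls a tiny breaker instead of using aiobreaker, so that no library API is assumed; FailOpenCache, its thresholds, and BrokenCache are illustrative names. Errors are swallowed and treated as cache misses, and after a run of failures the cache is skipped entirely for a cool-down period.

```python
import asyncio
import time
from typing import Any, Callable


class FailOpenCache:
    """Wraps a document cache; failures become misses instead of request errors."""

    def __init__(self, inner: Any, max_failures: int = 3, reset_after: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._inner = inner
        self._max_failures = max_failures
        self._reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._open_until = 0.0

    async def _call(self, method: str, *args: Any) -> Any:
        if self._clock() < self._open_until:
            return None  # breaker open: skip the cache without touching it
        try:
            result = await getattr(self._inner, method)(*args)
        except Exception:
            self._failures += 1
            if self._failures >= self._max_failures:
                # Too many consecutive failures: stop calling for a while.
                self._open_until = self._clock() + self._reset_after
                self._failures = 0
            return None
        self._failures = 0
        return result

    async def get_document(self, doc_id: str) -> Any:
        return await self._call("get_document", doc_id)

    async def put_document(self, document: Any) -> None:
        await self._call("put_document", document)

    async def delete_document(self, doc_id: str) -> None:
        await self._call("delete_document", doc_id)


class BrokenCache:
    """Stand-in for an unreachable Valkey."""

    def __init__(self) -> None:
        self.calls = 0

    async def get_document(self, doc_id: str) -> Any:
        self.calls += 1
        raise ConnectionError("valkey is down")


async def demo() -> tuple[list[Any], int, int]:
    now = [0.0]
    broken = BrokenCache()
    cache = FailOpenCache(broken, max_failures=2, reset_after=10.0, clock=lambda: now[0])
    results = [await cache.get_document("doc-1") for _ in range(3)]
    calls_while_open = broken.calls  # the third call never reached the cache
    now[0] = 11.0                    # cool-down elapsed: the cache is tried again
    await cache.get_document("doc-1")
    return results, calls_while_open, broken.calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

One trade-off to keep in mind: invalidations skipped while the breaker is open can leave stale entries behind once the cache recovers, so the TTL on cached keys remains the backstop.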
14. Hexagonal vs “Clean” Architecture
- Hexagonal is defined by communication boundaries (ports/adapters) rather than concentric rings. There is no insistence on an entities -> use cases -> interface adapters layering order; instead, we focus on pushing dependencies inward so the domain stays ignorant.
- The FastAPI example shows inbound adapters can be HTTP, CLI, gRPC, or background workers. Adding a CLI to bulk share documents is trivial: instantiate the same ShareDocument use case with different input parsing.
15. Next Steps
- Add projections (DocumentSearchIndex) as another outbound port backed by OpenSearch.
- Introduce Valkey Streams for change data capture if RabbitMQ is unavailable.
- Layer a policy decision point (PDP) microservice by swapping AccessPolicyRepository with a gRPC adapter.
Hexagonal Architecture feels verbose at first, but every dependency has a home. Business logic stays boring, and that is the entire point.