Hexagonal Architecture with FastAPI: Database, Valkey Cache, Messaging

Hexagonal Architecture for FastAPI Document Systems

1. TL;DR

  • Hexagonal Architecture (ports & adapters) keeps the domain independent: business rules import nothing from FastAPI, SQLAlchemy, or Valkey.
  • Outbound adapters (database, cache, messaging) implement thin interfaces (DocumentRepository, DocumentCache, EventBus), so swapping Postgres for Dynamo happens in the adapter only.
  • Inbound adapters (FastAPI controllers, CLI, workers) depend on the application layer (UseCase classes) instead of touching repositories directly.
  • Valkey sits in front of the database with cache-aside reads and event-driven invalidation; messaging publishes DocumentShared events so downstream analytics or audit services stay decoupled.
  • Tests supply in-memory adapters, so no mocks leak from infrastructure into the domain.

2. Domain to Model: Document Management

Core nouns

  • Document: immutable metadata + version, blob pointer.
  • Folder: tree structure scoped to Organization.
  • User: belongs to one or more organizations, holds roles.
  • AccessPolicy: expresses capabilities (viewer, editor, owner).

Capabilities we will build

  1. Upload a document with metadata (Persistence + Valkey invalidation).
  2. Share a document with a user (Access control + Messaging event).
  3. List folder contents (Cache first, DB fallback).

3. Layered Layout

src/
├── app/
│   ├── main.py              # FastAPI entry point (inbound adapter)
│   ├── routes/
│   │   └── documents.py     # Uses use cases only
│   └── container.py         # Wires dependencies
├── domain/
│   ├── entities.py          # Document, Folder, User
│   ├── value_objects.py     # IDs, enums
│   ├── policies.py          # Permission logic
│   └── events.py            # Domain events
├── application/
│   ├── commands.py          # DTOs for input data
│   ├── use_cases.py         # UploadDocument, ShareDocument
│   └── ports.py             # Repository, Cache, EventBus
└── infrastructure/
    ├── persistence/
    │   ├── models.py        # SQLAlchemy tables
    │   ├── repositories.py  # Implements DocumentRepository
    │   └── database.py      # Async engine/session
    ├── cache/
    │   └── valkey_cache.py  # Valkey adapter
    ├── messaging/
    │   └── bus.py           # RabbitMQ/NATS adapter
    └── auth/
        └── identity_provider.py

domain has zero imports pointing back into application/infrastructure. Ports live with the application layer because they are defined by use cases.
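
The "zero imports" rule can be enforced mechanically instead of by convention. A small sketch using the standard-library ast module; the forbidden_imports helper and the list of forbidden package names are assumptions based on the layout above:

```python
import ast

# Assumption: top-level packages the domain layer must never import.
FORBIDDEN_FOR_DOMAIN = ("application", "infrastructure", "app", "fastapi", "sqlalchemy", "valkey")


def forbidden_imports(source: str, forbidden: tuple[str, ...] = FORBIDDEN_FOR_DOMAIN) -> list[str]:
    """Return the forbidden modules imported by a piece of Python source."""
    found: list[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
            # level == 0 means an absolute import; relative imports stay inside the package.
            names = [node.module]
        else:
            continue
        for name in names:
            if name.split(".")[0] in forbidden:
                found.append(name)
    return found
```

A unit test could read every file under src/domain, pass its text to forbidden_imports, and fail the build when the result is not empty.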

4. Domain Layer Code

# src/domain/entities.py
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Iterable

from .value_objects import DocumentId, UserId, OrganizationId, FolderId


@dataclass(frozen=True, slots=True)
class Document:
	id: DocumentId
	org_id: OrganizationId
	folder_id: FolderId
	owner_id: UserId
	name: str
	mime_type: str
	version: int
	blob_uri: str
	created_at: datetime
	updated_at: datetime
	tags: tuple[str, ...] = field(default_factory=tuple)

	def next_version(self, new_blob_uri: str) -> "Document":
		return Document(
			id=self.id,
			org_id=self.org_id,
			folder_id=self.folder_id,
			owner_id=self.owner_id,
			name=self.name,
			mime_type=self.mime_type,
			version=self.version + 1,
			blob_uri=new_blob_uri,
			created_at=self.created_at,
			updated_at=datetime.utcnow(),
			tags=self.tags,
		)


@dataclass(frozen=True, slots=True)
class Folder:
	id: FolderId
	org_id: OrganizationId
	parent_id: FolderId | None
	name: str
	path: str  # /org/{org_id}/folder/... for fast lookups


@dataclass(frozen=True, slots=True)
class AccessPolicy:
	document_id: DocumentId
	user_id: UserId
	capabilities: frozenset[str]

	def can(self, action: str) -> bool:
		return action in self.capabilities


def require_capability(policy: AccessPolicy, action: str) -> None:
	if not policy.can(action):
		raise PermissionError(f"User lacks {action} on {policy.document_id}")


# src/domain/events.py
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True, slots=True)
class DocumentShared:
	document_id: str
	shared_with: str
	shared_by: str
	role: str
	at: datetime

5. Ports and Use Cases

# src/application/ports.py
from abc import ABC, abstractmethod
from typing import Iterable, Protocol

from domain.entities import Document, Folder, AccessPolicy
from domain.events import DocumentShared


class DocumentRepository(ABC):
	@abstractmethod
	async def save(self, document: Document) -> None: ...

	@abstractmethod
	async def get(self, doc_id: str) -> Document | None: ...

	@abstractmethod
	async def list_by_folder(self, folder_id: str) -> Iterable[Document]: ...


class AccessPolicyRepository(ABC):
	@abstractmethod
	async def get_policy(self, doc_id: str, user_id: str) -> AccessPolicy | None: ...

	@abstractmethod
	async def upsert(self, policy: AccessPolicy) -> None: ...


class DocumentCache(Protocol):
	async def get_document(self, doc_id: str) -> Document | None: ...
	async def put_document(self, document: Document) -> None: ...
	async def delete_document(self, doc_id: str) -> None: ...
	async def get_folder_listing(self, folder_id: str) -> list[Document] | None: ...
	async def put_folder_listing(self, folder_id: str, docs: list[Document]) -> None: ...


class EventBus(Protocol):
	async def publish(self, event: DocumentShared) -> None: ...


# src/application/commands.py
from dataclasses import dataclass
from typing import Sequence


@dataclass(slots=True)
class UploadDocumentCommand:
	doc_id: str
	org_id: str
	folder_id: str
	owner_id: str
	name: str
	mime_type: str
	blob_uri: str
	tags: Sequence[str]


@dataclass(slots=True)
class ShareDocumentCommand:
	doc_id: str
	shared_by: str
	shared_with: str
	role: str


# src/application/use_cases.py
from datetime import datetime

from domain.entities import Document, AccessPolicy, require_capability
from domain.events import DocumentShared
from .commands import UploadDocumentCommand, ShareDocumentCommand
from .ports import DocumentRepository, DocumentCache, AccessPolicyRepository, EventBus


class UploadDocument:
	def __init__(self, repo: DocumentRepository, cache: DocumentCache):
		self.repo = repo
		self.cache = cache

	async def execute(self, cmd: UploadDocumentCommand) -> Document:
		now = datetime.utcnow()
		document = Document(
			id=cmd.doc_id,
			org_id=cmd.org_id,
			folder_id=cmd.folder_id,
			owner_id=cmd.owner_id,
			name=cmd.name,
			mime_type=cmd.mime_type,
			version=1,
			blob_uri=cmd.blob_uri,
			created_at=now,
			updated_at=now,
			tags=tuple(cmd.tags),
		)
		await self.repo.save(document)
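		await self.cache.delete_document(document.id)
		# The cache port has no method for deleting a folder listing, and
		# delete_document(f"folder:...") would target the wrong key ("doc:folder:...").
		# Refresh the listing instead so the next read sees the new document.
		docs = list(await self.repo.list_by_folder(document.folder_id))
		await self.cache.put_folder_listing(document.folder_id, docs)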
		return document


class ShareDocument:
	def __init__(
		self,
		repo: DocumentRepository,
		policy_repo: AccessPolicyRepository,
		cache: DocumentCache,
		bus: EventBus,
	) -> None:
		self.repo = repo
		self.policy_repo = policy_repo
		self.cache = cache
		self.bus = bus

	async def execute(self, cmd: ShareDocumentCommand) -> None:
		document = await self.cache.get_document(cmd.doc_id)
		if not document:
			document = await self.repo.get(cmd.doc_id)
			if not document:
				raise ValueError("Document not found")
			await self.cache.put_document(document)

		policy = await self.policy_repo.get_policy(cmd.doc_id, cmd.shared_by)
		if not policy:
			raise PermissionError("Sharer has no policy")

		require_capability(policy, "share")

		new_policy = AccessPolicy(
			document_id=policy.document_id,
			user_id=cmd.shared_with,
			capabilities=frozenset({cmd.role}),
		)
		await self.policy_repo.upsert(new_policy)
		await self.bus.publish(
			DocumentShared(
				document_id=cmd.doc_id,
				shared_with=cmd.shared_with,
				shared_by=cmd.shared_by,
				role=cmd.role,
				at=datetime.utcnow(),
			)
		)


class ListFolderDocuments:
	def __init__(self, repo: DocumentRepository, cache: DocumentCache):
		self.repo = repo
		self.cache = cache

	async def execute(self, folder_id: str) -> list[Document]:
		cached = await self.cache.get_folder_listing(folder_id)
		# Compare against None: an empty folder is a valid cached result, not a miss.
		if cached is not None:
			return cached
		docs = list(await self.repo.list_by_folder(folder_id))
		await self.cache.put_folder_listing(folder_id, docs)
		return docs

6. Persistence Adapter (SQLAlchemy + Postgres)

# src/infrastructure/persistence/database.py
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
    AsyncEngine,
)
from sqlalchemy.orm import DeclarativeBase
from typing import AsyncGenerator


# Base class for all ORM models. SQLAlchemy 2.0 uses DeclarativeBase
# instead of the old declarative_base() factory function.
class Base(DeclarativeBase):
    pass


# Create async engine with asyncpg driver (fastest Postgres driver for Python).
# pool_size: number of persistent connections (one per concurrent request)
# max_overflow: additional connections created under load (total = pool_size + max_overflow)
engine: AsyncEngine = create_async_engine(
    "postgresql+asyncpg://user:pass@db:5432/documents",
    pool_size=10,  # Tune based on expected concurrency
    max_overflow=20,  # Burst capacity for traffic spikes
    pool_pre_ping=True,  # Verify connections before using (handles stale connections)
    echo=False,  # Set to True in dev to see all SQL queries
)

# Session factory. expire_on_commit=False prevents lazy-loading errors
# when accessing relationships outside the session context.
SessionLocal = async_sessionmaker(
    engine,
    expire_on_commit=False,
    class_=AsyncSession,
)


# Dependency injection function for FastAPI.
# Yields session, auto-closes on exit (even if exception occurs).
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with SessionLocal() as session:
        yield session
        # The context manager only closes the session on exit; it does not commit.
        # Call session.commit() explicitly (the repositories below do).


# src/infrastructure/persistence/models.py
from datetime import datetime
from typing import Optional
from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.dialects.postgresql import ARRAY

from .database import Base


class DocumentModel(Base):
    __tablename__ = "documents"

    # Primary key: mapped_column() infers String from annotation
    id: Mapped[str] = mapped_column(String, primary_key=True)
    
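    # Ownership and placement columns. Only folder_id is a database-level foreign key
    # (it assumes a folders table that is not shown here); org_id and owner_id are plain strings.
    # Columns annotated with a non-Optional type are NOT NULL by default.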
    org_id: Mapped[str] = mapped_column(String, index=True)  # Index for filtering by org
    folder_id: Mapped[str] = mapped_column(ForeignKey("folders.id"), index=True)
    owner_id: Mapped[str] = mapped_column(String, index=True)  # Index for "my documents" queries
    
    # Document metadata
    name: Mapped[str] = mapped_column(String(255))  # Explicit length for VARCHAR
    mime_type: Mapped[str] = mapped_column(String(100))
    version: Mapped[int]  # Auto-incremented per document via application logic
    blob_uri: Mapped[str] = mapped_column(String(512))  # S3/blob storage path
    
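    # Timestamps: a bare Mapped[datetime] maps to TIMESTAMP WITHOUT TIME ZONE, which matches
    # the naive UTC datetimes the domain produces. Use DateTime(timezone=True) for TIMESTAMPTZ.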
    created_at: Mapped[datetime] = mapped_column(index=True)  # Index for sorting
    updated_at: Mapped[datetime] = mapped_column(index=True)
    
    # Tags stored as Postgres ARRAY type (more efficient than JSON for simple lists)
    # Optional[list[str]] allows NULL in the database
    tags: Mapped[Optional[list[str]]] = mapped_column(ARRAY(String), default=None)


class AccessPolicyModel(Base):
    __tablename__ = "access_policies"
    
    # Composite primary key (document_id, user_id)
    # This enforces "one policy per user per document" at the database level
    document_id: Mapped[str] = mapped_column(
        ForeignKey("documents.id", ondelete="CASCADE"),  # Delete policies when document deleted
        primary_key=True,
    )
    user_id: Mapped[str] = mapped_column(String, primary_key=True)
    
    # Capabilities stored as array: ["read", "write", "share", "admin"]
    # Using ARRAY instead of JSON because we only need a simple list
    capabilities: Mapped[list[str]] = mapped_column(ARRAY(String))


# Optional: Add indexes for common queries
# from sqlalchemy import Index
# Index("idx_documents_org_folder", DocumentModel.org_id, DocumentModel.folder_id)
# Index("idx_documents_created", DocumentModel.created_at.desc())


# src/infrastructure/persistence/repositories.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from typing import AsyncIterator

from application.ports import DocumentRepository, AccessPolicyRepository
from domain.entities import Document, AccessPolicy
from .models import DocumentModel, AccessPolicyModel


# Mapper function converts ORM model to domain entity.
# This is the anti-corruption layer: domain entities stay pure,
# unaware of SQLAlchemy or database concerns.
def map_document(model: DocumentModel) -> Document:
    return Document(
        id=model.id,
        org_id=model.org_id,
        folder_id=model.folder_id,
        owner_id=model.owner_id,
        name=model.name,
        mime_type=model.mime_type,
        version=model.version,
        blob_uri=model.blob_uri,
        created_at=model.created_at,
        updated_at=model.updated_at,
        tags=tuple(model.tags or ()),  # Convert list to tuple (domain uses immutable tuple)
    )


class SqlAlchemyDocumentRepository(DocumentRepository):
    """Postgres implementation of DocumentRepository port.
    
    Session is injected per-request via FastAPI dependency injection,
    ensuring each request gets its own transaction boundary.
    """

    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def save(self, document: Document) -> None:
        """Insert or update document.
        
        Uses merge() semantics: if document.id exists, updates; otherwise inserts.
        Commit happens here because this is a unit of work boundary.
        """
        # Convert domain entity to ORM model
        model = DocumentModel(
            id=document.id,
            org_id=document.org_id,
            folder_id=document.folder_id,
            owner_id=document.owner_id,
            name=document.name,
            mime_type=document.mime_type,
            version=document.version,
            blob_uri=document.blob_uri,
            created_at=document.created_at,
            updated_at=document.updated_at,
            tags=list(document.tags) if document.tags else None,
        )
        
        # merge() looks the row up by primary key and issues an UPDATE if it exists,
        # otherwise an INSERT. A plain add() would fail on the second save of the same id.
        await self.session.merge(model)
        await self.session.commit()
        # Note: flush() would write to DB but keep transaction open,
        # commit() writes and closes transaction

    async def get(self, doc_id: str) -> Document | None:
        """Fetch single document by ID.
        
        SQLAlchemy 2.0 style: build statement with select(), execute, then extract result.
        """
        stmt = select(DocumentModel).where(DocumentModel.id == doc_id)
        result = await self.session.execute(stmt)
        
        # scalar_one_or_none() returns the model or None (raises if multiple found)
        model = result.scalar_one_or_none()
        return map_document(model) if model else None

    async def list_by_folder(self, folder_id: str) -> list[Document]:
        """Return the documents in a folder, newest first.

        The port declares an awaitable that returns an iterable, and the use case
        awaits it, so this returns a list instead of yielding from an async generator.
        For very large folders, paginate with limit/offset rather than loading everything.
        """
        stmt = (
            select(DocumentModel)
            .where(DocumentModel.folder_id == folder_id)
            .order_by(DocumentModel.created_at.desc())  # Newest first
        )
        result = await self.session.execute(stmt)

        # scalars() returns iterator of models (not Row tuples)
        return [map_document(model) for model in result.scalars()]


class SqlAlchemyAccessPolicyRepository(AccessPolicyRepository):
    """Postgres implementation for access control policies."""

    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def get_policy(self, doc_id: str, user_id: str) -> AccessPolicy | None:
        """Fetch user's capabilities for a document."""
        stmt = select(AccessPolicyModel).where(
            AccessPolicyModel.document_id == doc_id,
            AccessPolicyModel.user_id == user_id,
        )
        result = await self.session.execute(stmt)
        model = result.scalar_one_or_none()
        
        if not model:
            return None
        
        # Convert array to frozenset for immutability (domain requirement)
        return AccessPolicy(
            document_id=model.document_id,
            user_id=model.user_id,
            capabilities=frozenset(model.capabilities or []),
        )

    async def upsert(self, policy: AccessPolicy) -> None:
        """Insert new policy or update existing one.
        
        Uses merge() to handle INSERT ... ON CONFLICT UPDATE logic.
        Works because (document_id, user_id) is the composite primary key.
        """
        model = AccessPolicyModel(
            document_id=policy.document_id,
            user_id=policy.user_id,
            capabilities=list(policy.capabilities),  # frozenset -> list for Postgres ARRAY
        )
        
        # merge() automatically detects if PK exists and does UPDATE instead of INSERT
        await self.session.merge(model)
        await self.session.commit()

7. Valkey Cache Adapter (Async)

# src/infrastructure/cache/valkey_cache.py
import json
from dataclasses import asdict
from datetime import datetime
from typing import Any

from valkey.asyncio import Valkey

from application.ports import DocumentCache
from domain.entities import Document


class ValkeyDocumentCache(DocumentCache):
	def __init__(self, client: Valkey, ttl_seconds: int = 300) -> None:
		self.client = client
		self.ttl = ttl_seconds

	async def get_document(self, doc_id: str) -> Document | None:
		raw = await self.client.get(self._key(doc_id))
		return self._from_dict(json.loads(raw)) if raw else None

	async def put_document(self, document: Document) -> None:
		payload = json.dumps(self._to_dict(document))
		await self.client.setex(self._key(document.id), self.ttl, payload)

	async def delete_document(self, doc_id: str) -> None:
		await self.client.delete(self._key(doc_id))

	async def get_folder_listing(self, folder_id: str) -> list[Document] | None:
		raw = await self.client.get(self._folder_key(folder_id))
		if raw is None:
			return None
		return [self._from_dict(item) for item in json.loads(raw)]

	async def put_folder_listing(self, folder_id: str, docs: list[Document]) -> None:
		payload = json.dumps([self._to_dict(doc) for doc in docs])
		await self.client.setex(self._folder_key(folder_id), self.ttl, payload)

	def _key(self, doc_id: str) -> str:
		return f"doc:{doc_id}"

	def _folder_key(self, folder_id: str) -> str:
		return f"folder:{folder_id}:docs"

	def _to_dict(self, document: Document) -> dict[str, Any]:
		# Document uses slots=True and therefore has no __dict__; asdict() works on any dataclass.
		# Assumes IDs are plain strings, which is how the use cases construct the entity.
		data = asdict(document)
		data["created_at"] = document.created_at.isoformat()
		data["updated_at"] = document.updated_at.isoformat()
		return data

	def _from_dict(self, data: dict[str, Any]) -> Document:
		# JSON has no datetime or tuple types, so restore them before building the entity.
		return Document(
			**{
				**data,
				"created_at": datetime.fromisoformat(data["created_at"]),
				"updated_at": datetime.fromisoformat(data["updated_at"]),
				"tags": tuple(data["tags"]),
			}
		)

Cache strategy: reads follow cache-aside, writes invalidate the relevant keys followed by asynchronous refresh via background jobs if needed.

8. Messaging Adapter (RabbitMQ + aio-pika)

# src/infrastructure/messaging/bus.py
import json
from dataclasses import asdict

from aio_pika import connect_robust, Message, ExchangeType

from application.ports import EventBus
from domain.events import DocumentShared


class RabbitEventBus(EventBus):
	def __init__(self, url: str, exchange_name: str = "doc.events") -> None:
		self.url = url
		self.exchange_name = exchange_name
		self._connection = None
		self._exchange = None

	async def start(self) -> None:
		# connect_robust() reconnects automatically if the broker drops the connection.
		self._connection = await connect_robust(self.url)
		channel = await self._connection.channel()
		self._exchange = await channel.declare_exchange(self.exchange_name, ExchangeType.TOPIC)

	async def stop(self) -> None:
		if self._connection:
			await self._connection.close()

	async def publish(self, event: DocumentShared) -> None:
		if not self._exchange:
			raise RuntimeError("Bus not started")
		# DocumentShared uses slots=True, so it has no __dict__; asdict() works on any dataclass.
		# default=str serializes the datetime field.
		payload = json.dumps(asdict(event), default=str)
		message = Message(body=payload.encode(), content_type="application/json")
		routing_key = f"document.shared.{event.role}"
		await self._exchange.publish(message, routing_key=routing_key)

Downstream consumers can listen on document.shared.* to trigger audit trails, notifications, etc., without the FastAPI app knowing they exist.
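
The binding pattern document.shared.* uses AMQP topic-exchange syntax: routing keys are dot-separated words, * matches exactly one word, and # matches zero or more words. The broker performs this matching; the pure-Python sketch below only mirrors the rules so that bindings can be reasoned about or unit tested. topic_matches is a hypothetical helper, not part of aio-pika:

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP topic binding pattern matches a routing key."""
    return _match(pattern.split("."), routing_key.split("."))


def _match(pattern: list[str], words: list[str]) -> bool:
    if not pattern:
        # The pattern is exhausted: it matches only if the key is exhausted too.
        return not words
    head, rest = pattern[0], pattern[1:]
    if head == "#":
        # "#" may consume zero or more words, so try every possible split.
        return any(_match(rest, words[i:]) for i in range(len(words) + 1))
    if not words:
        return False
    if head == "*" or head == words[0]:
        # "*" consumes exactly one word; a literal must match it exactly.
        return _match(rest, words[1:])
    return False
```

With the routing keys published above, a consumer bound to document.shared.* receives every role, while one bound to document.shared.owner receives only ownership grants.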

9. FastAPI Adapter (Inbound Port)

# src/app/routes/documents.py
from fastapi import APIRouter, Depends, status

from application.commands import UploadDocumentCommand, ShareDocumentCommand
from application.use_cases import UploadDocument, ShareDocument, ListFolderDocuments
from .deps import get_upload_uc, get_share_uc, get_list_uc

router = APIRouter(prefix="/documents", tags=["documents"])


@router.post("/", status_code=status.HTTP_201_CREATED)
async def upload_document(
	payload: UploadDocumentCommand,
	use_case: UploadDocument = Depends(get_upload_uc),
):
	document = await use_case.execute(payload)
	return document


@router.post("/{doc_id}/share", status_code=status.HTTP_204_NO_CONTENT)
async def share_document(
	doc_id: str,
	payload: ShareDocumentCommand,
	use_case: ShareDocument = Depends(get_share_uc),
):
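	# The path parameter is the source of truth for the document ID. The command uses
	# slots=True (no __dict__), so copy the remaining fields explicitly.
	await use_case.execute(
		ShareDocumentCommand(
			doc_id=doc_id,
			shared_by=payload.shared_by,
			shared_with=payload.shared_with,
			role=payload.role,
		)
	)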


@router.get("/folders/{folder_id}")
async def list_folder(folder_id: str, use_case: ListFolderDocuments = Depends(get_list_uc)):
	return await use_case.execute(folder_id)

deps.py wires use cases from a lightweight container (see below). The router never sees SQLAlchemy or Valkey directly; it only talks to use cases.

10. Wiring Everything (Bootstrap)

# src/app/container.py
from functools import lru_cache

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from valkey.asyncio import Valkey

from application.use_cases import UploadDocument, ShareDocument, ListFolderDocuments
from infrastructure.cache.valkey_cache import ValkeyDocumentCache
from infrastructure.messaging.bus import RabbitEventBus
from infrastructure.persistence.database import get_session
from infrastructure.persistence.repositories import (
	SqlAlchemyDocumentRepository,
	SqlAlchemyAccessPolicyRepository,
)


@lru_cache
def valkey_client() -> Valkey:
	return Valkey(host="valkey", port=6379, decode_responses=True)


_bus: RabbitEventBus | None = None


async def event_bus() -> RabbitEventBus:
	# Open one RabbitMQ connection and reuse it, instead of connecting on every request.
	global _bus
	if _bus is None:
		bus = RabbitEventBus("amqp://guest:guest@rabbitmq/")
		await bus.start()
		_bus = bus
	return _bus


# Depends(get_session) lets FastAPI drive the session generator, so the session is
# closed after the response. Returning from inside `async for` would leave the
# generator suspended and the session open until garbage collection.
async def get_upload_use_case(session: AsyncSession = Depends(get_session)) -> UploadDocument:
	repo = SqlAlchemyDocumentRepository(session)
	cache = ValkeyDocumentCache(valkey_client())
	return UploadDocument(repo, cache)


async def get_share_use_case(
	session: AsyncSession = Depends(get_session),
	bus: RabbitEventBus = Depends(event_bus),
) -> ShareDocument:
	repo = SqlAlchemyDocumentRepository(session)
	policy_repo = SqlAlchemyAccessPolicyRepository(session)
	cache = ValkeyDocumentCache(valkey_client())
	return ShareDocument(repo, policy_repo, cache, bus)


async def get_list_use_case(session: AsyncSession = Depends(get_session)) -> ListFolderDocuments:
	repo = SqlAlchemyDocumentRepository(session)
	cache = ValkeyDocumentCache(valkey_client())
	return ListFolderDocuments(repo, cache)

Finally wire the router in main.py.

# src/app/main.py
from fastapi import FastAPI

from app.routes import documents


def create_app() -> FastAPI:
	app = FastAPI(title="Doc Hex Service")
	app.include_router(documents.router)
	return app


app = create_app()

Production deployments would use a DI container (punq, lagom, wired) or manual wiring with explicit lifetime scopes. The point: the composition root lives at the edge, not inside the domain.
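
To illustrate what "explicit lifetime scopes" can mean with manual wiring, here is a deliberately small sketch. Container and Scope are hypothetical classes, not the API of punq, lagom, or wired: singletons (a cache client, an event bus) live for the process, while scoped objects (a database session, a use case) live for one request.

```python
from typing import Any, Callable


class Container:
    """Hypothetical composition root with singleton and per-scope lifetimes."""

    def __init__(self) -> None:
        self._factories: dict[str, tuple[Callable[["Scope"], Any], bool]] = {}
        self._singletons: dict[str, Any] = {}

    def register(
        self, name: str, factory: Callable[["Scope"], Any], *, singleton: bool = False
    ) -> None:
        self._factories[name] = (factory, singleton)

    def scope(self) -> "Scope":
        """Open a new scope, for example one per HTTP request."""
        return Scope(self)


class Scope:
    def __init__(self, container: Container) -> None:
        self._container = container
        self._scoped: dict[str, Any] = {}

    def resolve(self, name: str) -> Any:
        factory, singleton = self._container._factories[name]
        # Singletons are stored on the container; everything else on this scope.
        store = self._container._singletons if singleton else self._scoped
        if name not in store:
            store[name] = factory(self)
        return store[name]
```

A factory receives the scope, so a use case factory can resolve its own dependencies: two requests then share the cache client but each gets its own session.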

11. Cache + Message Coordination

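  • UploadDocument updates the affected cache keys synchronously, inside the use case, so a successful upload does not leave a stale entry behind (safe); ShareDocument only reads through the cache. Anything evicted is repopulated lazily by the next read.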
  • If a background worker consumes DocumentShared events it can hydrate denormalized projections (e.g., shared_documents table) without touching FastAPI code.
  • For high-volume folders consider storing folder listings in Valkey as sorted sets keyed by updated timestamp for fast pagination.
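
The sorted-set idea from the last bullet can be sketched as follows. The functions only rely on zadd and zrevrange, which the valkey-py client provides under the same names; FakeSortedSets is an in-memory stand-in so the sketch runs without a server, and the key name and helper names are assumptions:

```python
import asyncio
from datetime import datetime, timezone
from typing import Protocol


class SortedSetClient(Protocol):
    """The two sorted-set calls this sketch needs."""

    async def zadd(self, name: str, mapping: dict[str, float]) -> int: ...
    async def zrevrange(self, name: str, start: int, end: int) -> list[str]: ...


class FakeSortedSets:
    """In-memory stand-in; supports non-negative indices only."""

    def __init__(self) -> None:
        self._sets: dict[str, dict[str, float]] = {}

    async def zadd(self, name: str, mapping: dict[str, float]) -> int:
        target = self._sets.setdefault(name, {})
        added = sum(1 for member in mapping if member not in target)
        target.update(mapping)
        return added

    async def zrevrange(self, name: str, start: int, end: int) -> list[str]:
        items = self._sets.get(name, {}).items()
        # Highest score first; ties fall back to reverse member order.
        ordered = sorted(items, key=lambda kv: (kv[1], kv[0]), reverse=True)
        return [member for member, _ in ordered[start : end + 1]]


def folder_index_key(folder_id: str) -> str:
    return f"folder:{folder_id}:by_updated"  # Hypothetical key name


async def index_document(
    client: SortedSetClient, folder_id: str, doc_id: str, updated_at: datetime
) -> None:
    """Score each document by its update time so the set stays ordered."""
    await client.zadd(folder_index_key(folder_id), {doc_id: updated_at.timestamp()})


async def list_page(
    client: SortedSetClient, folder_id: str, page: int, page_size: int
) -> list[str]:
    """Return one page of document IDs, most recently updated first."""
    start = page * page_size
    return await client.zrevrange(folder_index_key(folder_id), start, start + page_size - 1)
```

Each page is a single range read, so a folder with many thousands of documents never has to be serialized into one large JSON value.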

12. Testing with In-Memory Adapters

# tests/fakes.py
from collections import defaultdict

from application.ports import DocumentRepository, DocumentCache, EventBus, AccessPolicyRepository


class InMemoryDocumentRepository(DocumentRepository):
	def __init__(self):
		self.docs = {}

	async def save(self, document):
		self.docs[document.id] = document

	async def get(self, doc_id):
		return self.docs.get(doc_id)

	async def list_by_folder(self, folder_id):
		# Return a list so callers can await the call, as the port declares.
		return [doc for doc in self.docs.values() if doc.folder_id == folder_id]


class InMemoryCache(DocumentCache):
	def __init__(self):
		self.docs = {}
		self.folders = {}

	async def get_document(self, doc_id):
		return self.docs.get(doc_id)

	async def put_document(self, document):
		self.docs[document.id] = document

	async def delete_document(self, doc_id):
		self.docs.pop(doc_id, None)

	async def get_folder_listing(self, folder_id):
		return self.folders.get(folder_id)

	async def put_folder_listing(self, folder_id, docs):
		self.folders[folder_id] = docs


class DummyBus(EventBus):
	def __init__(self):
		self.events = []

	async def publish(self, event):
		self.events.append(event)

Unit tests instantiate the real use case classes with the adapters above, so they stay pure and fast; integration tests then verify the wiring against real Postgres and Valkey containers.

13. Observability + Ops

  • Metrics: Expose counters for cache hits/misses, DB latency, and message publish failures via Prometheus (for example, with prometheus-fastapi-instrumentator).
  • Tracing: Use OpenTelemetry instrumentation on FastAPI, SQLAlchemy, and aio-pika; spans show whether a cache hit spared the request a database round trip.
  • Migrations: Because the domain is persistence-agnostic you can run Alembic migrations in a dedicated command module (another adapter) without dragging FastAPI along.
  • Resiliency: Wrap outbound adapters in circuit breakers (aiobreaker) so a dead Valkey does not kill uploads; degrade gracefully by skipping caching or queuing events locally.
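
The "degrade gracefully by skipping caching" part can be sketched without any library. The helper below is a fail-open wrapper, which is simpler than a circuit breaker: it does not track failure counts or open and close a circuit the way aiobreaker would. fail_open and its timeout default are assumptions for illustration:

```python
import asyncio
import logging
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger("cache")


async def fail_open(
    call: Callable[[], Awaitable[T]], default: T, timeout: float = 0.05
) -> T:
    """Run a cache call; on error or timeout, log it and return `default`."""
    try:
        return await asyncio.wait_for(call(), timeout)
    except Exception as exc:  # Includes timeouts; task cancellation still propagates
        logger.warning("cache unavailable, skipping: %r", exc)
        return default
```

A use case could then treat a dead cache as a miss, for example by passing a zero-argument callable that wraps the cache lookup and using None as the default, and fall through to the repository as usual.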

14. Hexagonal vs “Clean” Architecture

  • Hexagonal is defined by communication boundaries (ports/adapters) rather than concentric rings. There is no insistence on entities -> use cases -> interface adapters layering order; instead, we focus on pushing dependencies inward so the domain stays ignorant.
  • The FastAPI example shows inbound adapters can be HTTP, CLI, gRPC, or background workers. Adding a CLI to bulk share documents is trivial: instantiate the same ShareDocument use case with different input parsing.

15. Next Steps

  • Add projections (DocumentSearchIndex) as another outbound port backed by OpenSearch.
  • Introduce Valkey Streams for change data capture if RabbitMQ is unavailable.
  • Layer a policy decision point (PDP) microservice by swapping AccessPolicyRepository with a gRPC adapter.

Hexagonal Architecture feels verbose at first, but every dependency has a home. Business logic stays boring, and that is the entire point.
