Codexity Part 8: The Complete Answer Engine
Seven chapters of components. Time to weld them together.
This final chapter contains the complete, runnable source code for Codexity. Every module from the series, integrated, tested, and ready to deploy. By the end, you will run a single command and have a working answer engine that accepts questions, searches the web, scrapes pages, and streams cited answers.
Project Structure (Final)
codexity/
├── main.py
├── config.py
├── models.py
├── llm_client.py
├── query_rewriter.py
├── searcher.py
├── scraper.py
├── content_processor.py
├── synthesizer.py
├── pyproject.toml
├── Dockerfile
├── docker-compose.yml
├── .env.example
└── models/
    └── (download GGUF model here)
Complete Source: config.py
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # LLM
    model_path: str = "./models/qwen2.5-7b-instruct-q4_k_m.gguf"
    context_length: int = 8192
    max_tokens: int = 2048

    # Search
    max_search_results: int = 8
    max_queries: int = 3

    # Scraping
    scrape_timeout: int = 15
    max_concurrent_scrapes: int = 5
    use_playwright: bool = True

    # Content processing
    chunk_size: int = 512
    chunk_overlap: int = 50
    top_k_chunks: int = 10
    max_chunks_per_source: int = 3

    # Server
    host: str = "0.0.0.0"
    port: int = 8000
    log_level: str = "info"

    # Values are read from environment variables and from .env
    # (MODEL_PATH maps to model_path, and so on).
    model_config = SettingsConfigDict(env_file=".env")


settings = Settings()
Complete Source: models.py
from pydantic import BaseModel


class SearchResult(BaseModel):
    title: str
    url: str
    snippet: str


class ScrapedPage(BaseModel):
    url: str
    title: str
    content: str
    success: bool


class TextChunk(BaseModel):
    text: str
    source_url: str
    source_title: str
    relevance_score: float = 0.0


class SourceReference(BaseModel):
    index: int
    title: str
    url: str
Complete Source: llm_client.py
import asyncio

from llama_cpp import Llama

from config import settings

_llm: Llama | None = None


def get_llm() -> Llama:
    # Load the model lazily and reuse the same instance for every request.
    global _llm
    if _llm is None:
        _llm = Llama(
            model_path=settings.model_path,
            n_ctx=settings.context_length,
            n_threads=4,
            n_gpu_layers=0,
            verbose=False,
        )
    return _llm


def generate(prompt: str, max_tokens: int = 512, temperature: float = 0.1) -> str:
    llm = get_llm()
    response = llm.create_chat_completion(
        messages=[{"role": "user", "content": prompt}],
        max_tokens=max_tokens,
        temperature=temperature,
    )
    return response["choices"][0]["message"]["content"]


async def generate_streaming(
    prompt: str,
    system: str = "",
    max_tokens: int = 2048,
):
    llm = get_llm()
    messages = []
    if system:
        messages.append({"role": "system", "content": system})
    messages.append({"role": "user", "content": prompt})
    # Inference is synchronous: each loop iteration blocks the event loop
    # while the next token is computed.
    for chunk in llm.create_chat_completion(
        messages=messages,
        max_tokens=max_tokens,
        temperature=0.3,
        top_p=0.9,
        repeat_penalty=1.1,
        stream=True,
    ):
        delta = chunk["choices"][0].get("delta", {})
        if "content" in delta:
            yield delta["content"]
        # Hand control back to the event loop between tokens.
        await asyncio.sleep(0)
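The streaming loop above runs the model on the event loop thread, so other requests wait while each token is computed. One possible way around that, shown here as a minimal sketch and not as part of Codexity, is to drain the blocking iterator in a worker thread and pass items back through an `asyncio.Queue`. The helper name `iterate_in_thread` and the stand-in `fake_token_stream` are hypothetical. A real integration would also need to serialize access to the model, since a single `Llama` instance should not be assumed safe for concurrent calls.

```python
import asyncio
import threading
from collections.abc import AsyncIterator, Callable, Iterator
from typing import TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel that marks the end of the stream


async def iterate_in_thread(make_iter: Callable[[], Iterator[T]]) -> AsyncIterator[T]:
    """Drain a blocking iterator in a worker thread and yield its items asynchronously."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    def worker() -> None:
        try:
            for item in make_iter():
                # Queue methods are not thread-safe, so schedule them on the loop.
                loop.call_soon_threadsafe(queue.put_nowait, item)
        except Exception as exc:
            # Forward the error so the consumer sees it.
            loop.call_soon_threadsafe(queue.put_nowait, exc)
        finally:
            loop.call_soon_threadsafe(queue.put_nowait, _DONE)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = await queue.get()
        if item is _DONE:
            return
        if isinstance(item, Exception):
            raise item
        yield item


def fake_token_stream() -> Iterator[str]:
    # Hypothetical stand-in for the blocking llama.cpp streaming call.
    yield from ["Hello", ",", " world"]


async def collect() -> list[str]:
    return [token async for token in iterate_in_thread(fake_token_stream)]
```

Note that this sketch does not stop the worker when the consumer disconnects early; the thread keeps producing until the iterator is exhausted.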
Complete Source: query_rewriter.py
import json
import re

from llm_client import generate
from config import settings

REWRITE_PROMPT = """You are a search query optimizer. Given a user question, generate {max_queries} specific search queries that will find the most relevant information.
Rules:
- Each query should be 4-8 words
- Include the current year (2026) when time-relevance matters
- Use specific technical terms over conversational language
- For comparisons, generate one query per option plus one comparison query
- Output ONLY a JSON array of strings, nothing else
User question: {question}
JSON array:"""


def rewrite_query(question: str) -> list[str]:
    prompt = REWRITE_PROMPT.format(
        question=question,
        max_queries=settings.max_queries,
    )
    raw = generate(prompt, max_tokens=200, temperature=0.1)
    return parse_queries(raw, question)


def parse_queries(raw: str, fallback: str) -> list[str]:
    # First attempt: pull a JSON array out of the model output.
    try:
        cleaned = raw.strip()
        match = re.search(r'\[.*?\]', cleaned, re.DOTALL)
        if match:
            queries = json.loads(match.group())
            if isinstance(queries, list) and all(isinstance(q, str) for q in queries):
                return queries[:settings.max_queries]
    except (json.JSONDecodeError, ValueError):
        pass

    # Second attempt: treat each non-empty line as one query.
    lines = [
        line.strip().strip('"-,').strip()
        for line in raw.strip().split('\n')
        if line.strip() and not line.strip().startswith('{')
    ]
    queries = [line for line in lines if len(line) > 5]
    if queries:
        return queries[:settings.max_queries]

    # Last resort: search with the original question.
    return [fallback]
Complete Source: searcher.py
import asyncio
from urllib.parse import urlparse

from duckduckgo_search import DDGS

from models import SearchResult
from config import settings


async def search_parallel(queries: list[str]) -> list[SearchResult]:
    # DDGS is synchronous, so each query runs in the default thread pool.
    loop = asyncio.get_running_loop()
    tasks = [
        loop.run_in_executor(None, _search_sync, q)
        for q in queries
    ]
    results_lists = await asyncio.gather(*tasks, return_exceptions=True)
    all_results = []
    for results in results_lists:
        if isinstance(results, Exception):
            continue
        all_results.extend(results)
    return deduplicate_smart(all_results)


def _search_sync(query: str) -> list[SearchResult]:
    try:
        ddgs = DDGS()
        results = ddgs.text(
            query,
            max_results=settings.max_search_results,
            region="wt-wt",
        )
        return [
            SearchResult(
                title=r.get("title", ""),
                url=r.get("href", ""),
                snippet=r.get("body", ""),
            )
            for r in results
            if r.get("href")
        ]
    except Exception as e:
        print(f"Search failed for '{query}': {e}")
        return []


def deduplicate_smart(
    results: list[SearchResult],
    max_per_domain: int = 3,
) -> list[SearchResult]:
    # Drop repeated URLs and cap how many results a single domain contributes.
    seen_urls: set[str] = set()
    domain_counts: dict[str, int] = {}
    unique = []
    for result in results:
        normalized = normalize_url(result.url)
        if normalized in seen_urls:
            continue
        domain = urlparse(result.url).netloc
        count = domain_counts.get(domain, 0)
        if count >= max_per_domain:
            continue
        seen_urls.add(normalized)
        domain_counts[domain] = count + 1
        unique.append(result)
    return unique


def normalize_url(url: str) -> str:
    # Strip the fragment, the trailing slash, and common tracking parameters.
    url = url.split('#')[0].rstrip('/')
    if '?' in url:
        base, params = url.split('?', 1)
        clean_params = '&'.join(
            p for p in params.split('&')
            if not p.startswith(('utm_', 'ref=', 'source='))
        )
        url = f"{base}?{clean_params}" if clean_params else base
    return url
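`normalize_url` works on the raw string, which leaves two cases uncovered: hostnames that differ only in letter case, and a trailing slash that sits before the query string. The following is a sketch of a stricter variant built on `urllib.parse`. The name `normalize_url_strict` and the two tracking-key constants are assumptions for illustration, not part of the series code.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

TRACKING_PREFIXES = ("utm_",)      # assumed list; extend as needed
TRACKING_KEYS = {"ref", "source"}  # assumed list; extend as needed


def normalize_url_strict(url: str) -> str:
    """Lowercase the host, drop the fragment and tracking parameters, trim the trailing slash."""
    parts = urlsplit(url)
    kept = [
        (key, value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
        if not key.startswith(TRACKING_PREFIXES) and key not in TRACKING_KEYS
    ]
    path = parts.path.rstrip("/")
    # The empty last element discards the fragment.
    return urlunsplit((parts.scheme.lower(), parts.netloc.lower(), path, urlencode(kept), ""))
```

With this variant, `https://Example.com/docs/?utm_source=x&page=2#intro` and `https://example.com/docs?page=2` normalize to the same key.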
Complete Source: scraper.py
import asyncio

import httpx
from bs4 import BeautifulSoup
from readability import Document

from models import ScrapedPage
from config import settings

# Accept-Encoding is left to httpx, which advertises only the encodings
# it can actually decode in the current environment.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/125.0.0.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
}


async def scrape_urls(urls: list[str]) -> list[ScrapedPage]:
    semaphore = asyncio.Semaphore(settings.max_concurrent_scrapes)
    # http2=True requires the optional h2 dependency (pip install "httpx[http2]").
    async with httpx.AsyncClient(http2=True, timeout=settings.scrape_timeout) as client:
        tasks = [_scrape_one(url, client, semaphore) for url in urls]
        pages = await asyncio.gather(*tasks, return_exceptions=True)
    return [
        p for p in pages
        if isinstance(p, ScrapedPage) and p.success
    ]


async def _scrape_one(
    url: str,
    client: httpx.AsyncClient,
    semaphore: asyncio.Semaphore,
) -> ScrapedPage | None:
    async with semaphore:
        # Tier 1: httpx
        page = await _scrape_httpx(url, client)
        if page is not None:
            return page
        # Tier 2: Playwright (if enabled)
        if settings.use_playwright:
            page = await _scrape_playwright(url)
            return page
        return None


async def _scrape_httpx(url: str, client: httpx.AsyncClient) -> ScrapedPage | None:
    try:
        response = await client.get(url, headers=HEADERS, follow_redirects=True)
        if response.status_code != 200:
            return None
        html = response.text
        if _needs_javascript(html):
            return None
        content = extract_content(html)
        title = extract_title(html)
        if len(content) < 100:
            return None
        return ScrapedPage(url=url, title=title, content=content, success=True)
    except httpx.HTTPError:
        # Covers timeouts, connection and read errors, and redirect loops,
        # so the Playwright tier still gets a chance.
        return None


async def _scrape_playwright(url: str) -> ScrapedPage | None:
    try:
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(
                headless=True,
                args=[
                    "--disable-blink-features=AutomationControlled",
                    "--disable-dev-shm-usage",
                    "--no-sandbox",
                ],
            )
            context = await browser.new_context(
                user_agent=HEADERS["User-Agent"],
                viewport={"width": 1920, "height": 1080},
            )
            page = await context.new_page()
            # Skip images, fonts, and stylesheets; only the text matters.
            await page.route(
                "**/*.{png,jpg,jpeg,gif,svg,woff,woff2,ttf,css}",
                lambda route: route.abort(),
            )
            await page.goto(url, wait_until="domcontentloaded", timeout=15000)
            await page.wait_for_timeout(2000)
            html = await page.content()
            await browser.close()
        content = extract_content(html)
        title = extract_title(html)
        if len(content) < 100:
            return None
        return ScrapedPage(url=url, title=title, content=content, success=True)
    except Exception:
        return None


def _needs_javascript(html: str) -> bool:
    # Heuristics for pages that render their content client-side.
    if len(html) < 1000:
        return True
    soup = BeautifulSoup(html, "lxml")
    text = soup.get_text(strip=True)
    if len(text) < 200:
        return True
    body = soup.find("body")
    if body:
        children = [c for c in body.children if c.name and c.name != "script"]
        if len(children) <= 2 and body.find_all("script"):
            return True
    return False


def extract_content(html: str) -> str:
    doc = Document(html)
    article_html = doc.summary()
    soup = BeautifulSoup(article_html, "lxml")
    for tag in soup.find_all(["nav", "footer", "header", "aside", "form"]):
        tag.decompose()
    for tag in soup.find_all(class_=lambda c: c and any(
        x in c.lower() for x in ["sidebar", "cookie", "newsletter", "popup", "modal", "ad-"]
    )):
        tag.decompose()
    text = soup.get_text(separator="\n", strip=True)
    lines = [line for line in text.split("\n") if line.strip()]
    return "\n".join(lines)


def extract_title(html: str) -> str:
    # Preference order: Open Graph title, <title>, first <h1>.
    soup = BeautifulSoup(html, "lxml")
    og = soup.find("meta", property="og:title")
    if og and og.get("content"):
        return og["content"]
    if soup.title and soup.title.string:
        return soup.title.string.strip()
    h1 = soup.find("h1")
    if h1:
        return h1.get_text(strip=True)
    return ""
Complete Source: content_processor.py
from rank_bm25 import BM25Okapi

from models import ScrapedPage, TextChunk, SourceReference
from config import settings


async def process_content(
    pages: list[ScrapedPage],
    query: str,
) -> tuple[str, list[SourceReference]]:
    chunks = _pages_to_chunks(pages)
    scored = _score_chunks(chunks, query)
    selected = _select_top_chunks(scored)
    context, sources = _build_context(selected)
    return context, sources


def _pages_to_chunks(pages: list[ScrapedPage]) -> list[TextChunk]:
    all_chunks = []
    for page in pages:
        for chunk_text in _chunk_text(page.content):
            all_chunks.append(TextChunk(
                text=chunk_text,
                source_url=page.url,
                source_title=page.title,
            ))
    return all_chunks


def _chunk_text(text: str) -> list[str]:
    # Sliding window over words; assumes chunk_overlap < chunk_size.
    words = text.split()
    size = settings.chunk_size
    overlap = settings.chunk_overlap
    if len(words) <= size:
        return [text]
    chunks = []
    start = 0
    while start < len(words):
        end = start + size
        chunks.append(" ".join(words[start:end]))
        if end >= len(words):
            # The window already reached the end; another step would only
            # repeat words from this chunk.
            break
        start += size - overlap
    return chunks


def _score_chunks(chunks: list[TextChunk], query: str) -> list[TextChunk]:
    if not chunks:
        return []
    corpus = [c.text.lower().split() for c in chunks]
    bm25 = BM25Okapi(corpus)
    scores = bm25.get_scores(query.lower().split())
    for chunk, score in zip(chunks, scores):
        chunk.relevance_score = float(score)
    return sorted(chunks, key=lambda c: c.relevance_score, reverse=True)


def _select_top_chunks(chunks: list[TextChunk]) -> list[TextChunk]:
    # Take the best chunks overall while capping each source's share.
    selected = []
    source_counts: dict[str, int] = {}
    for chunk in chunks:
        url = chunk.source_url
        count = source_counts.get(url, 0)
        if count >= settings.max_chunks_per_source:
            continue
        selected.append(chunk)
        source_counts[url] = count + 1
        if len(selected) >= settings.top_k_chunks:
            break
    return selected


def _build_context(chunks: list[TextChunk]) -> tuple[str, list[SourceReference]]:
    # Number sources in order of first appearance, starting at 1.
    source_map: dict[str, int] = {}
    sources: list[SourceReference] = []
    counter = 1
    for chunk in chunks:
        if chunk.source_url not in source_map:
            source_map[chunk.source_url] = counter
            sources.append(SourceReference(
                index=counter,
                title=chunk.source_title,
                url=chunk.source_url,
            ))
            counter += 1
    parts = []
    for chunk in chunks:
        num = source_map[chunk.source_url]
        parts.append(f"[Source {num}]\n{chunk.text}")
    return "\n\n".join(parts), sources
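The window arithmetic in `_chunk_text` is easier to check with word offsets than with text. The sketch below computes the (start, end) spans for a given word count. It adds two assumptions that go beyond the listing: it rejects an overlap that is not smaller than the chunk size (the loop would never advance), and it stops as soon as a window reaches the end of the text. The name `chunk_spans` is hypothetical.

```python
def chunk_spans(n_words: int, size: int, overlap: int) -> list[tuple[int, int]]:
    """Return the (start, end) word offsets of each sliding-window chunk."""
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError("need size > 0 and 0 <= overlap < size")
    spans: list[tuple[int, int]] = []
    step = size - overlap  # how far the window advances each time
    start = 0
    while start < n_words:
        end = min(start + size, n_words)
        spans.append((start, end))
        if end == n_words:
            break  # the window reached the end of the text
        start += step
    return spans
```

With the default settings (512 words per chunk, 50 words of overlap), a 1,200-word page yields three chunks covering words 0-512, 462-974, and 924-1200.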
Complete Source: synthesizer.py
import re

from llm_client import generate_streaming
from models import SourceReference

SYSTEM_PROMPT = """You are a search assistant that answers questions using provided sources.
Rules:
- Base your answer ONLY on the provided sources
- Cite sources using [1], [2], etc. matching the source numbers
- Every factual claim must have a citation
- If sources disagree, mention both perspectives with their citations
- Write clear, direct paragraphs
- Do not make up information not in the sources
- If the sources do not contain enough information, say so"""


async def synthesize(
    query: str,
    context: str,
    sources: list[SourceReference],
):
    source_list = "\n".join(
        f"[{s.index}] {s.title} ({s.url})" for s in sources
    )
    prompt = f"""Sources:
{source_list}
Context:
{context}
Question: {query}
Answer:"""
    full_answer = ""
    async for token in generate_streaming(
        prompt=prompt,
        system=SYSTEM_PROMPT,
        max_tokens=2048,
    ):
        full_answer += token
        yield token

    # Post-process: log citations that point at no known source.
    # Source indexes start at 1, so [0] is invalid as well.
    max_source = max(s.index for s in sources) if sources else 0
    cited = [int(n) for n in re.findall(r'\[(\d+)\]', full_answer)]
    invalid_refs = [n for n in cited if n < 1 or n > max_source]
    if invalid_refs:
        print(f"Warning: generated invalid citation refs: {invalid_refs}")
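The citation check at the end of `synthesize` can be pulled into a small pure function, which makes it testable without running a model. This is a sketch under that assumption; `invalid_citations` is a hypothetical helper, not part of the series code. It compares the cited numbers against the set of known source indexes.

```python
import re

CITATION = re.compile(r"\[(\d+)\]")


def invalid_citations(answer: str, valid_indexes: set[int]) -> list[int]:
    """Return the sorted citation numbers in answer that match no known source."""
    cited = {int(n) for n in CITATION.findall(answer)}
    return sorted(cited - valid_indexes)
```

Passing `{s.index for s in sources}` as `valid_indexes` would flag both out-of-range numbers and `[0]`.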
Complete Source: main.py
import asyncio
import json
import logging

from fastapi import FastAPI, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse

from config import settings
from query_rewriter import rewrite_query
from searcher import search_parallel
from scraper import scrape_urls
from content_processor import process_content
from synthesizer import synthesize

logging.basicConfig(level=settings.log_level.upper())
logger = logging.getLogger("codexity")

app = FastAPI(title="Codexity", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET"],
    allow_headers=["*"],
)


async def search_pipeline(query: str, request: Request):
    try:
        # Phase 1: Rewrite
        yield _event("status", {"step": "rewriting_query"})
        # rewrite_query is synchronous: the event loop is blocked until
        # the LLM returns.
        queries = rewrite_query(query)
        yield _event("status", {"step": "queries_ready", "queries": queries})

        # Phase 2: Search
        if await request.is_disconnected():
            return
        yield _event("status", {"step": "searching"})
        search_results = await search_parallel(queries)
        if not search_results:
            yield _event("error", {"message": "No search results found"})
            yield _event("done", {})
            return
        yield _event("sources_preview", {
            "count": len(search_results),
        })

        # Phase 3: Scrape
        if await request.is_disconnected():
            return
        yield _event("status", {"step": "scraping"})
        urls = [r.url for r in search_results]
        pages = await scrape_urls(urls)
        if not pages:
            yield _event("error", {"message": "Failed to scrape any pages"})
            yield _event("done", {})
            return
        yield _event("status", {
            "step": "scraping_done",
            "scraped": len(pages),
            "total": len(urls),
        })

        # Phase 4: Process
        if await request.is_disconnected():
            return
        yield _event("status", {"step": "processing"})
        context, sources = await process_content(pages, query)
        yield _event("sources", {
            "sources": [
                {"index": s.index, "title": s.title, "url": s.url}
                for s in sources
            ],
        })

        # Phase 5: Synthesize
        if await request.is_disconnected():
            return
        yield _event("status", {"step": "generating"})
        async for token in synthesize(query, context, sources):
            if await request.is_disconnected():
                return
            yield _event("token", {"text": token})
        yield _event("done", {})
    except Exception:
        logger.exception("Pipeline error for query: %s", query)
        yield _event("error", {"message": "An internal error occurred"})
        yield _event("done", {})


def _event(event_type: str, data: dict) -> dict:
    return {"event": event_type, "data": json.dumps(data)}


@app.get("/search")
async def search(request: Request, q: str = Query(..., min_length=1, max_length=500)):
    async def generator():
        async for evt in search_pipeline(q, request):
            yield evt

    return EventSourceResponse(
        generator(),
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )


@app.get("/health")
async def health():
    return {"status": "ok"}
Docker Deployment
# Dockerfile
FROM python:3.12-slim
RUN apt-get update && apt-get install -y \
    build-essential \
    && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY pyproject.toml .
RUN pip install --no-cache-dir .
# Install Playwright browser
RUN pip install playwright && playwright install chromium --with-deps
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
services:
  codexity:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./models:/app/models
      - ./.env:/app/.env
    environment:
      - MODEL_PATH=/app/models/qwen2.5-7b-instruct-q4_k_m.gguf
    restart: unless-stopped
# .env.example
MODEL_PATH=./models/qwen2.5-7b-instruct-q4_k_m.gguf
CONTEXT_LENGTH=8192
MAX_TOKENS=2048
MAX_SEARCH_RESULTS=8
MAX_QUERIES=3
SCRAPE_TIMEOUT=15
MAX_CONCURRENT_SCRAPES=5
USE_PLAYWRIGHT=true
CHUNK_SIZE=512
CHUNK_OVERLAP=50
TOP_K_CHUNKS=10
MAX_CHUNKS_PER_SOURCE=3
LOG_LEVEL=info
Running It
Local Development
# Download a model
mkdir -p models
wget -O models/qwen2.5-7b-instruct-q4_k_m.gguf \
"https://huggingface.co/Qwen/Qwen2.5-7B-Instruct-GGUF/resolve/main/qwen2.5-7b-instruct-q4_k_m.gguf"
# Install dependencies
pip install -e .
playwright install chromium
# Copy env
cp .env.example .env
# Run
uvicorn main:app --reload --host 0.0.0.0 --port 8000
Docker
docker compose up --build
Testing
# Simple query
curl -N "http://localhost:8000/search?q=what+is+fastapi"
# Complex comparison
curl -N "http://localhost:8000/search?q=postgresql+vs+mongodb+for+startups"
# Health check
curl http://localhost:8000/health
End-to-End Trace
Here is what happens when you run curl -N "http://localhost:8000/search?q=what+is+asyncio":
event: status
data: {"step": "rewriting_query"}
event: status
data: {"step": "queries_ready", "queries": ["Python asyncio tutorial 2026", "asyncio event loop explained", "Python async await concurrency"]}
event: status
data: {"step": "searching"}
event: sources_preview
data: {"count": 16}
event: status
data: {"step": "scraping"}
event: status
data: {"step": "scraping_done", "scraped": 11, "total": 16}
event: status
data: {"step": "processing"}
event: sources
data: {"sources": [{"index": 1, "title": "Python asyncio Documentation", "url": "https://docs.python.org/3/library/asyncio.html"}, ...]}
event: status
data: {"step": "generating"}
event: token
data: {"text": "Python"}
event: token
data: {"text": "'s"}
event: token
data: {"text": " asyncio"}
... (hundreds of token events)
event: token
data: {"text": " [1][3]."}
event: done
data: {}
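Total time: 7-20 seconds on CPU, matching the latency breakdown in the next section. With GPU offload, generation shrinks to 0.5-2 seconds, which brings the total to roughly 5-12 seconds. On CPU, the user sees the first token within 5-10 seconds.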
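On the client side, the trace above is a sequence of `event:` and `data:` lines, with a blank line closing each event on the wire. A minimal parser sketch follows, assuming the lines arrive already split (for example from a streaming HTTP client). The function name `parse_sse` and the `SAMPLE` stream are illustrative, not part of Codexity.

```python
import json
from collections.abc import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, dict]]:
    """Group SSE lines into (event, data) pairs; data is decoded as JSON."""
    event = "message"  # default event name when no "event:" field is sent
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                yield event, json.loads("\n".join(data_lines))
            event, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a heartbeat
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
    if data_lines:  # stream ended without a trailing blank line
        yield event, json.loads("\n".join(data_lines))


# Illustrative stream in the shape of the trace above.
SAMPLE = [
    "event: status", 'data: {"step": "generating"}', "",
    "event: token", 'data: {"text": "Python"}', "",
    "event: token", 'data: {"text": " asyncio"}', "",
    "event: done", "data: {}", "",
]

events = list(parse_sse(SAMPLE))
# Concatenating the token events reconstructs the answer text.
answer = "".join(data["text"] for name, data in events if name == "token")
```

A frontend would typically render `status` events as progress, show `sources` as a reference list, and append each `token` to the answer until `done` arrives.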
Performance Tuning
Latency breakdown for a typical query:
| Phase | Time |
|---|---|
| Query rewriting | 100-300ms |
| Web search (parallel) | 1-2s |
| Scraping (5 concurrent) | 3-8s |
| Content processing | 50-200ms |
| Answer generation (CPU) | 3-10s |
| Total | 7-20s |
The two biggest levers:
GPU offload. Set n_gpu_layers=-1 in llm_client.py. This only takes effect when llama-cpp-python was built with GPU support. Answer generation drops from 3-10s to 0.5-2s.
Skip Playwright. Set USE_PLAYWRIGHT=false. Most technical content is server-rendered. You lose access to SPA-only sites but cut scraping time significantly.
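To see how much the first lever moves the total, the per-phase ranges from the table can be summed directly. The sketch below uses only the numbers stated above. The dictionary keys and helper names are illustrative.

```python
Range = tuple[int, int]  # (low, high) in milliseconds

# Ranges copied from the latency table.
CPU_PHASES: dict[str, Range] = {
    "query_rewriting": (100, 300),
    "web_search": (1000, 2000),
    "scraping": (3000, 8000),
    "content_processing": (50, 200),
    "answer_generation": (3000, 10000),
}


def total(phases: dict[str, Range]) -> Range:
    """Sum the low and high ends of every phase."""
    return (
        sum(low for low, _ in phases.values()),
        sum(high for _, high in phases.values()),
    )


def with_gpu(phases: dict[str, Range]) -> dict[str, Range]:
    """Replace the generation phase with the 0.5-2s GPU figure from the text."""
    updated = dict(phases)
    updated["answer_generation"] = (500, 2000)
    return updated


def slowest(phases: dict[str, Range]) -> str:
    """Name the phase with the largest worst-case latency."""
    return max(phases, key=lambda name: phases[name][1])


cpu_total = total(CPU_PHASES)            # (7150, 20500) ms
gpu_total = total(with_gpu(CPU_PHASES))  # (4650, 12500) ms
```

Once generation runs on a GPU, scraping becomes the slowest phase in the worst case, which is why the second lever targets Playwright.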
What You Built
A backend that takes a natural language question and returns a cited, streaming answer sourced from the live web. Nine Python files. No paid APIs. Runs on hardware you own.
The architecture follows the same general pattern as hosted answer engines such as Perplexity, You.com, and Phind. The models are smaller. The infrastructure is simpler. But the pipeline has the same stages: query rewriting, web search, scraping, content processing, LLM synthesis, and SSE streaming.
Fork it. Extend it. Add a frontend. Swap in a bigger model. Point it at different search engines. The components are modular and the contracts between them are typed.
The source speaks for itself.