Build It: A Daily Data Pipeline With LLM Enrichment
Pull data from an API, clean it with pandas, ask Claude to enrich each row, save to SQLite. The pattern powers most data-engineering AI work.
What we're building
A script that: fetches yesterday's top stories from an API, loads them into a DataFrame, asks Claude to classify each as positive/negative/neutral, and writes results to a SQLite database. Idempotent — rerun safely.
Setup: a simple SQLite table. The uniqueness constraint on id, paired with the ON CONFLICT upsert later in the script, is what makes reruns safe.
```python
# pyproject.toml deps: httpx, pandas, anthropic, sqlalchemy
import asyncio

import httpx
import pandas as pd
from anthropic import AsyncAnthropic
from sqlalchemy import create_engine, text

DB_URL = "sqlite:///pipeline.db"
engine = create_engine(DB_URL)

def init_db():
    with engine.begin() as conn:
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS stories (
                id INTEGER PRIMARY KEY,
                title TEXT NOT NULL,
                url TEXT,
                fetched_at TEXT NOT NULL,
                sentiment TEXT,
                UNIQUE(id)
            )
        """))
```

Concurrent fetch of 20 stories into a DataFrame. pandas handles the shape.
```python
async def fetch_stories() -> pd.DataFrame:
    url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    async with httpx.AsyncClient(timeout=10) as client:
        ids = (await client.get(url)).json()[:20]

        async def get_one(sid):
            r = await client.get(f"https://hacker-news.firebaseio.com/v0/item/{sid}.json")
            return r.json()

        raw = await asyncio.gather(*(get_one(i) for i in ids))
    df = pd.DataFrame(raw)[["id", "title", "url"]].dropna(subset=["title"])
    # Timestamp.utcnow() is deprecated in pandas 2.x; use a tz-aware now().
    df["fetched_at"] = pd.Timestamp.now(tz="UTC").isoformat()
    return df
```

Haiku is ~10x cheaper than Opus — perfect for bulk labeling. A semaphore caps concurrency.
```python
client = AsyncAnthropic()
sem = asyncio.Semaphore(5)

async def classify(title: str) -> str:
    async with sem:
        try:
            r = await client.messages.create(
                model="claude-haiku-4-5",  # cheaper for bulk classification
                max_tokens=10,
                messages=[{
                    "role": "user",
                    "content": f"Classify sentiment of this headline as exactly one word: positive, negative, or neutral. No other text.\n\nHeadline: {title}"
                }],
            )
            word = r.content[0].text.strip().lower()
            return word if word in {"positive", "negative", "neutral"} else "neutral"
        except Exception as e:
            print(f"classify failed: {e}")
            return "unknown"

async def enrich(df: pd.DataFrame) -> pd.DataFrame:
    sentiments = await asyncio.gather(*(classify(t) for t in df["title"]))
    df = df.copy()
    df["sentiment"] = sentiments
    return df
```

Upsert with ON CONFLICT — the idempotency trick. You can rerun all day without creating duplicates.
```python
def upsert(df: pd.DataFrame) -> int:
    sql = text("""
        INSERT INTO stories (id, title, url, fetched_at, sentiment)
        VALUES (:id, :title, :url, :fetched_at, :sentiment)
        ON CONFLICT(id) DO UPDATE SET
            sentiment = excluded.sentiment,
            fetched_at = excluded.fetched_at
    """)
    rows = df.to_dict(orient="records")
    with engine.begin() as conn:
        conn.execute(sql, rows)
    return len(rows)

async def main():
    init_db()
    df = await fetch_stories()
    df = await enrich(df)
    n = upsert(df)
    print(f"Inserted/updated {n} rows.")
    # Read back the most recent negatives
    with engine.connect() as conn:
        neg = pd.read_sql(
            "SELECT title FROM stories WHERE sentiment = 'negative' "
            "ORDER BY fetched_at DESC LIMIT 5",
            conn,
        )
    print("\nRecent negative headlines:")
    print(neg.to_string(index=False))

asyncio.run(main())
```

Cost math
- Haiku 4.5: $1/M input, $5/M output tokens (2026 pricing)
- Each classify call: ~60 input, ~5 output tokens
- 20 stories ≈ 1,300 tokens ≈ $0.002 per run
- Running daily for a year ≈ $0.60. Free-tier budget.
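The estimate is easy to sanity-check. The per-token prices and per-call token counts below are the figures quoted in this lesson, not measured values:

```python
# Assumed from the lesson: Haiku 4.5 at $1/M input and $5/M output tokens.
INPUT_PRICE = 1 / 1_000_000   # dollars per input token
OUTPUT_PRICE = 5 / 1_000_000  # dollars per output token

STORIES = 20
IN_TOK, OUT_TOK = 60, 5  # rough tokens per classify call

per_run = STORIES * (IN_TOK * INPUT_PRICE + OUT_TOK * OUTPUT_PRICE)
print(f"per run:  ${per_run:.4f}")        # ~$0.0017
print(f"per year: ${per_run * 365:.2f}")  # ~$0.62
```

Note that output tokens cost 5x more per token here, but the short one-word replies keep them a minority of the bill.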
Mini-exercise
1. Add a 'topic' column — ask Claude to pick one of: tech, politics, science, other
2. Add a scheduled run using GitHub Actions or cron
3. Export a weekly sentiment breakdown chart using pandas + matplotlib
4. Track cost: log tokens used per run into a costs table
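For exercise 1, the same guardrail used for sentiment applies: validate the model's reply against a closed label set before it touches the database. A minimal sketch (the helper name and the "other" fallback are assumptions, not from the lesson):

```python
ALLOWED_TOPICS = {"tech", "politics", "science", "other"}

def normalize_topic(raw: str) -> str:
    # Strip whitespace/punctuation, lowercase, and fall back to "other"
    # on anything outside the allowed set -- mirrors classify()'s guard.
    word = raw.strip().lower().rstrip(".!")
    return word if word in ALLOWED_TOPICS else "other"

print(normalize_topic(" Tech. "))            # tech
print(normalize_topic("Probably politics"))  # other
```

Keeping the fallback deterministic means a chatty or malformed reply degrades gracefully instead of poisoning the table with free-form labels.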
Compare the options
| Sync pandas loop | Async concurrent LLM calls |
|---|---|
| 20 stories × 1s = 20s | ~2s with concurrency 5 |
| Simple code | Needs asyncio.gather |
| Good for: prototyping | Good for: anything bigger than 10 rows |
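The table's timings can be reproduced without spending tokens by standing in asyncio.sleep for the API round-trip (scaled to 0.1 s here so the demo runs fast; the ~1 s figure is the lesson's estimate):

```python
import asyncio
import time

LATENCY = 0.1  # stand-in for a ~1 s LLM round-trip, scaled down

async def fake_classify():
    await asyncio.sleep(LATENCY)

async def sequential(n: int):
    for _ in range(n):
        await fake_classify()

async def concurrent(n: int, limit: int = 5):
    sem = asyncio.Semaphore(limit)
    async def one():
        async with sem:
            await fake_classify()
    await asyncio.gather(*(one() for _ in range(n)))

async def main():
    t0 = time.perf_counter()
    await sequential(20)
    t1 = time.perf_counter()
    await concurrent(20)
    t2 = time.perf_counter()
    print(f"sequential: {t1 - t0:.2f}s")  # ~2.0s: 20 calls x 0.1s
    print(f"concurrent: {t2 - t1:.2f}s")  # ~0.4s: (20 / 5) x 0.1s

asyncio.run(main())
```

With concurrency capped at 5, wall time scales with n divided by the limit rather than with n, which is exactly the 20 s vs ~2 s gap in the table at full latency.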
Big idea: a data pipeline is fetch → transform → load. LLM enrichment slots in as another transform. Make each step idempotent and you can cron it forever.
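Once every step is idempotent, scheduling really is one line. A crontab sketch (the paths are placeholders for your environment):

```shell
# Run the pipeline daily at 07:00; rerunning is safe thanks to the upsert.
0 7 * * * cd /path/to/project && .venv/bin/python pipeline.py >> pipeline.log 2>&1
```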