Lesson 165 of 1596
Build It: A Daily Data Pipeline With LLM Enrichment
Pull data from an API, clean it with pandas, ask Claude to enrich each row, save to SQLite. The pattern powers most data-engineering AI work.
Creators · AI-Assisted Coding · ~42 min read
What we're building
A script that: fetches yesterday's top stories from an API, loads them into a DataFrame, asks Claude to classify each as positive/negative/neutral, and writes results to a SQLite database. Idempotent — rerun safely.
Setup: a simple SQLite table. The UNIQUE(id) is what makes reruns safe.
# pyproject.toml: httpx, pandas, anthropic, sqlalchemy import asyncio import httpx import pandas as pd from sqlalchemy import create_engine, text from anthropic import AsyncAnthropic DB_URL = "sqlite:///pipeline.db" engine = create_engine(DB_URL) def init_db(): with engine.begin() as conn: conn.execute(text(""" CREATE TABLE IF NOT EXISTS stories ( id INTEGER PRIMARY KEY, title TEXT NOT NULL, url TEXT, fetched_at TEXT NOT NULL, sentiment TEXT, UNIQUE(id) ) """))Concurrent fetch of 20 stories into a DataFrame. pandas handles the shape.
async def fetch_stories() -> pd.DataFrame: url = "https://hacker-news.firebaseio.com/v0/topstories.json" async with httpx.AsyncClient(timeout=10) as client: ids = (await client.get(url)).json()[:20] async def get_one(sid): r = await client.get(f"https://hacker-news.firebaseio.com/v0/item/{sid}.json") return r.json() raw = await asyncio.gather(*(get_one(i) for i in ids)) df = pd.DataFrame(raw)[["id", "title", "url"]].dropna(subset=["title"]) df["fetched_at"] = pd.Timestamp.utcnow().isoformat() return dfHaiku is ~10x cheaper than Opus — perfect for bulk labeling. Semaphore caps concurrency.
client = AsyncAnthropic() sem = asyncio.Semaphore(5) async def classify(title: str) -> str: async with sem: try: r = await client.messages.create( model="claude-haiku-4-5", # cheaper for bulk classification max_tokens=10, messages=[{ "role": "user", "content": f"Classify sentiment of this headline as exactly one word: positive, negative, or neutral. No other text.\n\nHeadline: {title}" }], ) word = r.content[0].text.strip().lower() return word if word in {"positive", "negative", "neutral"} else "neutral" except Exception as e: print(f"classify failed: {e}") return "unknown" async def enrich(df: pd.DataFrame) -> pd.DataFrame: sentiments = await asyncio.gather(*(classify(t) for t in df["title"])) df = df.copy() df["sentiment"] = sentiments return dfUpsert with ON CONFLICT — the idempotency trick. You can rerun all day without creating duplicates.
def upsert(df: pd.DataFrame) -> int: sql = text(""" INSERT INTO stories (id, title, url, fetched_at, sentiment) VALUES (:id, :title, :url, :fetched_at, :sentiment) ON CONFLICT(id) DO UPDATE SET sentiment = excluded.sentiment, fetched_at = excluded.fetched_at """) rows = df.to_dict(orient="records") with engine.begin() as conn: conn.execute(sql, rows) return len(rows) async def main(): init_db() df = await fetch_stories() df = await enrich(df) n = upsert(df) print(f"Inserted/updated {n} rows.") # Read back top negatives with engine.connect() as conn: neg = pd.read_sql("SELECT title FROM stories WHERE sentiment = 'negative' LIMIT 5", conn) print("\nRecent negative headlines:") print(neg.to_string(index=False)) asyncio.run(main())Cost math
- Haiku 4.5: $1/M input, $5/M output tokens (2026 pricing)
- Each classify call: ~60 input, ~5 output tokens
- 20 stories ≈ 1300 tokens ≈ $0.001
- Running daily for a year ≈ $0.50. Free-tier budget.
Mini-exercise
- 1Add a 'topic' column — ask Claude to pick one of: tech, politics, science, other
- 2Add a scheduled run using GitHub Actions or cron
- 3Export a weekly sentiment breakdown chart using pandas + matplotlib
- 4Track cost: log tokens used per run into a costs table
Compare the options
| Sync pandas loop | Async concurrent LLM calls |
|---|---|
| 20 stories × 1s = 20s | ~2s with concurrency 5 |
| Simple code | Needs asyncio.gather |
| Good for: prototyping | Good for: anything bigger than 10 rows |
Key terms in this lesson
Big idea: a data pipeline is fetch → transform → load. LLM enrichment slots in as another transform. Make each step idempotent and you can cron it forever.
End-of-lesson quiz
Check what stuck
8 questions · Score saves to your progress.
Tutor
Curious about “Build It: A Daily Data Pipeline With LLM Enrichment”?
Ask anything about this lesson. I’ll answer using just what you’re reading — short, friendly, grounded.
Progress saved locally in this browser. Sign in to sync across devices.
Related lessons
Keep going
Creators · 50 min
Installing and Using Claude Code CLI
Claude Code is Anthropic's terminal-native coding agent. Let's install it, wire it to a project, and use the features most engineers miss on day one.
Creators · 45 min
Installing and Using the OpenAI Codex CLI
Codex CLI is OpenAI's terminal coding agent. It runs locally, supports MCP, and ships a codex cloud mode for background tasks. Let's install it and compare it honestly to Claude Code.
Creators · 40 min
Agents vs. Autocomplete — the Mental Model Shift
Autocomplete is a suggestion. An agent is an actor. The mental model you bring to each is different, and conflating them is the number-one reason teams trip over AI coding.
