Loading lesson…
Pull data from an API, clean it with pandas, ask Claude to enrich each row, save to SQLite. The pattern powers most data-engineering AI work.
A script that: fetches yesterday's top stories from an API, loads them into a DataFrame, asks Claude to classify each as positive/negative/neutral, and writes results to a SQLite database. Idempotent — rerun safely.
# pyproject.toml: httpx, pandas, anthropic, sqlalchemy import asyncio import httpx import pandas as pd from sqlalchemy import create_engine, text from anthropic import AsyncAnthropic DB_URL = "sqlite:///pipeline.db" engine = create_engine(DB_URL) def init_db(): with engine.begin() as conn: conn.execute(text(""" CREATE TABLE IF NOT EXISTS stories ( id INTEGER PRIMARY KEY, title TEXT NOT NULL, url TEXT, fetched_at TEXT NOT NULL, sentiment TEXT, UNIQUE(id) ) """))Setup: a simple SQLite table. The UNIQUE(id) is what makes reruns safe.async def fetch_stories() -> pd.DataFrame: url = "https://hacker-news.firebaseio.com/v0/topstories.json" async with httpx.AsyncClient(timeout=10) as client: ids = (await client.get(url)).json()[:20] async def get_one(sid): r = await client.get(f"https://hacker-news.firebaseio.com/v0/item/{sid}.json") return r.json() raw = await asyncio.gather(*(get_one(i) for i in ids)) df = pd.DataFrame(raw)[["id", "title", "url"]].dropna(subset=["title"]) df["fetched_at"] = pd.Timestamp.utcnow().isoformat() return dfConcurrent fetch of 20 stories into a DataFrame. pandas handles the shape.client = AsyncAnthropic() sem = asyncio.Semaphore(5) async def classify(title: str) -> str: async with sem: try: r = await client.messages.create( model="claude-haiku-4-5", # cheaper for bulk classification max_tokens=10, messages=[{ "role": "user", "content": f"Classify sentiment of this headline as exactly one word: positive, negative, or neutral. No other text.\n\nHeadline: {title}" }], ) word = r.content[0].text.strip().lower() return word if word in {"positive", "negative", "neutral"} else "neutral" except Exception as e: print(f"classify failed: {e}") return "unknown" async def enrich(df: pd.DataFrame) -> pd.DataFrame: sentiments = await asyncio.gather(*(classify(t) for t in df["title"])) df = df.copy() df["sentiment"] = sentiments return dfHaiku is ~10x cheaper than Opus — perfect for bulk labeling. Semaphore caps concurrency.def upsert(df: pd.DataFrame) -> int: sql = text(""" INSERT INTO stories (id, title, url, fetched_at, sentiment) VALUES (:id, :title, :url, :fetched_at, :sentiment) ON CONFLICT(id) DO UPDATE SET sentiment = excluded.sentiment, fetched_at = excluded.fetched_at """) rows = df.to_dict(orient="records") with engine.begin() as conn: conn.execute(sql, rows) return len(rows) async def main(): init_db() df = await fetch_stories() df = await enrich(df) n = upsert(df) print(f"Inserted/updated {n} rows.") # Read back top negatives with engine.connect() as conn: neg = pd.read_sql("SELECT title FROM stories WHERE sentiment = 'negative' LIMIT 5", conn) print("\nRecent negative headlines:") print(neg.to_string(index=False)) asyncio.run(main())Upsert with ON CONFLICT — the idempotency trick. You can rerun all day without creating duplicates.| Sync pandas loop | Async concurrent LLM calls |
|---|---|
| 20 stories × 1s = 20s | ~2s with concurrency 5 |
| Simple code | Needs asyncio.gather |
| Good for: prototyping | Good for: anything bigger than 10 rows |
Big idea: a data pipeline is fetch → transform → load. LLM enrichment slots in as another transform. Make each step idempotent and you can cron it forever.
8 questions · take it digitally for instant feedback at tendril.neural-forge.io/learn/quiz/end-prog-python-data-pipeline-creators
What is the main idea of "Build It: A Daily Data Pipeline With LLM Enrichment"?
Which concept is most central to "Build It: A Daily Data Pipeline With LLM Enrichment"?
Which use of AI fits this topic best?
What should a careful learner remember about "Idempotency is the hallmark of good pipelines"?
You want to use AI after this lesson. What is the safest next step?
How should AI output about pipeline be treated?
Name one way to verify an AI answer about pipeline.
Which action would help you apply "Build It: A Daily Data Pipeline With LLM Enrichment" responsibly?