Pull data from an API, clean it with pandas, ask Claude to enrich each row, save the results to SQLite. That four-step pattern powers a huge share of AI data-engineering work.
We'll build a script that fetches the day's top Hacker News stories, loads them into a DataFrame, asks Claude to classify each headline as positive, negative, or neutral, and writes the results to a SQLite database. It's idempotent: you can rerun it safely.
```python
# pyproject.toml: httpx, pandas, anthropic, sqlalchemy
import asyncio

import httpx
import pandas as pd
from sqlalchemy import create_engine, text
from anthropic import AsyncAnthropic

DB_URL = "sqlite:///pipeline.db"
engine = create_engine(DB_URL)


def init_db():
    with engine.begin() as conn:
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS stories (
                id INTEGER PRIMARY KEY,
                title TEXT NOT NULL,
                url TEXT,
                fetched_at TEXT NOT NULL,
                sentiment TEXT,
                UNIQUE(id)
            )
        """))
```

Setup: a simple SQLite table. The UNIQUE(id) constraint is what makes reruns safe.
url = "https://hacker-news.firebaseio.com/v0/topstories.json"
async with httpx.AsyncClient(timeout=10) as client:
ids = (await client.get(url)).json()[:20]
async def get_one(sid):
r = await client.get(f"https://hacker-news.firebaseio.com/v0/item/{sid}.json")
return r.json()
raw = await asyncio.gather(*(get_one(i) for i in ids))
df = pd.DataFrame(raw)[["id", "title", "url"]].dropna(subset=["title"])
df["fetched_at"] = pd.Timestamp.utcnow().isoformat()
return dfConcurrent fetch of 20 stories into a DataFrame. pandas handles the shape.client = AsyncAnthropic()
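To spot-check this step in isolation (a sketch; run it before wiring up the rest):

```python
df = asyncio.run(fetch_stories())
print(df.shape)                     # up to (20, 4): id, title, url, fetched_at
print(df[["id", "title"]].head(3))  # eyeball a few rows before paying for LLM calls
```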
```python
client = AsyncAnthropic()
sem = asyncio.Semaphore(5)


async def classify(title: str) -> str:
    async with sem:
        try:
            r = await client.messages.create(
                model="claude-haiku-4-5",  # cheaper for bulk classification
                max_tokens=10,
                messages=[{
                    "role": "user",
                    "content": f"Classify sentiment of this headline as exactly one word: positive, negative, or neutral. No other text.\n\nHeadline: {title}",
                }],
            )
            word = r.content[0].text.strip().lower()
            return word if word in {"positive", "negative", "neutral"} else "neutral"
        except Exception as e:
            print(f"classify failed: {e}")
            return "unknown"


async def enrich(df: pd.DataFrame) -> pd.DataFrame:
    sentiments = await asyncio.gather(*(classify(t) for t in df["title"]))
    df = df.copy()
    df["sentiment"] = sentiments
    return df
```

Haiku is roughly 10x cheaper than Opus, perfect for bulk labeling. The Semaphore caps us at five concurrent requests.
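Always review AI output before trusting it downstream. A quick sketch: if value_counts() shows every headline landing on one label, the prompt (or the fallback to "neutral") probably needs tightening.

```python
async def preview():
    df = await enrich(await fetch_stories())
    print(df["sentiment"].value_counts())  # e.g. neutral 12, negative 5, positive 3

asyncio.run(preview())
```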
```python
def upsert(df: pd.DataFrame) -> int:
    sql = text("""
        INSERT INTO stories (id, title, url, fetched_at, sentiment)
        VALUES (:id, :title, :url, :fetched_at, :sentiment)
        ON CONFLICT(id) DO UPDATE SET
            sentiment = excluded.sentiment,
            fetched_at = excluded.fetched_at
    """)
    rows = df.to_dict(orient="records")
    with engine.begin() as conn:
        conn.execute(sql, rows)
    return len(rows)
```
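To see the idempotency for yourself, a sketch reusing the functions above: upsert the same frame twice and confirm the row count doesn't grow.

```python
async def idempotency_check():
    df = await enrich(await fetch_stories())
    upsert(df)
    upsert(df)  # same ids: ON CONFLICT updates rows instead of duplicating them
    with engine.connect() as conn:
        total = conn.execute(text("SELECT COUNT(*) FROM stories")).scalar_one()
    print(total)  # still at most 20, not 40

asyncio.run(idempotency_check())
```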
```python
async def main():
    init_db()
    df = await fetch_stories()
    df = await enrich(df)
    n = upsert(df)
    print(f"Inserted/updated {n} rows.")

    # Read back a few negative headlines
    with engine.connect() as conn:
        neg = pd.read_sql("SELECT title FROM stories WHERE sentiment = 'negative' LIMIT 5", conn)
    print("\nRecent negative headlines:")
    print(neg.to_string(index=False))


asyncio.run(main())
```

Upsert with ON CONFLICT is the idempotency trick: you can rerun all day without creating duplicates.

| Sync pandas loop | Async concurrent LLM calls |
|---|---|
| 20 stories × 1s = 20s | ~4s with concurrency 5 |
| Simple code | Needs asyncio.gather |
| Good for: prototyping | Good for: anything bigger than 10 rows |
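The table's numbers are back-of-envelope: 20 sequential 1-second calls cost 20 s, while a semaphore of 5 runs them in roughly four waves (~4 s). You can verify the shape of the speedup with sleeps instead of real API calls:

```python
import asyncio
import time

async def fake_call(_):
    await asyncio.sleep(1)  # stand-in for a ~1 s LLM request

async def sequential():
    for i in range(20):
        await fake_call(i)

async def concurrent(limit: int = 5):
    sem = asyncio.Semaphore(limit)
    async def bounded(i):
        async with sem:
            await fake_call(i)
    await asyncio.gather(*(bounded(i) for i in range(20)))

for runner in (sequential, concurrent):
    t0 = time.perf_counter()
    asyncio.run(runner())
    print(f"{runner.__name__}: {time.perf_counter() - t0:.1f}s")  # ~20s vs ~4s
```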
Big idea: a data pipeline is fetch → transform → load. LLM enrichment slots in as another transform. Make each step idempotent and you can cron it forever.
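"Cron it forever" can be as simple as one crontab line; the paths here are placeholders for wherever your script and virtualenv live:

```
# Hypothetical paths; adjust to your setup. Runs daily at 07:00.
0 7 * * * cd /opt/pipeline && .venv/bin/python pipeline.py >> pipeline.log 2>&1
```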
15 questions · take it digitally for instant feedback at tendril.neural-forge.io/learn/quiz/end-prog-python-data-pipeline-creators