A LangGraph create_react_agent that answers data-lineage / data-support /
ETL-bug questions against a tiny simulated warehouse, by combining two MCP
servers:
sqllite-mcp-server— SQL access to the warehousefilesystem-mcp-server— read raw upstream files on disk
You need both to find ETL bugs: querying the DB tells you "what's there"; reading the source file tells you "what was supposed to be there". The discrepancy is the bug.
Many upstream feeds (S3, server logs, several customer integrations) ship daily
files; ETL loaders import them into raw tables; an agg job rolls everything up
into daily_metrics. When a downstream metric moves or looks suspicious, the
on-call engineer asks the agent.
We plant two real ETL bugs for today (2026-04-26) so the agent has something to find:
-
Customer B currency-filter bug —
data/sources/customer_b/2026-04-26.csvhas 80 orders (75 EUR + 5 USD). The loader silently filters out non-USD rows, so the DB only has 5. ⇒daily_metrics.total_revenuedrops ~60% today. ⇒ Visible only by reading the source file and comparing row counts. -
Customer A precision bug —
data/sources/customer_a/2026-04-26.csvstores amounts as119.06,94.21, etc. The loader doesint(amount)instead offloat(amount), silently truncating cents. DB has119.0,94.0. ⇒ Subtle "file says 119.06, DB says 119" — the canonical question this POC is built to answer.
The other two sources (S3 clickstream, app logs) are clean — file row count matches DB row count. Useful as a control: the agent should not flag them.
| File | Purpose |
|---|---|
setup_warehouse.py |
Build data/warehouse.db + write upstream files for today. |
trace_agent.py |
LangGraph create_react_agent driver + REPL (2 MCP servers). |
tests/test_lineage_qa.py |
End-to-end pytest suite hitting the real LLM. |
After python3 setup_warehouse.py:
data/
├── warehouse.db
└── sources/
├── s3_clickstream/2026-04-26.json # 1000 NDJSON events (clean)
├── app_logs/2026-04-26.log # 500 log lines (clean)
├── customer_a/2026-04-26.csv # 51 rows, 99.99-style amounts (precision bug in loader)
└── customer_b/2026-04-26.csv # 80 rows, mixed USD/EUR (currency-filter bug in loader)
# 1) Deps
pip install langgraph langchain-openai langchain-mcp-adapters "mcp[cli]" faker pytest pytest-asyncio
# 2) Build the warehouse + write today's upstream files
python3 setup_warehouse.py
# 3) Configure an LLM endpoint (OpenRouter or OpenAI compatible)
export OPENROUTER_API_KEY=...
# optional:
# export LLM_BASE_URL=https://openrouter.ai/api/v1
# export LLM_MODEL=anthropic/claude-sonnet-4.5
# 4) REPL
python3 trace_agent.py
# 5) Run the e2e tests (slow, ~2 min, hits real LLM)
python3 -m pytest -s- Today's total_revenue is much lower than last month — by how much, and what's the root cause? Check the upstream raw source file too.
- For today's customer_a_orders_raw rows, do the DB amounts match the upstream source file exactly? If they differ, name the loader and explain the bug.
- Where does daily_metrics.total_events come from upstream?
user question
│
▼
react agent ──► sqlite-mcp::execute_query _field_lineage / _source_registry / raw tables
│
├──────────► filesystem-mcp::read_file data/sources/<source>/<date>.<ext>
│
▼
compare DB rows to file rows → name the loader → answer
Lineage metadata lives in two SQL tables inside warehouse.db:
_field_lineage(target_table, target_field, source_table, source_field, transform, etl_job)_source_registry(source_table, source_uri, file_dir, loader, schema_note)
The agent reads _source_registry to learn where each raw source's files
live on disk, then uses the filesystem MCP to read them.