Merged
19 changes: 19 additions & 0 deletions lab-04-data-pipeline/.gitignore
@@ -0,0 +1,19 @@
# Files generated by the pipeline; do not commit
data/processed/

# Explicitly include the sample GTFS data
# (the root .gitignore excludes *.txt under data/)
!data/sample_gtfs/*.txt

# Python
__pycache__/
*.pyc
*.pyo
.pytest_cache/

# Jupyter checkpoints
.ipynb_checkpoints/

# Virtual environments
.venv/
venv/
21 changes: 21 additions & 0 deletions lab-04-data-pipeline/Dockerfile
@@ -0,0 +1,21 @@
FROM python:3.12-slim

COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv

ENV UV_SYSTEM_PYTHON=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1

WORKDIR /app

RUN uv pip install --system \
    "prefect>=3.0" \
    "polars>=1.0" \
    "pyarrow>=16.0" \
    "psycopg2-binary>=2.9" \
    "python-dotenv>=1.0" \
    "pytest>=8.0" \
    "pytest-asyncio>=0.23" \
    "jupyterlab>=4.0"

COPY . .
183 changes: 183 additions & 0 deletions lab-04-data-pipeline/README.md
@@ -0,0 +1,183 @@
# Lab 04 — Data Pipeline: Prefect · TimescaleDB · Polars · GTFS

A complete ETL pipeline for public transit data in GTFS format.
It ingests, transforms, and stores stop, route, and vehicle-event data,
using Prefect for orchestration and TimescaleDB for time series.


---

## Stack

| Technology      | Role                                                        |
|-----------------|-------------------------------------------------------------|
| **Prefect 3**   | Flow orchestration: dependencies, retries, UI               |
| **TimescaleDB** | PostgreSQL + hypertables for time-series data               |
| **Polars**      | DataFrame transformations, reading GTFS and Parquet         |
| **GTFS**        | Standard format for public transit data                     |
| **Jupyter**     | Interactive exploratory analysis                            |

---

## Concepts

### Prefect vs Celery
Celery is a **reactive** task queue: it runs a task when a message arrives.
Prefect is a **workflow orchestrator**: it defines task graphs with dependencies,
retries with backoff, centralized observability, and a UI for monitoring runs.

```
Celery  → "when an MQTT message arrives, process this position"
Prefect → "every hour: download the GTFS feed → validate → transform → load into the DB"
```
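
To make "retries with backoff" concrete, here is a minimal plain-Python sketch. It is not Prefect's API and not code from this lab: `run_with_retries` and `download_feed` are hypothetical names, and the exponential delay schedule is an assumption chosen for illustration. Prefect offers retry behaviour declaratively on tasks, so a real flow would not need a helper like this.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def run_with_retries(
    step: Callable[[], T],
    retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call step(); after a failure wait base_delay * 2**attempt, then try again."""
    for attempt in range(retries + 1):
        try:
            return step()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            sleep(base_delay * 2 ** attempt)
    raise ValueError("retries must be >= 0")


# Hypothetical flaky step: fails twice, then succeeds.
attempts = {"download": 0}


def download_feed() -> str:
    attempts["download"] += 1
    if attempts["download"] < 3:
        raise ConnectionError("feed temporarily unavailable")
    return "gtfs.zip"


# Record the delays instead of actually sleeping, to keep the demo instant.
delays: List[float] = []
feed = run_with_retries(download_feed, retries=3, base_delay=1.0, sleep=delays.append)
print(feed, attempts["download"], delays)  # gtfs.zip 3 [1.0, 2.0]
```

An orchestrator adds what this helper lacks: the retry policy lives next to the task definition, and every attempt is recorded and visible in the UI.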

### TimescaleDB vs PostgreSQL
TimescaleDB **is** PostgreSQL (it ships as an extension): same driver (`psycopg2`), same SQL queries.
It adds two key primitives:

- **Hypertable**: a table partitioned automatically by time into chunks. A query with
  `WHERE recorded_at > NOW() - INTERVAL '1 hour'` only touches the chunks that overlap that range.
- **`time_bucket()`**: grouping by time intervals, similar to PostgreSQL's
  `date_trunc` but accepting arbitrary widths such as `'5 minutes'`.

```sql
-- Average speed per vehicle in 5-minute windows
SELECT time_bucket('5 minutes', recorded_at) AS bucket,
       vehicle_id,
       AVG(speed_kmh) AS avg_speed
FROM vehicle_events
WHERE recorded_at > NOW() - INTERVAL '1 hour'
GROUP BY bucket, vehicle_id
ORDER BY bucket;
```
```
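
For readers who want to see what the query computes without a database, the following is a rough plain-Python sketch of the same grouping. It only mimics the flooring behaviour for fixed-width buckets; TimescaleDB's own origin and time zone handling are not reproduced. The `time_bucket` function here is a stand-in written for this example, and the event rows are hypothetical, not taken from the lab's data.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def time_bucket(width: timedelta, ts: datetime) -> datetime:
    """Floor a timezone-aware timestamp to the start of its fixed-width bucket."""
    return ts - (ts - EPOCH) % width


# Hypothetical (recorded_at, vehicle_id, speed_kmh) rows.
events = [
    (datetime(2026, 1, 5, 6, 1, 10, tzinfo=timezone.utc), "V1", 30.0),
    (datetime(2026, 1, 5, 6, 3, 50, tzinfo=timezone.utc), "V1", 40.0),
    (datetime(2026, 1, 5, 6, 7, 0, tzinfo=timezone.utc), "V1", 20.0),
    (datetime(2026, 1, 5, 6, 2, 0, tzinfo=timezone.utc), "V2", 50.0),
]

# GROUP BY bucket, vehicle_id
speeds = defaultdict(list)
for recorded_at, vehicle_id, speed_kmh in events:
    bucket = time_bucket(timedelta(minutes=5), recorded_at)
    speeds[(bucket, vehicle_id)].append(speed_kmh)

# AVG(speed_kmh), ordered by bucket
avg_speed = {key: sum(vals) / len(vals) for key, vals in sorted(speeds.items())}
for (bucket, vehicle_id), avg in avg_speed.items():
    print(bucket.strftime("%H:%M"), vehicle_id, avg)
# 06:00 V1 35.0
# 06:00 V2 50.0
# 06:05 V1 20.0
```

In the database the same work happens inside the query, and the `WHERE` clause lets the hypertable skip chunks outside the last hour.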

### Polars vs Pandas
Polars is built on the Apache Arrow columnar memory format. Its advantages in an ETL pipeline:

| Feature           | Polars                                      | Pandas                                 |
|-------------------|---------------------------------------------|----------------------------------------|
| Engine            | Rust + Apache Arrow                         | NumPy / Python                         |
| Evaluation        | Lazy API available (optimizes the plan)     | Eager (runs each step immediately)     |
| Parquet reads     | Decoded directly into Arrow memory          | Converted to NumPy-backed columns      |
| Typical speed     | Often 5–20× faster, depending on workload   | Baseline                               |

### GTFS (General Transit Feed Specification)
A standard format for public transit data (Google Maps, OSM, SIMOVI).
It consists of `.txt` (CSV) files with fixed names:

| File             | Contents                                         |
|------------------|--------------------------------------------------|
| `stops.txt`      | Stops: id, name, latitude, longitude             |
| `routes.txt`     | Routes: id, short name, long name, type          |
| `trips.txt`      | Trips: route, service, destination               |
| `stop_times.txt` | Schedules: arrival and departure per stop        |
| `calendar.txt`   | Service calendar: active days                    |
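
Because GTFS files are plain CSV with fixed column names, a feed can be checked with nothing more than the standard library. The sketch below parses two rows copied from `data/sample_gtfs/stops.txt` and applies a couple of basic checks. It is an illustration only, not the validation done in `flows/ingest_gtfs.py`: `parse_stops` and `REQUIRED_STOP_COLUMNS` are hypothetical names, and the set of columns treated as required is an assumption of this example.

```python
import csv
import io
from typing import Dict, List

# Two rows copied from data/sample_gtfs/stops.txt
STOPS_TXT = """stop_id,stop_name,stop_lat,stop_lon,zone_id,wheelchair_boarding
S001,Terminal 7-10 (San José),9.93370,-84.08000,Z1,1
S002,Mercado Central,9.93070,-84.07950,Z1,1
"""

# Assumption: the columns this sketch insists on.
REQUIRED_STOP_COLUMNS = {"stop_id", "stop_name", "stop_lat", "stop_lon"}


def parse_stops(text: str) -> List[Dict[str, object]]:
    """Parse stops.txt content, checking the header and the coordinate ranges."""
    reader = csv.DictReader(io.StringIO(text))
    missing = REQUIRED_STOP_COLUMNS - set(reader.fieldnames or [])
    if missing:
        raise ValueError(f"stops.txt is missing columns: {sorted(missing)}")

    stops = []
    for row in reader:
        lat, lon = float(row["stop_lat"]), float(row["stop_lon"])
        if not (-90 <= lat <= 90 and -180 <= lon <= 180):
            raise ValueError(f"stop {row['stop_id']} has out-of-range coordinates")
        stops.append(
            {"stop_id": row["stop_id"], "stop_name": row["stop_name"], "lat": lat, "lon": lon}
        )
    return stops


stops = parse_stops(STOPS_TXT)
print(len(stops), stops[0]["stop_name"])  # 2 Terminal 7-10 (San José)
```

The same pattern extends to the other files; only the required columns and the per-row checks change.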

---

## Structure

```
lab-04-data-pipeline/
├── Dockerfile
├── docker-compose.yml
├── pyproject.toml
├── data/
│   └── sample_gtfs/              # Sample GTFS feed (San José buses, CR)
│       ├── stops.txt
│       ├── routes.txt
│       ├── trips.txt
│       ├── stop_times.txt
│       └── calendar.txt
├── models/
│   └── timescale_schema.sql      # DDL: regular tables + hypertables
├── flows/
│   ├── ingest_gtfs.py            # Flow: GTFS → validate → load into TimescaleDB
│   ├── transform.py              # Flow: Polars → statistics → Parquet
│   └── analyze.py                # Flow: queries with time_bucket()
├── notebooks/
│   ├── explore_data.ipynb        # Descriptive analysis: GTFS + time series
│   └── ml_demand.ipynb           # Demand prediction with a rolling mean (Polars)
└── tests/
    ├── conftest.py               # Fixtures: schema, table isolation
    └── test_flows.py             # 17 integration tests (real TimescaleDB)
```

---

## Docker services

| Service           | Host port   | Description                               |
|-------------------|-------------|-------------------------------------------|
| `db`              | 5433        | TimescaleDB (pg 16); port 5433 avoids collisions with other labs |
| `prefect-server`  | 4200        | Prefect UI and API                        |
| `prefect-worker`  | n/a         | Runs the flows in a work pool             |
| `app`             | 8888        | Ad-hoc flows + JupyterLab                 |

---

## Quick start

```bash
# 1. Start the services
docker compose up -d

# 2. Run the full pipeline (the schema is applied automatically during ingest)
docker compose exec app python flows/ingest_gtfs.py
docker compose exec app python flows/transform.py
docker compose exec app python flows/analyze.py

# 3. Tests
docker compose exec app pytest tests/ -v

# 4. Prefect UI: view runs, tasks, and logs
#    http://localhost:4200
```

---

## Analysis notebooks

The notebooks **are not part of the production pipeline**; they are interactive
exploration tools meant to be used after the pipeline has loaded the data.

| Notebook            | Contents                                                      |
|---------------------|---------------------------------------------------------------|
| `explore_data.ipynb`| GTFS feed summary, per-route statistics, time series with `time_bucket()` |
| `ml_demand.ipynb`   | Feature engineering with Polars (lags, rolling mean), baseline prediction of boardings |

To open them, launch JupyterLab from the `app` container:

```bash
docker compose exec app jupyter lab \
  --ip=0.0.0.0 --port=8888 --no-browser --allow-root \
  --notebook-dir=/app/notebooks
```

Then open the tokenized URL printed in the console (e.g. `http://127.0.0.1:8888/lab?token=...`).

> **Note**: run all three flows before opening the notebooks.
> The time-series cells (sections 5–7 of `explore_data`) require
> data in `vehicle_events` and `stop_ridership`, which `flows/analyze.py` generates.
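
As a rough idea of what "lags" and a "rolling mean" baseline mean for boardings, here is a small plain-Python sketch. It is not the notebook's code (the notebook uses Polars and is not reproduced here): the function names, the window size, and the boarding counts are all assumptions made up for this example.

```python
from typing import List, Optional, Sequence


def lag(values: Sequence[float], k: int = 1) -> List[Optional[float]]:
    """Value observed k intervals earlier; None where there is no history yet."""
    return [None if i < k else values[i - k] for i in range(len(values))]


def rolling_mean_prev(values: Sequence[float], window: int = 3) -> List[Optional[float]]:
    """Mean of the previous `window` values, excluding the current one."""
    return [
        None if i < window else sum(values[i - window:i]) / window
        for i in range(len(values))
    ]


# Hypothetical boardings at one stop over six consecutive intervals.
boardings = [12, 18, 15, 21, 24, 18]

lag_1 = lag(boardings, k=1)                       # [None, 12, 18, 15, 21, 24]
baseline = rolling_mean_prev(boardings, window=3) # [None, None, None, 15.0, 18.0, 20.0]

# Mean absolute error of the baseline, over intervals that have a forecast.
errors = [abs(actual - pred) for actual, pred in zip(boardings, baseline) if pred is not None]
mae = sum(errors) / len(errors)
print(baseline, round(mae, 2))
```

Excluding the current interval from the window keeps the forecast honest: each prediction uses only values that would already be known at that point.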

---

## Tables created

| Table              | Type        | Time column        | Description                        |
|--------------------|-------------|--------------------|------------------------------------|
| `stops`            | Regular     | n/a                | Stops from the GTFS feed           |
| `routes`           | Regular     | n/a                | Routes from the GTFS feed          |
| `trips`            | Regular     | n/a                | Trips from the GTFS feed           |
| `vehicle_events`   | Hypertable  | `recorded_at`      | Position + speed per vehicle       |
| `stop_ridership`   | Hypertable  | `recorded_at`      | Passengers per stop per interval   |

---

## What this lab demonstrates

- **Pipeline orchestration** with Prefect 3: flows, tasks, automatic retries, and an observability UI
- **Time series** with TimescaleDB: creating hypertables, automatic partitioning by time, and `time_bucket()` for efficient aggregations
- **High-performance transformations** with Polars: reading GTFS (CSV) files, cleaning, joins, and export to Parquet
- **Transit data modeling**: ingesting a complete GTFS feed (stops, routes, trips, schedules) into a relational database
- **Reproducible ETL pipeline**: each stage (ingest → transform → analyze) is an independent flow that can be run and monitored separately
Empty file.
4 changes: 4 additions & 0 deletions lab-04-data-pipeline/data/sample_gtfs/calendar.txt
@@ -0,0 +1,4 @@
service_id,monday,tuesday,wednesday,thursday,friday,saturday,sunday,start_date,end_date
WEEKDAY,1,1,1,1,1,0,0,20260101,20261231
WEEKEND,0,0,0,0,0,1,1,20260101,20261231
DAILY,1,1,1,1,1,1,1,20260101,20261231
6 changes: 6 additions & 0 deletions lab-04-data-pipeline/data/sample_gtfs/routes.txt
@@ -0,0 +1,6 @@
route_id,route_short_name,route_long_name,route_type,route_color,route_text_color
R01,102,San José - Escazú vía La Sabana,3,E8000A,FFFFFF
R02,328,San José - San Pedro vía UCR,3,0058A8,FFFFFF
R03,401,San José - Desamparados,3,00A651,FFFFFF
R04,310,San José - Cartago,3,6F1D77,FFFFFF
R05,200,San José - Alajuela,3,F5A623,000000
125 changes: 125 additions & 0 deletions lab-04-data-pipeline/data/sample_gtfs/stop_times.txt
@@ -0,0 +1,125 @@
trip_id,stop_id,stop_sequence,arrival_time,departure_time
T01_AM1,S001,1,06:00:00,06:00:00
T01_AM1,S004,2,06:05:00,06:05:00
T01_AM1,S006,3,06:15:00,06:15:00
T01_AM1,S007,4,06:20:00,06:20:00
T01_AM1,S009,5,06:35:00,06:35:00
T01_AM1,S011,6,06:50:00,06:50:00
T01_AM2,S011,1,07:10:00,07:10:00
T01_AM2,S009,2,07:25:00,07:25:00
T01_AM2,S007,3,07:40:00,07:40:00
T01_AM2,S006,4,07:45:00,07:45:00
T01_AM2,S004,5,07:55:00,07:55:00
T01_AM2,S001,6,08:00:00,08:00:00
T01_MD1,S001,1,12:00:00,12:00:00
T01_MD1,S004,2,12:05:00,12:05:00
T01_MD1,S006,3,12:15:00,12:15:00
T01_MD1,S007,4,12:20:00,12:20:00
T01_MD1,S009,5,12:35:00,12:35:00
T01_MD1,S011,6,12:50:00,12:50:00
T01_MD2,S011,1,13:10:00,13:10:00
T01_MD2,S009,2,13:25:00,13:25:00
T01_MD2,S007,3,13:40:00,13:40:00
T01_MD2,S006,4,13:45:00,13:45:00
T01_MD2,S004,5,13:55:00,13:55:00
T01_MD2,S001,6,14:00:00,14:00:00
T01_PM1,S001,1,17:00:00,17:00:00
T01_PM1,S004,2,17:08:00,17:08:00
T01_PM1,S006,3,17:22:00,17:22:00
T01_PM1,S007,4,17:28:00,17:28:00
T01_PM1,S009,5,17:48:00,17:48:00
T01_PM1,S011,6,18:05:00,18:05:00
T01_WE1,S001,1,09:00:00,09:00:00
T01_WE1,S004,2,09:05:00,09:05:00
T01_WE1,S006,3,09:15:00,09:15:00
T01_WE1,S007,4,09:20:00,09:20:00
T01_WE1,S009,5,09:32:00,09:32:00
T01_WE1,S011,6,09:45:00,09:45:00
T01_WE2,S011,1,10:05:00,10:05:00
T01_WE2,S009,2,10:18:00,10:18:00
T01_WE2,S007,3,10:30:00,10:30:00
T01_WE2,S006,4,10:35:00,10:35:00
T01_WE2,S004,5,10:45:00,10:45:00
T01_WE2,S001,6,10:50:00,10:50:00
T02_AM1,S001,1,06:15:00,06:15:00
T02_AM1,S002,2,06:20:00,06:20:00
T02_AM1,S023,3,06:28:00,06:28:00
T02_AM1,S013,4,06:42:00,06:42:00
T02_AM1,S014,5,06:55:00,06:55:00
T02_AM2,S014,1,07:15:00,07:15:00
T02_AM2,S013,2,07:28:00,07:28:00
T02_AM2,S023,3,07:42:00,07:42:00
T02_AM2,S002,4,07:50:00,07:50:00
T02_AM2,S001,5,07:55:00,07:55:00
T02_MD1,S001,1,12:15:00,12:15:00
T02_MD1,S002,2,12:20:00,12:20:00
T02_MD1,S023,3,12:28:00,12:28:00
T02_MD1,S013,4,12:42:00,12:42:00
T02_MD1,S014,5,12:55:00,12:55:00
T02_PM1,S001,1,17:15:00,17:15:00
T02_PM1,S002,2,17:22:00,17:22:00
T02_PM1,S023,3,17:32:00,17:32:00
T02_PM1,S013,4,17:50:00,17:50:00
T02_PM1,S014,5,18:05:00,18:05:00
T02_WE1,S001,1,09:15:00,09:15:00
T02_WE1,S002,2,09:20:00,09:20:00
T02_WE1,S023,3,09:28:00,09:28:00
T02_WE1,S013,4,09:40:00,09:40:00
T02_WE1,S014,5,09:52:00,09:52:00
T03_AM1,S001,1,06:30:00,06:30:00
T03_AM1,S003,2,06:35:00,06:35:00
T03_AM1,S014,3,06:55:00,06:55:00
T03_AM1,S016,4,07:05:00,07:05:00
T03_AM1,S017,5,07:20:00,07:20:00
T03_AM2,S017,1,07:40:00,07:40:00
T03_AM2,S016,2,07:55:00,07:55:00
T03_AM2,S014,3,08:05:00,08:05:00
T03_AM2,S003,4,08:25:00,08:25:00
T03_AM2,S001,5,08:30:00,08:30:00
T03_MD1,S001,1,12:30:00,12:30:00
T03_MD1,S003,2,12:35:00,12:35:00
T03_MD1,S014,3,12:55:00,12:55:00
T03_MD1,S016,4,13:05:00,13:05:00
T03_MD1,S017,5,13:20:00,13:20:00
T03_PM1,S001,1,17:30:00,17:30:00
T03_PM1,S003,2,17:37:00,17:37:00
T03_PM1,S014,3,18:00:00,18:00:00
T03_PM1,S016,4,18:12:00,18:12:00
T03_PM1,S017,5,18:28:00,18:28:00
T03_WE1,S001,1,09:30:00,09:30:00
T03_WE1,S003,2,09:35:00,09:35:00
T03_WE1,S014,3,09:52:00,09:52:00
T03_WE1,S016,4,10:02:00,10:02:00
T03_WE1,S017,5,10:15:00,10:15:00
T04_AM1,S001,1,05:30:00,05:30:00
T04_AM1,S018,2,05:58:00,05:58:00
T04_AM1,S019,3,06:15:00,06:15:00
T04_AM1,S020,4,06:30:00,06:30:00
T04_AM2,S020,1,06:45:00,06:45:00
T04_AM2,S019,2,07:00:00,07:00:00
T04_AM2,S018,3,07:18:00,07:18:00
T04_AM2,S001,4,07:48:00,07:48:00
T04_MD1,S001,1,12:00:00,12:00:00
T04_MD1,S018,2,12:28:00,12:28:00
T04_MD1,S019,3,12:45:00,12:45:00
T04_MD1,S020,4,13:00:00,13:00:00
T04_PM1,S001,1,17:00:00,17:00:00
T04_PM1,S018,2,17:35:00,17:35:00
T04_PM1,S019,3,17:55:00,17:55:00
T04_PM1,S020,4,18:15:00,18:15:00
T05_AM1,S001,1,05:45:00,05:45:00
T05_AM1,S022,2,05:52:00,05:52:00
T05_AM1,S012,3,06:00:00,06:00:00
T05_AM1,S021,4,06:30:00,06:30:00
T05_AM2,S021,1,07:00:00,07:00:00
T05_AM2,S012,2,07:30:00,07:30:00
T05_AM2,S022,3,07:38:00,07:38:00
T05_AM2,S001,4,07:45:00,07:45:00
T05_MD1,S001,1,12:00:00,12:00:00
T05_MD1,S022,2,12:07:00,12:07:00
T05_MD1,S012,3,12:15:00,12:15:00
T05_MD1,S021,4,12:45:00,12:45:00
T05_PM1,S001,1,17:00:00,17:00:00
T05_PM1,S022,2,17:10:00,17:10:00
T05_PM1,S012,3,17:20:00,17:20:00
T05_PM1,S021,4,17:55:00,17:55:00
24 changes: 24 additions & 0 deletions lab-04-data-pipeline/data/sample_gtfs/stops.txt
@@ -0,0 +1,24 @@
stop_id,stop_name,stop_lat,stop_lon,zone_id,wheelchair_boarding
S001,Terminal 7-10 (San José),9.93370,-84.08000,Z1,1
S002,Mercado Central,9.93070,-84.07950,Z1,1
S003,Plaza de la Cultura,9.93180,-84.07800,Z1,1
S004,Hospital San Juan de Dios,9.93120,-84.08550,Z1,1
S005,Sabana Norte,9.94190,-84.10190,Z1,0
S006,Parque La Sabana,9.93870,-84.10880,Z1,1
S007,Estadio Nacional,9.93550,-84.11220,Z2,0
S008,Pavas Centro,9.93950,-84.13680,Z2,0
S009,Escazú Centro,9.91930,-84.13670,Z2,1
S010,Guachipelín,9.91740,-84.15720,Z2,0
S011,Santa Ana Centro,9.92960,-84.18350,Z3,0
S012,La Uruca,9.94860,-84.11050,Z1,0
S013,UCR San Pedro,9.93800,-84.05030,Z1,1
S014,Curridabat Centro,9.91340,-84.05020,Z2,0
S015,Pinares,9.90170,-84.04220,Z2,0
S016,Desamparados Centro,9.89870,-84.06370,Z2,1
S017,San Miguel de Desamparados,9.87620,-84.05710,Z3,0
S018,San Diego de La Unión,9.90030,-83.98570,Z3,0
S019,Taras,9.87930,-83.94930,Z3,0
S020,Cartago Centro,9.86410,-83.91960,Z4,1
S021,Alajuela Centro,10.01620,-84.21430,Z3,1
S022,La Uruca - Intersección,9.94710,-84.11380,Z1,0
S023,Barrio México,9.93980,-84.09150,Z1,0