
feat: Migrate task states and task messages from MongoDB to PostgreSQL #128

Draft
smoreinis wants to merge 10 commits into main from stas/task-state-postgres

@smoreinis commented Jan 13, 2026

Summary

Implements a phased migration strategy for task states and task messages from MongoDB to PostgreSQL with zero-downtime rollout capability. Both use the same dual-write/dual-read pattern controlled by independent feature flags.

Key changes:

  • Add TASK_STATE_STORAGE_PHASE and TASK_MESSAGE_STORAGE_PHASE feature flags
  • Create task_states and task_messages PostgreSQL tables with JSONB columns
  • Implement dual repository pattern for safe migration rollout of both entities
  • Add Datadog StatsD metrics for monitoring data consistency
  • Add benchmark suite and storage_backend query parameter for testing
  • Auto-disable MongoDB when both storage phases are set to postgres

Migration Phases

Each entity migrates independently via its own env var:

| Phase | Behavior |
| --- | --- |
| mongodb | Legacy behavior - MongoDB only |
| dual_write | Write to both, read from MongoDB |
| dual_read | Write to both, read from both + verify consistency |
| postgres | PostgreSQL only (target state) |
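
For reviewers, the phase table can be read as a lookup from phase to read/write targets. The sketch below is illustrative only and is not the code in this PR: `StoragePhase`, `PhasePlan`, and `plan_for` are hypothetical names, and only the four phase values come from the description.

```python
from dataclasses import dataclass
from enum import Enum


class StoragePhase(str, Enum):
    # The four values are the phases listed in the table above.
    MONGODB = "mongodb"
    DUAL_WRITE = "dual_write"
    DUAL_READ = "dual_read"
    POSTGRES = "postgres"


@dataclass(frozen=True)
class PhasePlan:
    # Hypothetical helper type: which stores a phase touches.
    write_mongodb: bool
    write_postgres: bool
    read_mongodb: bool
    read_postgres: bool
    verify: bool  # compare the two reads and report mismatches


PHASE_PLANS = {
    StoragePhase.MONGODB: PhasePlan(True, False, True, False, False),
    StoragePhase.DUAL_WRITE: PhasePlan(True, True, True, False, False),
    StoragePhase.DUAL_READ: PhasePlan(True, True, True, True, True),
    StoragePhase.POSTGRES: PhasePlan(False, True, False, True, False),
}


def plan_for(raw_phase: str) -> PhasePlan:
    """Map a raw env var value to its plan; unknown values raise ValueError."""
    return PHASE_PLANS[StoragePhase(raw_phase)]
```

Rejecting unknown values at lookup time means a typo in either env var fails loudly instead of silently falling back to a default store.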

MongoDB Auto-Disable

When both TASK_STATE_STORAGE_PHASE=postgres and TASK_MESSAGE_STORAGE_PHASE=postgres, MongoDB infrastructure is fully skipped:

  • No MongoClient created or pinged at startup
  • No MongoDB indexes created
  • /readyz health check omits MongoDB (returns only postgres + redis)
  • MongoDB repository DI factories return None (dual repos handle this safely)
  • Fail-fast RuntimeError if a phase requires MongoDB but it's not initialized
  • Dead DTaskStateRepository dependency removed from AgentTaskService

No new env var needed — the flag is derived from existing storage phase settings.
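
A minimal sketch of how such a derived flag could look, assuming a simple settings object. `StorageSettings` is a hypothetical stand-in for the real `EnvironmentVariables` class, whose actual fields and defaults are not shown in this description; only the `mongodb_required` name and the "both phases are postgres" rule come from the PR.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StorageSettings:
    # Hypothetical stand-in for the real settings class.
    task_state_storage_phase: str
    task_message_storage_phase: str

    @property
    def mongodb_required(self) -> bool:
        # MongoDB can be skipped only when BOTH entities are fully migrated.
        fully_migrated = (
            self.task_state_storage_phase == "postgres"
            and self.task_message_storage_phase == "postgres"
        )
        return not fully_migrated
```

With this shape, startup code can branch on a single boolean, for example `if settings.mongodb_required: ...`, rather than re-checking both phases at each call site.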

Files Changed

Task State Migration

  • src/adapters/orm.py - TaskStateORM model (JSONB state column)
  • src/domain/repositories/task_state_postgres_repository.py — PostgreSQL repository
  • src/domain/repositories/task_state_dual_repository.py — Dual-write wrapper
  • src/domain/use_cases/states_use_case.py — Updated to use dual repository
  • database/migrations/.../postgres_task_state_07fc12196914.py — Alembic migration
  • scripts/backfill_task_states.py — Backfill MongoDB → PostgreSQL
  • scripts/verify_task_states.py — Verify data consistency
  • tests/unit/repositories/test_task_state_dual_repository.py — 35 tests
  • tests/unit/repositories/test_task_state_postgres_repository.py — 2 tests

Task Message Migration

  • src/adapters/orm.py - TaskMessageORM model (JSONB content column, 3 indexes)
  • src/domain/repositories/task_message_postgres_repository.py — PostgreSQL repository with JSONB filter translation and cursor pagination
  • src/domain/repositories/task_message_dual_repository.py — Dual-write wrapper with MongoDB filter conversion
  • src/domain/services/task_message_service.py — Updated to use dual repository
  • src/domain/use_cases/messages_use_case.py — Pass raw filters to storage layer
  • database/migrations/.../postgres_messages_b4d5f54e4ba2.py — Alembic migration
  • scripts/backfill_task_messages.py — Backfill MongoDB → PostgreSQL
  • scripts/verify_task_messages.py — Verify data consistency
  • tests/unit/repositories/test_task_message_dual_repository.py — 47 tests
  • tests/unit/repositories/test_task_message_postgres_repository.py — 16 tests
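
For context on the cursor pagination mentioned for the PostgreSQL message repository, here is a generic keyset-pagination sketch over an in-memory list. It is an assumption about the general technique, not the repository's implementation: the `(created_at, id)` sort key, the base64 cursor format, and the function names are all hypothetical.

```python
import base64
import json
from typing import Optional


def encode_cursor(created_at: str, message_id: str) -> str:
    """Pack the last row's sort key into an opaque, URL-safe token."""
    raw = json.dumps([created_at, message_id]).encode()
    return base64.urlsafe_b64encode(raw).decode()


def decode_cursor(cursor: str) -> tuple:
    created_at, message_id = json.loads(base64.urlsafe_b64decode(cursor.encode()))
    return created_at, message_id


def page(rows: list, limit: int, cursor: Optional[str] = None):
    """Return (items, next_cursor).

    Simulates the SQL shape
        WHERE (created_at, id) > (:created_at, :id)
        ORDER BY created_at, id LIMIT :limit
    Timestamps are ISO-8601 strings in one format, so they sort lexicographically.
    """
    ordered = sorted(rows, key=lambda r: (r["created_at"], r["id"]))
    if cursor is not None:
        after = decode_cursor(cursor)
        ordered = [r for r in ordered if (r["created_at"], r["id"]) > after]
    items = ordered[:limit]
    has_more = len(ordered) > limit
    next_cursor = None
    if has_more and items:
        next_cursor = encode_cursor(items[-1]["created_at"], items[-1]["id"])
    return items, next_cursor
```

Including the id as a tie-breaker keeps pages stable when several messages share the same timestamp.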

MongoDB Auto-Disable

  • src/config/environment_variables.py - mongodb_required property, fix default inconsistency
  • src/config/dependencies.py — Guard MongoDB init, add startup status logging
  • src/domain/repositories/task_message_repository.py — DI factory returns None when disabled
  • src/domain/repositories/task_state_repository.py — DI factory returns None when disabled
  • src/domain/repositories/task_message_dual_repository.py — Fail-fast validation
  • src/domain/repositories/task_state_dual_repository.py — Fail-fast validation
  • src/domain/services/task_service.py — Remove dead DTaskStateRepository dependency
  • src/api/health_interceptor.py — Skip MongoDB health check when disabled

Shared / Config

  • src/config/environment_variables.py — Both storage phase env vars
  • src/adapters/orm.py — Both ORM models

Metrics (dual_read phase)

Task States

| Metric | Description |
| --- | --- |
| task_state.dual_read.match | Data matches between MongoDB and PostgreSQL |
| task_state.dual_read.mismatch.missing_postgres | Missing in PostgreSQL |
| task_state.dual_read.mismatch.missing_mongodb | Missing in MongoDB |
| task_state.dual_read.mismatch.state_content | State content differs |
| task_state.dual_read.list_count_mismatch | List counts differ |

Task Messages

| Metric | Description |
| --- | --- |
| task_message.dual_read.match | Data matches between MongoDB and PostgreSQL |
| task_message.dual_read.mismatch.missing_postgres | Missing in PostgreSQL |
| task_message.dual_read.mismatch.missing_mongodb | Missing in MongoDB |
| task_message.dual_read.mismatch.content | Message content differs |
| task_message.dual_read.list_count_mismatch | List counts differ |
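
To make the metric semantics concrete, the sketch below shows one way a single-record dual-read comparison could map onto the task state counters. It is a sketch under stated assumptions: `RecordingStatsd` is a test stand-in rather than the Datadog client, `verify_task_state` and the `"state"` key are hypothetical, and the handling of a record missing from both stores is a guess. Only the metric names are taken from the tables.

```python
from collections import Counter
from typing import Optional


class RecordingStatsd:
    """Stand-in for a StatsD client; it only counts increments."""

    def __init__(self) -> None:
        self.counts = Counter()

    def increment(self, metric: str) -> None:
        self.counts[metric] += 1


def verify_task_state(
    mongo_doc: Optional[dict],
    postgres_doc: Optional[dict],
    statsd: RecordingStatsd,
    prefix: str = "task_state.dual_read",
) -> bool:
    """Compare one record from each store and emit exactly one metric."""
    if mongo_doc is None and postgres_doc is None:
        # Assumption: a record absent from both stores is not a mismatch.
        return True
    if postgres_doc is None:
        statsd.increment(f"{prefix}.mismatch.missing_postgres")
        return False
    if mongo_doc is None:
        statsd.increment(f"{prefix}.mismatch.missing_mongodb")
        return False
    if mongo_doc["state"] != postgres_doc["state"]:
        statsd.increment(f"{prefix}.mismatch.state_content")
        return False
    statsd.increment(f"{prefix}.match")
    return True
```

Because each comparison emits exactly one counter, the ratio of `match` to the `mismatch.*` counters gives a direct consistency rate to watch before switching to postgres.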

Rollout Plan

For each entity (task states, then task messages):

  1. Deploy with *_STORAGE_PHASE=mongodb (no behavior change)
  2. Run backfill: python scripts/backfill_task_{states,messages}.py
  3. Enable dual-write: Set *_STORAGE_PHASE=dual_write
  4. Enable dual-read: Set *_STORAGE_PHASE=dual_read, monitor metrics
  5. Switch to PostgreSQL: Set *_STORAGE_PHASE=postgres
  6. MongoDB auto-disables — once both phases are postgres, no MongoDB connection is made

Rollback

Set either *_STORAGE_PHASE env var back to the previous phase at any time. MongoDB will automatically re-initialize on next startup if needed.

Test plan

  • Unit tests pass (288 total: 35 + 2 task state, 47 + 16 task message, plus existing)
  • Integration tests with real databases
  • Manual verification of each phase transition
  • Monitor metrics during dual_read phase before final switch
  • Verify MongoDB-disabled startup (both phases = postgres): logs show "MongoDB: DISABLED", /readyz omits MongoDB

Implement phased migration strategy for task states:
- Phase 0: Add feature flag TASK_STATE_STORAGE_PHASE
- Add TaskStateORM model and Alembic migration
- Create TaskStatePostgresRepository for PostgreSQL storage
- Create TaskStateDualRepository for phased rollout

Migration phases supported:
- mongodb: Legacy behavior (MongoDB only)
- dual_write: Write to both, read from MongoDB
- dual_read: Write to both, read from both with verification
- postgres: PostgreSQL only (target state)

Includes:
- Datadog StatsD metrics for dual_read verification
- Backfill script for existing MongoDB data
- Verification script for data consistency checks
- Unit tests for all repository operations and metrics

@smoreinis force-pushed the stas/task-state-postgres branch from 7685cef to a9f298c on January 13, 2026 00:25

- Add benchmark scripts for comparing MongoDB vs PostgreSQL performance:
  - benchmark_task_state.py: Repository-level benchmarks
  - benchmark_api.py: API-level benchmarks with connection pooling
  - compare_results.py: Generate markdown comparison reports
  - locustfile.py: Cluster load tests with Locust

- Add storage_backend query parameter to dynamically switch backends:
  - Enables benchmarking without server restarts
  - Valid values: mongodb, dual_write, dual_read, postgres

- Fix FastAPI dependency injection in TaskStateDualRepository:
  - Remove 'from __future__ import annotations' which broke DI resolution
  - Use List from typing to avoid 'list' method name shadowing
  - Update authorization_shortcuts to use DTaskStateDualRepository

Benchmark results show MongoDB ~20-30% faster than PostgreSQL at API level.
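
As an illustration of the storage_backend override described in this commit, a per-request value could be resolved against the configured phase roughly as follows. `resolve_phase` is a hypothetical helper, not the PR's code, and it is written without FastAPI so it stays self-contained; only the parameter name and the four valid values come from the commit message.

```python
from typing import Optional

# Valid values as listed in the commit message.
VALID_BACKENDS = ("mongodb", "dual_write", "dual_read", "postgres")


def resolve_phase(configured_phase: str, storage_backend: Optional[str] = None) -> str:
    """Pick the phase for one request.

    A missing query parameter falls back to the configured env var value;
    an unknown value is rejected instead of being silently ignored.
    """
    if storage_backend is None:
        return configured_phase
    if storage_backend not in VALID_BACKENDS:
        raise ValueError(
            f"storage_backend must be one of {VALID_BACKENDS}, got {storage_backend!r}"
        )
    return storage_backend
```

A benchmark run can then compare backends on the same server process by varying only the query parameter, which matches the stated goal of avoiding restarts.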

Port task messages to PostgreSQL using the same phased dual-write/dual-read
strategy as task states. The rollout is controlled by the TASK_MESSAGE_STORAGE_PHASE
env var, which moves through the phases mongodb → dual_write → dual_read → postgres.

New files:
- TaskMessageORM model with JSONB content column and indexes
- Alembic migration for task_messages table
- TaskMessagePostgresRepository with JSONB filter translation and cursor pagination
- TaskMessageDualRepository with 4-phase switching and Datadog metrics
- Backfill and verification scripts for data migration
- Unit tests (63 tests: 47 dual repo + 16 postgres repo)

Modified:
- Wire TaskMessageService to use DualRepository
- Pass raw TaskMessageEntityFilter objects through to storage layer
- Add TASK_MESSAGE_STORAGE_PHASE to environment variables
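
One plausible shape for the JSONB filter translation is to turn dotted, MongoDB-style equality filters into a nested document for the PostgreSQL `@>` containment operator. The sketch below assumes that approach; it is not taken from the PR. The helper names and the example field names (`type`, `author.role`) are hypothetical, and only scalar equality is covered.

```python
import json
from typing import Any


def to_containment(filters: dict) -> dict:
    """Expand dotted keys into a nested dict, e.g. {"a.b": 1} -> {"a": {"b": 1}}."""
    nested: dict = {}
    for dotted, value in filters.items():
        *parents, leaf = dotted.split(".")
        node: Any = nested
        for key in parents:
            node = node.setdefault(key, {})
            if not isinstance(node, dict):
                raise ValueError(f"conflicting filter paths at {key!r}")
        node[leaf] = value
    return nested


def build_where(filters: dict):
    """Return a SQL fragment plus bind parameters for a JSONB containment match.

    The column name `content` matches the JSONB column named in this PR;
    the bind parameter name is an assumption.
    """
    payload = json.dumps(to_containment(filters), sort_keys=True)
    return "content @> CAST(:payload AS jsonb)", {"payload": payload}
```

Passing the filter as a single bound JSON document keeps user-supplied values out of the SQL text.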

JSONB supports indexing and faster querying than plain JSON, and using it
keeps these columns consistent with all other JSON columns in the schema.
@smoreinis changed the title from "feat: Add task state migration from MongoDB to PostgreSQL" to "feat: Migrate task states and task messages from MongoDB to PostgreSQL" on Feb 10, 2026
@smoreinis commented

cc @viral1701 for viz

Derive a `mongodb_required` property from TASK_STATE_STORAGE_PHASE and
TASK_MESSAGE_STORAGE_PHASE. When both are "postgres", skip MongoDB client
creation, index setup, and health checks — eliminating a hard dependency
on MongoDB for fully-migrated deployments.

- Add EnvironmentVariables.mongodb_required computed property
- Guard MongoDB init in GlobalDependencies.load() with startup logging
- Return None from MongoDB repository DI factories when db is None
- Add fail-fast RuntimeError in dual repos on phase/db mismatch
- Remove dead DTaskStateRepository dependency from AgentTaskService
- Skip MongoDB in /readyz health check when client is None
- Fix model default inconsistency (mongodb → postgres) to match refresh()
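
The fail-fast check in the dual repositories could look roughly like the sketch below. It assumes validation happens in the constructor; `DualRepositorySketch` and its parameters are hypothetical, and only the rule (raise RuntimeError when the phase needs MongoDB but no MongoDB repository was injected) comes from the commit message.

```python
from typing import Optional

# Every phase except "postgres" still reads from or writes to MongoDB.
PHASES_REQUIRING_MONGODB = frozenset({"mongodb", "dual_write", "dual_read"})


class DualRepositorySketch:
    """Hypothetical wrapper showing only the startup validation."""

    def __init__(self, phase: str, postgres_repo: object, mongo_repo: Optional[object]):
        if phase in PHASES_REQUIRING_MONGODB and mongo_repo is None:
            # Fail at construction time rather than on the first request.
            raise RuntimeError(
                f"Storage phase {phase!r} requires MongoDB, but it is not initialized"
            )
        self.phase = phase
        self.postgres_repo = postgres_repo
        self.mongo_repo = mongo_repo
```

Raising during construction surfaces a phase/db mismatch at startup, where it is cheap to diagnose, instead of as an attribute error on `None` deep inside a request.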