
worker_plan_database: dispose DB pool after fork (cross-user leak)#674

Merged
neoneye merged 1 commit into main from fix/dispose-db-pool-after-fork on May 6, 2026

Conversation


@neoneye neoneye commented May 6, 2026

Summary

`luigi.build(workers=N)` forks subprocesses that inherit the parent's open PostgreSQL connections. Two sibling workers then write to the same socket, the wire protocol interleaves, and psycopg2 starts reading garbage. Recent production logs:

psycopg2.OperationalError: lost synchronization with server: got message type "p", length 1919903337
psycopg2.OperationalError: insufficient data in "D" message
psycopg2.DatabaseError:    PGRES_TUPLES_OK and no message from libpq

Beyond stability, this race is a privacy hazard — one child's row data can land in another child's read buffer, leaking one user's plan data into another user's query result.
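The failure mode can be reproduced in miniature. This is a hypothetical sketch, not the worker code: after `fork()`, parent and child hold duplicates of the same socket file descriptor, so both write into one stream — the same mechanism that interleaves two workers' bytes on one inherited PostgreSQL connection.

```python
import os
import socket

# Minimal sketch: a forked child inherits the parent's open socket, so
# both processes write into the SAME underlying stream.
writer, reader = socket.socketpair()

pid = os.fork()
if pid == 0:                  # child: same fd as the parent
    writer.sendall(b"child|")
    os._exit(0)

os.waitpid(pid, 0)            # wait so the write order is deterministic
writer.sendall(b"parent|")

data = reader.recv(64)        # bytes from BOTH processes, one stream
print(data)                   # child's and parent's payloads, interleaved
```

With real psycopg2 traffic the writes are not serialized like this, so the server sees two protocol streams spliced together — hence the "lost synchronization" errors above.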

Fix

Register an `os.register_at_fork(after_in_child=...)` hook in `worker_plan_database/app.py` that calls `db.engine.dispose(close=False)`.

  • `register_at_fork` fires inside every Luigi worker child immediately after fork (Luigi uses `multiprocessing.Process`, which uses `fork()` on Linux).
  • `close=False` is critical: a normal close would send TCP close packets down the parent's still-live socket and corrupt the parent's protocol stream. With `close=False`, the inherited pool is dropped without any wire activity. The child opens a fresh connection lazily on first use.
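The hook wiring can be sketched as follows. `FakeEngine` is a stand-in for the real SQLAlchemy engine (so the sketch runs without a database); the `dispose(close=False)` call mirrors the one described above.

```python
import os

# Stand-in for db.engine: records how dispose() was called, so the child
# can verify the hook's behavior. The real code calls SQLAlchemy's
# Engine.dispose(close=False).
class FakeEngine:
    def __init__(self):
        self.disposed_with = None

    def dispose(self, close=True):
        self.disposed_with = close

engine = FakeEngine()

# Runs in every forked child immediately after fork(), before any task code.
os.register_at_fork(after_in_child=lambda: engine.dispose(close=False))

pid = os.fork()
if pid == 0:
    # Child: the hook has already fired; exit 0 only if close=False was used.
    os._exit(0 if engine.disposed_with is False else 1)

_, status = os.waitpid(pid, 0)
child_ok = os.waitstatus_to_exitcode(status) == 0
print(child_ok)                 # hook fired with close=False in the child
print(engine.disposed_with)     # None: the parent's pool is untouched
```

Note that the hook runs only in the child, so the parent's pool — and its live sockets — are never touched.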

Side effect

Eliminates the recurring symptom where a task got stuck in `processing` because the worker died on a corrupted connection mid-pipeline (the issue you saw with task 66b2740b-…).

Test plan

  • Generate a plan; confirm the pipeline runs end-to-end without "lost synchronization with server" / "PGRES_TUPLES_OK" / "insufficient data in \"D\" message" warnings in `worker_plan_database` logs.
  • Generate two plans concurrently (different users); confirm both finish without DB corruption and that each plan's report only reflects its own prompt.
  • Restart the worker mid-run; confirm the running task does not stay stuck in `processing` due to a poisoned connection.

🤖 Generated with Claude Code

@neoneye neoneye merged commit 24af39c into main May 6, 2026
3 checks passed
@neoneye neoneye deleted the fix/dispose-db-pool-after-fork branch May 6, 2026 12:32