worker_plan_database: dispose DB pool after fork (cross-user leak)#674
Merged
`luigi.build(workers=N)` forks subprocesses that inherit the
parent's open PostgreSQL connections. Two sibling workers then
write to the same socket, the wire protocol interleaves, and
psycopg2 starts reading garbage. Recent production logs:
```
psycopg2.OperationalError: lost synchronization with server:
    got message type "p", length 1919903337
psycopg2.OperationalError: insufficient data in "D" message
psycopg2.DatabaseError: PGRES_TUPLES_OK and no message from libpq
```
Beyond stability, this race is a privacy hazard: one child's
row data can land in another child's read buffer, leaking one
user's plan data into another user's query result.
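The fix below hinges on a hook Python runs inside each child immediately after `fork()`. A minimal stdlib sketch of that mechanism (the `events` list is illustrative only; in the real fix the callback disposes the DB pool):

```python
import os

# Illustrative marker list; the production callback disposes the pool instead.
events = []
os.register_at_fork(after_in_child=lambda: events.append("after-fork"))

pid = os.fork()
if pid == 0:
    # Child: the callback already ran before fork() returned here.
    os._exit(0 if events == ["after-fork"] else 1)

_, status = os.waitpid(pid, 0)
# Parent: the callback never fires here, and the child saw exactly one call.
assert events == []
assert os.waitstatus_to_exitcode(status) == 0
```

Because the hook is registered once in the parent, every worker Luigi forks afterwards runs it automatically, with no per-task plumbing.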
Register an `os.register_at_fork(after_in_child=...)` hook that
calls `db.engine.dispose(close=False)`. The `close=False` flag
discards the inherited pool *without* sending TCP close packets
— a normal close would travel down the parent's still-live
socket and corrupt its protocol stream. Each child then opens a
fresh connection lazily.
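A sketch of the hook itself, assuming a SQLAlchemy engine (an in-memory SQLite URL stands in here for the production PostgreSQL DSN so the sketch is self-contained; the patch uses the app's `db.engine`):

```python
import os

from sqlalchemy import create_engine, text

# SQLite stands in for the production PostgreSQL DSN; the mechanism is
# identical for psycopg2-backed engines.
engine = create_engine("sqlite:///:memory:")

def _dispose_inherited_pool() -> None:
    # close=False abandons the inherited connections instead of closing
    # them, so no close packets travel down sockets the parent still owns.
    engine.dispose(close=False)

# Runs in every forked child (each Luigi worker) right after fork().
os.register_at_fork(after_in_child=_dispose_inherited_pool)

pid = os.fork()
if pid == 0:
    # Child: the pool is empty now; the first query opens a fresh connection.
    with engine.connect() as conn:
        ok = conn.execute(text("select 1")).scalar() == 1
    os._exit(0 if ok else 1)

_, status = os.waitpid(pid, 0)
assert os.waitstatus_to_exitcode(status) == 0
```

`engine.dispose(close=False)` requires SQLAlchemy 1.4.33 or newer; on older versions the only option is a full `dispose()`, which would close the shared sockets and is exactly what this fix avoids.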
Side benefit: removes the recurring symptom that left tasks
stuck in `processing` when the worker died on a corrupted
connection.
Summary

`luigi.build(workers=N)` forks subprocesses that inherit the parent's open PostgreSQL connections. Two sibling workers then write to the same socket, the wire protocol interleaves, and psycopg2 starts reading garbage. Beyond stability, this race is a privacy hazard: one child's row data can land in another child's read buffer, leaking one user's plan data into another user's query result.

Fix

Register an `os.register_at_fork(after_in_child=...)` hook in `worker_plan_database/app.py` that calls `db.engine.dispose(close=False)`. `register_at_fork` fires inside every Luigi worker child immediately after fork (Luigi uses `multiprocessing.Process`, which uses `fork()` on Linux). `close=False` is critical: a normal close would send TCP close packets down the parent's still-live socket and corrupt the parent's protocol stream. With `close=False`, the inherited pool is dropped without any wire activity. The child opens a fresh connection lazily on first use.

Side effect

Eliminates the recurring symptom where a task got stuck in `processing` because the worker died on a corrupted connection mid-pipeline (the issue seen with task `66b2740b-…`).

Test plan

- Confirm the `lost synchronization with server` / `PGRES_TUPLES_OK` / `insufficient data in "D" message` warnings no longer appear in `worker_plan_database` logs.
- Confirm no tasks get stuck in `processing` due to a poisoned connection.

🤖 Generated with Claude Code