Feat/celery integration#527
Conversation
9f4fb89 to
dd71537
Compare
|
follow up from #464 |
can you also add a changeset entry, please |
|
This PR hasn't seen activity in a week! Should it be merged, closed, or further worked on? If you want to keep it open, post a comment or remove the |
|
on it |
Co-authored-by: Dustin Byrne <dustinsbyrne@gmail.com>
- make fork safety complete in the client - add shutdown mechanism to the integration - better test coverage - better docs on usage
dd71537 to
062df48
Compare
|
@marandaneto - ruff formatting has been applied, and changeset entry existed already @dustinbyrne - added comments to integration and example explicitly asking to reinitialize PostHog client if using a custom flag definition cache provider and reinstrument the integration with that client instance |
|
beacfbf to
1ad94df
Compare
|
@parinporecha CI is still unhappy
|
Summary
This PR:
PosthogCeleryIntegrationto automatically capture Celery task lifecycle events and exceptions.distinct_id,session_id, tags) from the task producer to the worker so Celery tasks can be correlated with the originating user/session.Clientsafer across process forks by reinitializing fork-unsafe client state in child processes.Context
I saw users asking for advice on how to use PostHog with Celery for error tracking in community questions and realized that there's currently no first-class way to instrument Celery workloads with PostHog.
That leaves a few gaps:
This PR addresses those gaps by adding a Celery integration that helps users observe task execution end-to-end out of the box.
The integration takes inspiration from OpenTelemetry's Celery instrumentor and PostHog context propagation is achieved through task headers mirroring Sentry and DataDog's Celery integrations.
While testing this, I found a separate SDK issue: when a
Clientconfigured in async mode is inherited across a process fork, the child process inherits a client whose consumer threads no longer exist. In practice, that means worker-side events don't get delivered. This would also be a problem when using the SDK in some Django deployments.So this PR also adds fork handling to
Clientby reinitializing its queue, consumers, and other state in the child process viaos.register_at_fork.Changes
New: Celery Integration (
posthog/integrations/celery.py)task_prerun,task_success,task_failure, etc.) to capture events likecelery task started,celery task successetc. Check the docstring in the integration module code for complete list of supported events._on_before_task_publish: Injects current PostHog context (distinct_id, session_id, tags) into task headers._on_task_prerun: Extracts headers in the worker and restores the PostHog context for the duration of the task. This context is exited upon task completion.Refactored: Client Fork Safety
posthog/client.py_reinit_after_forkmethod to reset the internal queue and spin up new consumers in a child process.os.register_at_fork(on supported platforms) to automatically call this method, so that the SDK does not drop captured events when used in child processes.Examples
examples/celery_integration.pyTests
posthog/test/integrations/test_celery_integration.pycovering:posthog/test/test_client_fork.pycovering:Screenshots (created through example script)
Celery task lifecycle events and captured Exception -

Celery task success event emitted from worker carrying correct distinct ID, session ID set in parent and context tags -

Captured exception -
