[KIP-932] Support callbacks for share consumer python binding#2252
Open
Kaushik Raina (k-raina) wants to merge 7 commits into
Open
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
error_cb, throttle_cb, log_cb, and oauth_cbon ShareConsumer, which dispatch through the share-consumer drain paths i.epoll, commit_sync, commit_async.args[0] + kwargsbeforecommon_conf_setup.rk_repviard_kafka_share_set_log_queue(rkshare, NULL)so log records reach the Python logger through the same drain as poll/commit.rd_kafka_share_sasl_background_callbacks_enable+wait_for_oauth_token_setso construction returns a ready-to-use client.wait_for_oauth_token_setrefactored to no longer own destroy on failure, and each _init now invokes the correct teardown API i.erd_kafka_destroy vs rd_kafka_share_destroyrd_kafka_yield/rd_kafka_oauthbearer_set_tokencalls instead ofh->rk. Alsoerror_cbadditionally tolerates dispatch with no CallState and saves/restores any pending Python exception across the user callback so the constructor's original error isn't silently swallowed.tests/test_ShareConsumer_callbacks.pycovers dispatch, exception propagation, config rejection, and the CallState wrap on commit paths against an unreachable broker. Integration: throttle_cb undertests/integration/share_consumer/and full OAUTHBEARER handshake/refresh/auth-failure undertests/integration/share_consumer_oauth/