Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ global_job_config:
value: v2.14.0
# TODO KIP-932: Remove LIBRDKAFKA_BRANCH once LIBRDKAFKA_VERSION includes share consumer support
- name: LIBRDKAFKA_BRANCH
value: dev_kip-932_queues-for-kafka
value: dev_kip-932_queues-for-kafka_additional_api_python
prologue:
commands:
- checkout
Expand Down
73 changes: 40 additions & 33 deletions src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -553,11 +553,11 @@ Admin_config_dict_to_c(void *c_obj, PyObject *dict, const char *op_name) {
static PyObject *
Admin_create_topics(Handle *self, PyObject *args, PyObject *kwargs) {
PyObject *topics = NULL, *future, *validate_only_obj = NULL;
static char *kws[] = {"topics", "future",
/* options */
"validate_only", "request_timeout",
"operation_timeout", NULL};
struct Admin_options options = Admin_options_INITIALIZER;
static char *kws[] = {"topics", "future",
/* options */
"validate_only", "request_timeout",
"operation_timeout", NULL};
struct Admin_options options = Admin_options_INITIALIZER;
rd_kafka_AdminOptions_t *c_options = NULL;
int tcnt;
int i;
Expand Down Expand Up @@ -702,10 +702,10 @@ Admin_create_topics(Handle *self, PyObject *args, PyObject *kwargs) {
*/
static PyObject *
Admin_delete_topics(Handle *self, PyObject *args, PyObject *kwargs) {
PyObject *topics = NULL, *future;
static char *kws[] = {"topics", "future",
/* options */
"request_timeout", "operation_timeout", NULL};
PyObject *topics = NULL, *future;
static char *kws[] = {"topics", "future",
/* options */
"request_timeout", "operation_timeout", NULL};
struct Admin_options options = Admin_options_INITIALIZER;
rd_kafka_AdminOptions_t *c_options = NULL;
int tcnt;
Expand Down Expand Up @@ -807,11 +807,11 @@ Admin_delete_topics(Handle *self, PyObject *args, PyObject *kwargs) {
static PyObject *
Admin_create_partitions(Handle *self, PyObject *args, PyObject *kwargs) {
PyObject *topics = NULL, *future, *validate_only_obj = NULL;
static char *kws[] = {"topics", "future",
/* options */
"validate_only", "request_timeout",
"operation_timeout", NULL};
struct Admin_options options = Admin_options_INITIALIZER;
static char *kws[] = {"topics", "future",
/* options */
"validate_only", "request_timeout",
"operation_timeout", NULL};
struct Admin_options options = Admin_options_INITIALIZER;
rd_kafka_AdminOptions_t *c_options = NULL;
int tcnt;
int i;
Expand Down Expand Up @@ -934,10 +934,10 @@ Admin_create_partitions(Handle *self, PyObject *args, PyObject *kwargs) {
static PyObject *
Admin_describe_configs(Handle *self, PyObject *args, PyObject *kwargs) {
PyObject *resources, *future;
static char *kws[] = {"resources", "future",
/* options */
"request_timeout", "broker", NULL};
struct Admin_options options = Admin_options_INITIALIZER;
static char *kws[] = {"resources", "future",
/* options */
"request_timeout", "broker", NULL};
struct Admin_options options = Admin_options_INITIALIZER;
rd_kafka_AdminOptions_t *c_options = NULL;
PyObject *ConfigResource_type;
int cnt, i;
Expand Down Expand Up @@ -1064,11 +1064,11 @@ static PyObject *Admin_incremental_alter_configs(Handle *self,
PyObject *args,
PyObject *kwargs) {
PyObject *resources, *future;
PyObject *validate_only_obj = NULL;
static char *kws[] = {"resources", "future",
/* options */
"validate_only", "request_timeout", "broker",
NULL};
PyObject *validate_only_obj = NULL;
static char *kws[] = {"resources", "future",
/* options */
"validate_only", "request_timeout", "broker",
NULL};
struct Admin_options options = Admin_options_INITIALIZER;
rd_kafka_AdminOptions_t *c_options = NULL;
PyObject *ConfigResource_type, *ConfigEntry_type;
Expand Down Expand Up @@ -1232,11 +1232,11 @@ static PyObject *Admin_incremental_alter_configs(Handle *self,
static PyObject *
Admin_alter_configs(Handle *self, PyObject *args, PyObject *kwargs) {
PyObject *resources, *future;
PyObject *validate_only_obj = NULL;
static char *kws[] = {"resources", "future",
/* options */
"validate_only", "request_timeout", "broker",
NULL};
PyObject *validate_only_obj = NULL;
static char *kws[] = {"resources", "future",
/* options */
"validate_only", "request_timeout", "broker",
NULL};
struct Admin_options options = Admin_options_INITIALIZER;
rd_kafka_AdminOptions_t *c_options = NULL;
PyObject *ConfigResource_type;
Expand Down Expand Up @@ -3696,8 +3696,8 @@ Admin_c_ConfigEntries_to_py(PyObject *ConfigEntry_type,
rd_kafka_ConfigEntry_is_synonym(ent));

c_synonyms = rd_kafka_ConfigEntry_synonyms(ent, &synonym_cnt);
synonyms = Admin_c_ConfigEntries_to_py(ConfigEntry_type,
c_synonyms, synonym_cnt);
synonyms = Admin_c_ConfigEntries_to_py(ConfigEntry_type,
c_synonyms, synonym_cnt);
if (!synonyms) {
Py_DECREF(kwargs);
Py_DECREF(dict);
Expand Down Expand Up @@ -5537,9 +5537,16 @@ static int Admin_init(PyObject *selfobj, PyObject *args, PyObject *kwargs) {
rd_kafka_set_log_queue(self->rk, NULL);


/* Wait for the background thread to set the token */
if (self->oauth_cb) {
return wait_for_oauth_token_set(self);
/* Wait for the background thread to set the token. Caller owns
* destroy on failure — wait_for_oauth_token_set no longer touches
* self->rk (see refactor note in confluent_kafka.c). */
if (self->oauth_cb && wait_for_oauth_token_set(self) == -1) {
CallState cs;
CallState_begin(self, &cs);
rd_kafka_destroy(self->rk);
CallState_end(self, &cs);
self->rk = NULL;
return -1;
}

return 0;
Expand Down
13 changes: 10 additions & 3 deletions src/confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1825,9 +1825,16 @@ static int Consumer_init(PyObject *selfobj, PyObject *args, PyObject *kwargs) {
assert(self->u.Consumer.rkqu);


/* Wait for the background thread to set the token */
if (self->oauth_cb) {
return wait_for_oauth_token_set(self);
/* Wait for the background thread to set the token. Caller owns
* destroy on failure — wait_for_oauth_token_set no longer touches
* self->rk (see refactor note in confluent_kafka.c). */
if (self->oauth_cb && wait_for_oauth_token_set(self) == -1) {
CallState cs;
CallState_begin(self, &cs);
rd_kafka_destroy(self->rk);
CallState_end(self, &cs);
self->rk = NULL;
return -1;
}

return 0;
Expand Down
13 changes: 10 additions & 3 deletions src/confluent_kafka/src/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1468,9 +1468,16 @@ static int Producer_init(PyObject *selfobj, PyObject *args, PyObject *kwargs) {
if (self->logger)
rd_kafka_set_log_queue(self->rk, NULL);

/* Wait for the background thread to set the token */
if (self->oauth_cb) {
return wait_for_oauth_token_set(self);
/* Wait for the background thread to set the token. Caller owns
* destroy on failure — wait_for_oauth_token_set no longer touches
* self->rk (see refactor note in confluent_kafka.c). */
if (self->oauth_cb && wait_for_oauth_token_set(self) == -1) {
CallState cs;
CallState_begin(self, &cs);
rd_kafka_destroy(self->rk);
CallState_end(self, &cs);
self->rk = NULL;
return -1;
}

return 0;
Expand Down
Loading