-
Notifications
You must be signed in to change notification settings - Fork 951
Fix consume() breaking out of chunked loop on first result #2235
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1119,7 +1119,8 @@ Consumer_consume(Handle *self, PyObject *args, PyObject *kwargs) { | |
| PyObject *msglist; | ||
| rd_kafka_queue_t *rkqu = self->u.Consumer.rkqu; | ||
| CallState cs; | ||
| Py_ssize_t i, n = 0; | ||
| Py_ssize_t i, msgs_received_count = 0; | ||
| Py_ssize_t chunk_msg_count; | ||
| const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */ | ||
| int total_timeout_ms; | ||
| int chunk_timeout_ms; | ||
|
|
@@ -1156,16 +1157,15 @@ Consumer_consume(Handle *self, PyObject *args, PyObject *kwargs) { | |
| * ThreadPool. Only use wakeable poll for | ||
| * blocking calls that need to be interruptible. */ | ||
| if (total_timeout_ms >= 0 && total_timeout_ms < CHUNK_TIMEOUT_MS) { | ||
| n = (Py_ssize_t)rd_kafka_consume_batch_queue( | ||
| msgs_received_count = (Py_ssize_t)rd_kafka_consume_batch_queue( | ||
| rkqu, total_timeout_ms, rkmessages, num_messages); | ||
|
|
||
| if (n < 0) { | ||
| /* Error - need to restore GIL before setting error */ | ||
| PyEval_RestoreThread(cs.thread_state); | ||
| if (msgs_received_count < 0) { | ||
| if (CallState_end(self, &cs)) | ||
| cfl_PyErr_Format( | ||
| rd_kafka_last_error(), "%s", | ||
| rd_kafka_err2str(rd_kafka_last_error())); | ||
| free(rkmessages); | ||
| cfl_PyErr_Format( | ||
| rd_kafka_last_error(), "%s", | ||
| rd_kafka_err2str(rd_kafka_last_error())); | ||
| return NULL; | ||
| } | ||
| } else { | ||
|
|
@@ -1178,30 +1178,40 @@ Consumer_consume(Handle *self, PyObject *args, PyObject *kwargs) { | |
| break; | ||
| } | ||
|
|
||
| /* Consume with chunk timeout */ | ||
| n = (Py_ssize_t)rd_kafka_consume_batch_queue( | ||
| rkqu, chunk_timeout_ms, rkmessages, num_messages); | ||
|
|
||
| if (n < 0) { | ||
| /* Error - need to restore GIL before setting | ||
| * error */ | ||
| PyEval_RestoreThread(cs.thread_state); | ||
| /* Consume with chunk timeout, appending after | ||
| * already-accumulated messages */ | ||
| chunk_msg_count = | ||
| (Py_ssize_t)rd_kafka_consume_batch_queue( | ||
| rkqu, chunk_timeout_ms, | ||
| rkmessages + msgs_received_count, | ||
| num_messages - | ||
| (unsigned int)msgs_received_count); | ||
|
|
||
| if (chunk_msg_count < 0) { | ||
| for (i = 0; i < msgs_received_count; i++) | ||
| rd_kafka_message_destroy(rkmessages[i]); | ||
| if (CallState_end(self, &cs)) | ||
| cfl_PyErr_Format( | ||
| rd_kafka_last_error(), "%s", | ||
| rd_kafka_err2str( | ||
| rd_kafka_last_error())); | ||
| free(rkmessages); | ||
| cfl_PyErr_Format( | ||
| rd_kafka_last_error(), "%s", | ||
| rd_kafka_err2str(rd_kafka_last_error())); | ||
| return NULL; | ||
| } | ||
|
|
||
| /* If we got messages, exit the loop */ | ||
| if (n > 0) { | ||
| msgs_received_count += chunk_msg_count; | ||
|
|
||
| /* If we got all requested messages, exit the loop */ | ||
| if (msgs_received_count >= (Py_ssize_t)num_messages) { | ||
| break; | ||
| } | ||
|
Comment on lines 1181 to 1207
|
||
|
|
||
| chunk_count++; | ||
|
|
||
| /* Check for signals between chunks */ | ||
| if (check_signals_between_chunks(self, &cs)) { | ||
| for (i = 0; i < msgs_received_count; i++) | ||
| rd_kafka_message_destroy(rkmessages[i]); | ||
| free(rkmessages); | ||
| return NULL; | ||
| } | ||
|
|
@@ -1210,17 +1220,17 @@ Consumer_consume(Handle *self, PyObject *args, PyObject *kwargs) { | |
|
|
||
| /* Final GIL restore and signal check */ | ||
| if (!CallState_end(self, &cs)) { | ||
| for (i = 0; i < n; i++) { | ||
| for (i = 0; i < msgs_received_count; i++) { | ||
| rd_kafka_message_destroy(rkmessages[i]); | ||
| } | ||
| free(rkmessages); | ||
| return NULL; | ||
| } | ||
|
|
||
| /* Create Python list from messages */ | ||
| msglist = PyList_New(n); | ||
| msglist = PyList_New(msgs_received_count); | ||
|
|
||
| for (i = 0; i < n; i++) { | ||
| for (i = 0; i < msgs_received_count; i++) { | ||
| PyObject *msgobj = Message_new0(self, rkmessages[i]); | ||
| #ifdef RD_KAFKA_V_HEADERS | ||
| /** Have to detach headers outside Message_new0 because it | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the chunked-loop error path (chunk_msg_count < 0), the code destroys accumulated messages and restores the GIL but returns without calling CallState_end() (or otherwise clearing the thread-local CallState). This can leave per-thread state behind. Ensure the CallState is properly ended/cleaned up before returning from this error path.