Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,15 @@ protected CompletableFuture<Message<T>> nextPendingReceive() {

protected void completePendingReceive(CompletableFuture<Message<T>> receivedFuture, Message<T> message) {
getInternalExecutor(message).execute(() -> {
if (!receivedFuture.complete(message)) {
log.warn().attr("cancelled", receivedFuture.isCancelled())
.attr("message", message)
.log("Race condition detected, receive future was already completed and message was dropped");
if (!receivedFuture.complete(message) && getState() != State.Closing && getState() != State.Closed) {
log.error().attr("cancelled", receivedFuture.isCancelled())
.attr("message", message)
.log("Race condition detected, receive future was already completed and message was dropped."
+ " In other words, the message was dropped internally, the client-side will encounter a"
+ " crucial issue: this message will never be consumed until the consumer is restarted or"
+ " the topic is unloaded. Under normal circumstances, this won't happen. It only occurs when"
+ " user itself has completed the completable future object returned by"
+ " \"consumer.receiveAsync()\"");
}
});
}
Expand Down Expand Up @@ -1118,9 +1123,13 @@ protected final void notifyPendingBatchReceivedCallBack(CompletableFuture<Messag
protected void completePendingBatchReceive(CompletableFuture<Messages<T>> future, Messages<T> messages) {
if (!future.complete(messages)) {
log.warn().attr("cancelled", future.isCancelled())
.attr("messages", messages)
.log("Race condition detected, batch receive future was"
+ " already completed and messages were dropped");
.attr("messages", messages)
.log("Race condition detected, receive future was already completed and message was dropped."
+ " In other words, the message was dropped internally, the client-side will encounter a"
+ " crucial issue: these message will never be consumed until the consumer is restarted or"
+ " the topic is unloaded. Under normal circumstances, this won't happen. It only occurs when"
+ " user itself has completed the completable future object returned by"
+ " \"consumer.batchReceiveAsync()\"");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1720,12 +1720,26 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
*/
void notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
if (pendingReceives.isEmpty()) {
if (getState() != State.Closing && getState() != State.Closed) {
log.error().attr("message", message)
.attr("pendingReceives-size", pendingReceives.size())
.log("If you received this log, it means that you encountered a bug: a message was"
+ " dropped internally, the client-side will encounter a crucial issue: this message will"
+ " never be consumed until the consumer is restarted or the topic is unloaded.");
}
return;
}

// fetch receivedCallback from queue
final CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
if (receivedFuture == null) {
if (getState() != State.Closing && getState() != State.Closed) {
log.error().attr("message", message)
.log("The pendingReceives pulled out a null conpletableFuture object. If you received this log,"
+ " it means that you encountered a bug: a message was"
+ " dropped internally, the client-side will encounter a crucial issue: this message will never"
+ " be consumed until the consumer is restarted or the topic is unloaded.");
}
return;
}

Expand Down
Loading