Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -3511,6 +3512,137 @@ public void testConsumerSubscriptionInitialize() throws Exception {
log.info().attr("method", methodName).log("Exiting test");
}

@Test(timeOut = 100000)
public void testConsumerImplHasMessageAvailableDoesNotThrowBeforeReceive() throws Exception {
log.info().attr("method", methodName).log("Starting test");
String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/has-message-available");

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.create();
producer.send("existing-message".getBytes(UTF_8));

// hasMessageAvailable is exposed by ConsumerImpl, not the Consumer public API.
@Cleanup
ConsumerImpl<byte[]> latestConsumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("test-has-message-available-latest")
.receiverQueueSize(0)
.subscribe();
assertThat(latestConsumer.hasMessageAvailableAsync())
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo(false);
assertThatCode(latestConsumer::hasMessageAvailable)
.doesNotThrowAnyException();
assertFalse(latestConsumer.hasMessageAvailable());

@Cleanup
ConsumerImpl<byte[]> earliestConsumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("test-has-message-available-earliest")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.receiverQueueSize(0)
.subscribe();
assertThat(earliestConsumer.hasMessageAvailableAsync())
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo(true);
assertThatCode(earliestConsumer::hasMessageAvailable)
.doesNotThrowAnyException();
assertTrue(earliestConsumer.hasMessageAvailable());

log.info().attr("method", methodName).log("Exiting test");
}

@Test(timeOut = 100000)
public void testReaderHasMessageAvailableDoesNotThrowBeforeRead() throws Exception {
log.info().attr("method", methodName).log("Starting test");
String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/reader-has-message-available");

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.create();
producer.send("existing-message".getBytes(UTF_8));

@Cleanup
Reader<byte[]> latestReader = pulsarClient.newReader()
.topic(topicName)
.startMessageId(MessageId.latest)
.receiverQueueSize(0)
.create();
assertThat(latestReader.hasMessageAvailableAsync())
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo(false);
assertThatCode(latestReader::hasMessageAvailable)
.doesNotThrowAnyException();
assertFalse(latestReader.hasMessageAvailable());

@Cleanup
Reader<byte[]> earliestReader = pulsarClient.newReader()
.topic(topicName)
.startMessageId(MessageId.earliest)
.receiverQueueSize(0)
.create();
assertThat(earliestReader.hasMessageAvailableAsync())
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo(true);
assertThatCode(earliestReader::hasMessageAvailable)
.doesNotThrowAnyException();
assertTrue(earliestReader.hasMessageAvailable());

log.info().attr("method", methodName).log("Exiting test");
}

@Test(timeOut = 100000)
public void testHasMessageAvailableDoesNotThrowOnEmptyTopic() throws Exception {
log.info().attr("method", methodName).log("Starting test");
String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/empty-has-message-available");

@Cleanup
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("test-empty-has-message-available")
.receiverQueueSize(0)
.subscribe();
assertThat(consumer.hasMessageAvailableAsync())
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo(false);
assertThatCode(consumer::hasMessageAvailable)
.doesNotThrowAnyException();
assertFalse(consumer.hasMessageAvailable());

@Cleanup
Reader<byte[]> latestReader = pulsarClient.newReader()
.topic(topicName)
.startMessageId(MessageId.latest)
.receiverQueueSize(0)
.create();
assertThat(latestReader.hasMessageAvailableAsync())
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo(false);
assertThatCode(latestReader::hasMessageAvailable)
.doesNotThrowAnyException();
assertFalse(latestReader.hasMessageAvailable());

@Cleanup
Reader<byte[]> earliestReader = pulsarClient.newReader()
.topic(topicName)
.startMessageId(MessageId.earliest)
.receiverQueueSize(0)
.create();
assertThat(earliestReader.hasMessageAvailableAsync())
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo(false);
assertThatCode(earliestReader::hasMessageAvailable)
.doesNotThrowAnyException();
assertFalse(earliestReader.hasMessageAvailable());

log.info().attr("method", methodName).log("Exiting test");
}

@Test(timeOut = 100000)
public void testMultiTopicsConsumerImplPauseForPartitionNumberChange() throws Exception {
log.info().attr("method", methodName).log("Starting test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2720,6 +2720,20 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {

// we haven't read yet. use startMessageId for comparison
if (lastDequeuedMessageId == MessageId.earliest) {
if (startMessageId == null) {
internalGetLastMessageIdAsync().thenAccept(response -> {
lastMessageIdInBroker = response.lastMessageId;
completehasMessageAvailableWithValue(booleanFuture,
hasMoreMessagesThanMarkDeletePosition(response, false, false));
}).exceptionally(e -> {
log.error().exception(e)
.log("Failed getLastMessageId command");
booleanFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
return null;
});
return booleanFuture;
}

// If the last seek is called with timestamp, startMessageId cannot represent the position to start, so we
// have to get the mark-delete position from the GetLastMessageId response to compare as well.
// if we are starting from latest, we should seek to the actual last message first.
Expand All @@ -2735,34 +2749,13 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
}

future.thenAccept(response -> {
MessageIdAdv lastMessageId = (MessageIdAdv) response.lastMessageId;
MessageIdAdv markDeletePosition = (MessageIdAdv) response.markDeletePosition;

if (markDeletePosition != null && !(markDeletePosition.getEntryId() < 0
&& markDeletePosition.getLedgerId() > lastMessageId.getLedgerId())) {
// we only care about comparing ledger ids and entry ids as mark delete position doesn't have
// other ids such as batch index
int result = ComparisonChain.start()
.compare(markDeletePosition.getLedgerId(), lastMessageId.getLedgerId())
.compare(markDeletePosition.getEntryId(), lastMessageId.getEntryId())
.result();
if (lastMessageId.getEntryId() < 0) {
completehasMessageAvailableWithValue(booleanFuture, false);
} else if (hasSoughtByTimestamp) {
completehasMessageAvailableWithValue(booleanFuture, result < 0);
} else {
completehasMessageAvailableWithValue(booleanFuture,
resetIncludeHead ? result <= 0 : result < 0);
}
} else if (lastMessageId == null || lastMessageId.getEntryId() < 0) {
completehasMessageAvailableWithValue(booleanFuture, false);
} else {
completehasMessageAvailableWithValue(booleanFuture, resetIncludeHead);
}
completehasMessageAvailableWithValue(booleanFuture,
hasMoreMessagesThanMarkDeletePosition(response,
!hasSoughtByTimestamp && resetIncludeHead, resetIncludeHead));
}).exceptionally(ex -> {
log.error().exception(ex)
.log("Failed getLastMessageId command");
booleanFuture.completeExceptionally(ex.getCause());
booleanFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});

Expand All @@ -2779,8 +2772,9 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
completehasMessageAvailableWithValue(booleanFuture,
hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead));
}).exceptionally(e -> {
log.error("Failed getLastMessageId command");
booleanFuture.completeExceptionally(e.getCause());
log.error().exception(e)
.log("Failed getLastMessageId command");
booleanFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
return null;
});

Expand All @@ -2796,8 +2790,9 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
completehasMessageAvailableWithValue(booleanFuture,
hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false));
}).exceptionally(e -> {
log.error("Failed getLastMessageId command");
booleanFuture.completeExceptionally(e.getCause());
log.error().exception(e)
.log("Failed getLastMessageId command");
booleanFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
return null;
});
}
Expand All @@ -2821,6 +2816,28 @@ private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId messa
&& ((MessageIdImpl) lastMessageIdInBroker).getEntryId() != -1;
}

private boolean hasMoreMessagesThanMarkDeletePosition(GetLastMessageIdResponse response, boolean inclusive,
boolean includeHeadWhenMarkDeleteIsAfterLastMessage) {
MessageIdAdv lastMessageId = (MessageIdAdv) response.lastMessageId;
if (lastMessageId == null || lastMessageId.getEntryId() < 0) {
return false;
}

MessageIdAdv markDeletePosition = (MessageIdAdv) response.markDeletePosition;
if (markDeletePosition == null || (markDeletePosition.getEntryId() < 0
&& markDeletePosition.getLedgerId() > lastMessageId.getLedgerId())) {
return includeHeadWhenMarkDeleteIsAfterLastMessage;
}

// We only care about comparing ledger ids and entry ids as mark delete position doesn't have
// other ids such as batch index.
int result = ComparisonChain.start()
.compare(markDeletePosition.getLedgerId(), lastMessageId.getLedgerId())
.compare(markDeletePosition.getEntryId(), lastMessageId.getEntryId())
.result();
return inclusive ? result <= 0 : result < 0;
}

private static final class GetLastMessageIdResponse {
final MessageId lastMessageId;
final MessageId markDeletePosition;
Expand Down