Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -462,14 +462,22 @@ private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, RawMessage
return bkf;
}

/**
* Extract the partition key and the payload size for a non-batch message.
*
* @return a pair of (partitionKey, payloadSize), or null if the message has no partition key.
*/
protected Pair<String, Integer> extractKeyAndSize(RawMessage m, MessageMetadata msgMetadata) {
ByteBuf headersAndPayload = m.getHeadersAndPayload();
if (msgMetadata.hasPartitionKey()) {
int size = headersAndPayload.readableBytes();
int payloadSize;
if (msgMetadata.hasUncompressedSize()) {
size = msgMetadata.getUncompressedSize();
payloadSize = msgMetadata.getUncompressedSize();
} else {
ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate();
Commands.skipMessageMetadata(headersAndPayload);
payloadSize = headersAndPayload.readableBytes();
}
return Pair.of(msgMetadata.getPartitionKey(), size);
return Pair.of(msgMetadata.getPartitionKey(), payloadSize);
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.compaction;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -138,17 +137,12 @@ protected boolean compactBatchMessage(String topic, Map<String, Pair<MessageId,
}

protected MessageCompactionData extractMessageCompactionData(RawMessage m, MessageMetadata metadata) {
ByteBuf headersAndPayload = m.getHeadersAndPayload();
if (metadata.hasPartitionKey()) {
int size = headersAndPayload.readableBytes();
if (metadata.hasUncompressedSize()) {
size = metadata.getUncompressedSize();
}
return new MessageCompactionData(m.getMessageId(), metadata.getPartitionKey(),
size, metadata.getEventTime());
} else {
Pair<String, Integer> keyAndSize = extractKeyAndSize(m, metadata);
if (keyAndSize == null) {
return null;
}
return new MessageCompactionData(m.getMessageId(), keyAndSize.getLeft(),
keyAndSize.getRight(), metadata.getEventTime());
}

private List<MessageCompactionData> extractMessageCompactionDataFromBatch(RawMessage msg, MessageMetadata metadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,80 @@ public void testBatchMessageWithNullValue() throws Exception {
assertEquals(messages.get(2).getKey(), "key5");
}

/**
 * Verifies that null-value tombstones remove keys during compaction for non-batched messages.
 *
 * <p>Raw non-batch entries are written directly to the managed ledger, simulating
 * messages from C++/Python clients that do not set numMessagesInBatch. key1 and key3 each
 * get a value followed by a null-value tombstone; key2 and key4 get a value only. After
 * compaction, a consumer reading compacted data is expected to receive exactly key2 and
 * key4, in that order.
 */
@Test
public void testNonBatchedMessageWithNullValue() throws Exception {
    String topic = "persistent://my-tenant/my-ns/non-batched-message-with-null-value";
    // Encode payloads with an explicit charset so the bytes written do not depend on the
    // JVM's platform default.
    java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

    admin.topics().createNonPartitionedTopic(topic);
    // Subscribe "sub1" before any entry is written and close the consumer right away;
    // the same subscription name is reused for the read after compaction.
    pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
            .receiverQueueSize(1).readCompacted(true).subscribe().close();

    PersistentTopic persistentTopic =
            (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
    ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();

    long seqId = 0;

    // key1: value then null-value tombstone
    ml.addEntry(buildNonBatchEntry("key1", "my-message-1".getBytes(utf8), seqId++));
    ml.addEntry(buildNonBatchEntry("key1", null, seqId++));

    // key2: value only (should survive)
    ml.addEntry(buildNonBatchEntry("key2", "my-message-3".getBytes(utf8), seqId++));

    // key3: value then null-value tombstone
    ml.addEntry(buildNonBatchEntry("key3", "my-message-4".getBytes(utf8), seqId++));
    ml.addEntry(buildNonBatchEntry("key3", null, seqId++));

    // key4: value only (should survive)
    ml.addEntry(buildNonBatchEntry("key4", "my-message-6".getBytes(utf8), seqId++));

    compact(topic);

    List<Message<byte[]>> messages = new ArrayList<>();
    try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
            .subscriptionName("sub1").receiverQueueSize(1).readCompacted(true).subscribe()) {
        // Drain until a receive times out (returns null), i.e. nothing more is delivered.
        Message<byte[]> message;
        while ((message = consumer.receive(5, TimeUnit.SECONDS)) != null) {
            messages.add(message);
        }
    }

    assertEquals(messages.size(), 2);
    assertEquals(messages.get(0).getKey(), "key2");
    assertEquals(messages.get(1).getKey(), "key4");
}

/**
 * Builds the serialized bytes of a single non-batch entry (metadata plus payload, serialized
 * with the Crc32c checksum type) suitable for writing straight to the managed ledger.
 *
 * <p>numMessagesInBatch is deliberately left unset on the metadata. The fixed producer name
 * "test-non-batch" and the current wall-clock time are used as producer name and publish time.
 *
 * @param key        partition key to set on the message metadata
 * @param payload    message body, or {@code null} to build a null-value tombstone (the
 *                   nullValue flag is set and an empty payload is serialized)
 * @param sequenceId sequence id to set on the message metadata
 * @return a copy of the serialized entry as a byte array
 */
private byte[] buildNonBatchEntry(String key, byte[] payload, long sequenceId) {
    org.apache.pulsar.common.api.proto.MessageMetadata metadata =
            new org.apache.pulsar.common.api.proto.MessageMetadata();
    metadata.setPartitionKey(key);
    metadata.setPublishTime(System.currentTimeMillis());
    metadata.setProducerName("test-non-batch");
    metadata.setSequenceId(sequenceId);
    if (payload == null) {
        metadata.setNullValue(true);
    }
    ByteBuf payloadBuf = io.netty.buffer.Unpooled.wrappedBuffer(
            payload != null ? payload : new byte[0]);
    // Release both buffers in finally blocks so they are not leaked if serialization or the
    // copy throws; on the normal path the release order (entry, then payload) is unchanged.
    try {
        ByteBuf entry = org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(
                org.apache.pulsar.common.protocol.Commands.ChecksumType.Crc32c,
                metadata, payloadBuf);
        try {
            byte[] bytes = new byte[entry.readableBytes()];
            entry.readBytes(bytes);
            return bytes;
        } finally {
            entry.release();
        }
    } finally {
        payloadBuf.release();
    }
}

@Test
public void testWholeBatchCompactedOut() throws Exception {
String topic = "persistent://my-tenant/my-ns/whole-batch-compacted-out";
Expand Down
Loading