Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ class PULSAR_PUBLIC Message {
*/
bool hasOrderingKey() const;

/**
* Check if the message has a null value.
*
* Messages with null values are used as tombstones on compacted topics
* to delete the message for a specific key.
*
* @return true if the message has a null value (tombstone)
* false if the message has actual payload data
*/
bool hasNullValue() const;

/**
* Get the UTC based timestamp in milliseconds referring to when the message was published by the client
* producer
Expand Down
11 changes: 11 additions & 0 deletions include/pulsar/MessageBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ class PULSAR_PUBLIC MessageBuilder {
*/
MessageBuilder& disableReplication(bool flag);

/**
* Mark the message as having a null value.
*
* This is used for messages on compacted topics where a null value
* acts as a tombstone for a specific key, removing the message from
* the compacted view.
*
* @return the message builder instance
*/
MessageBuilder& setNullValue();

/**
* create a empty message, with no properties or data
*
Expand Down
19 changes: 19 additions & 0 deletions include/pulsar/c/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ PULSAR_PUBLIC void pulsar_message_set_replication_clusters(pulsar_message_t *mes
*/
PULSAR_PUBLIC void pulsar_message_disable_replication(pulsar_message_t *message, int flag);

/**
* Mark the message as having a null value.
*
* This is used for messages on compacted topics where a null value
* acts as a tombstone for a specific key, removing the message from
* the compacted view.
*/
PULSAR_PUBLIC void pulsar_message_set_null_value(pulsar_message_t *message);

/// Accessor for built messages

/**
Expand Down Expand Up @@ -221,6 +230,16 @@ PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message,
*/
PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message);

/**
* Check if the message has a null value.
*
* Messages with null values are used as tombstones on compacted topics
* to delete the message for a specific key.
*
* @return 1 if the message has a null value, 0 otherwise
*/
PULSAR_PUBLIC int pulsar_message_has_null_value(pulsar_message_t *message);

#ifdef __cplusplus
}
#endif
4 changes: 4 additions & 0 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,10 @@ static std::pair<std::unique_ptr<char[]>, size_t> serializeSingleMessageMetadata
metadata.set_sequence_id(msgMetadata.sequence_id());
}

if (msgMetadata.null_value()) {
metadata.set_null_value(true);
}

size_t size = metadata.ByteSizeLong();
std::unique_ptr<char[]> data{new char[size]};
metadata.SerializeToArray(data.get(), size);
Expand Down
13 changes: 13 additions & 0 deletions lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ Message::Message(const MessageId& messageID, proto::BrokerEntryMetadata& brokerE
} else {
impl_->metadata.clear_sequence_id();
}

if (singleMetadata.null_value()) {
impl_->metadata.set_null_value(true);
} else {
impl_->metadata.clear_null_value();
}
}

const MessageId& Message::getMessageId() const {
Expand Down Expand Up @@ -177,6 +183,13 @@ const std::string& Message::getOrderingKey() const {
return impl_->getOrderingKey();
}

bool Message::hasNullValue() const {
if (impl_) {
return impl_->metadata.null_value();
}
return false;
}

const std::string& Message::getTopicName() const {
if (!impl_) {
return emptyString;
Expand Down
13 changes: 13 additions & 0 deletions lib/MessageBuilder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,29 +60,35 @@ void MessageBuilder::checkMetadata() {
MessageBuilder& MessageBuilder::setContent(const void* data, size_t size) {
checkMetadata();
impl_->payload = SharedBuffer::copy((char*)data, size);
impl_->metadata.clear_null_value();
return *this;
}

MessageBuilder& MessageBuilder::setAllocatedContent(void* data, size_t size) {
checkMetadata();
impl_->payload = SharedBuffer::wrap((char*)data, size);
impl_->metadata.clear_null_value();
return *this;
}

MessageBuilder& MessageBuilder::setContent(const std::string& data) {
checkMetadata();
impl_->payload = SharedBuffer::copy((char*)data.c_str(), data.length());
impl_->metadata.clear_null_value();
return *this;
}

MessageBuilder& MessageBuilder::setContent(std::string&& data) {
checkMetadata();
impl_->payload = SharedBuffer::take(std::move(data));
impl_->metadata.clear_null_value();
return *this;
}

MessageBuilder& MessageBuilder::setContent(const KeyValue& data) {
checkMetadata();
impl_->keyValuePtr = data.impl_;
impl_->metadata.clear_null_value();
return *this;
}

Expand Down Expand Up @@ -157,6 +163,13 @@ MessageBuilder& MessageBuilder::disableReplication(bool flag) {
return *this;
}

MessageBuilder& MessageBuilder::setNullValue() {
checkMetadata();
impl_->metadata.set_null_value(true);
impl_->payload = SharedBuffer();
return *this;
}

const char* MessageBuilder::data() const {
assert(impl_->payload.data());
return impl_->payload.data();
Expand Down
4 changes: 4 additions & 0 deletions lib/c/c_Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ void pulsar_message_disable_replication(pulsar_message_t *message, int flag) {
message->builder.disableReplication(flag);
}

void pulsar_message_set_null_value(pulsar_message_t *message) { message->builder.setNullValue(); }

int pulsar_message_has_property(pulsar_message_t *message, const char *name) {
return message->message.hasProperty(name);
}
Expand Down Expand Up @@ -148,3 +150,5 @@ void pulsar_message_set_schema_version(pulsar_message_t *message, const char *sc
const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
return message->message.getProducerName().c_str();
}

int pulsar_message_has_null_value(pulsar_message_t *message) { return message->message.hasNullValue(); }
30 changes: 30 additions & 0 deletions tests/BatchMessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,36 @@ TEST(BatchMessageTest, testParseMessageBatchEntry) {
}
}

TEST(BatchMessageTest, testParseMessageBatchEntryWithNullValue) {
std::vector<Message> msgs;
msgs.emplace_back(MessageBuilder().setPartitionKey("key1").setNullValue().build());
msgs.emplace_back(MessageBuilder().setContent("content2").setPartitionKey("key2").build());
msgs.emplace_back(MessageBuilder().setPartitionKey("key3").setNullValue().build());

SharedBuffer payload;
Commands::serializeSingleMessagesToBatchPayload(payload, msgs);
ASSERT_EQ(payload.writableBytes(), 0);

MessageBatch messageBatch;
auto fakeId = MessageIdBuilder().ledgerId(6000L).entryId(20L).partition(0).build();
messageBatch.withMessageId(fakeId).parseFrom(payload, static_cast<uint32_t>(msgs.size()));
const std::vector<Message>& messages = messageBatch.messages();

ASSERT_EQ(messages.size(), 3);

ASSERT_TRUE(messages[0].hasNullValue());
ASSERT_EQ(messages[0].getPartitionKey(), "key1");
ASSERT_EQ(messages[0].getLength(), 0);

ASSERT_FALSE(messages[1].hasNullValue());
ASSERT_EQ(messages[1].getPartitionKey(), "key2");
ASSERT_EQ(messages[1].getDataAsString(), "content2");

ASSERT_TRUE(messages[2].hasNullValue());
ASSERT_EQ(messages[2].getPartitionKey(), "key3");
ASSERT_EQ(messages[2].getLength(), 0);
}

TEST(BatchMessageTest, testSendCallback) {
const std::string topicName = "persistent://public/default/BasicMessageTest-testSendCallback";

Expand Down
43 changes: 43 additions & 0 deletions tests/MessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,46 @@ TEST(MessageTest, testGetTopicNameOnProducerMessage) {
auto msg = MessageBuilder().setContent("test").build();
ASSERT_TRUE(msg.getTopicName().empty());
}

TEST(MessageTest, testNullValueMessage) {
{
auto msg = MessageBuilder().setContent("test").build();
ASSERT_FALSE(msg.hasNullValue());
}

{
auto msg = MessageBuilder().setNullValue().setPartitionKey("key1").build();
ASSERT_TRUE(msg.hasNullValue());
ASSERT_EQ(msg.getLength(), 0);
ASSERT_EQ(msg.getPartitionKey(), "key1");
}

{
auto msg = MessageBuilder().setPartitionKey("key2").setNullValue().build();
ASSERT_TRUE(msg.hasNullValue());
ASSERT_EQ(msg.getPartitionKey(), "key2");
}
}

TEST(MessageTest, testEmptyMessage) {
auto msg = MessageBuilder().build();
ASSERT_FALSE(msg.hasNullValue());
ASSERT_EQ(msg.getLength(), 0);
}

TEST(MessageTest, testEmptyStringNotNullValue) {
// Empty string message - has content set to ""
auto emptyStringMsg = MessageBuilder().setContent("").build();
ASSERT_FALSE(emptyStringMsg.hasNullValue());
ASSERT_EQ(emptyStringMsg.getLength(), 0);
ASSERT_EQ(emptyStringMsg.getDataAsString(), "");

// Null value message - explicitly marked as null
auto nullValueMsg = MessageBuilder().setNullValue().setPartitionKey("key").build();
ASSERT_TRUE(nullValueMsg.hasNullValue());
ASSERT_EQ(nullValueMsg.getLength(), 0);

// Both have length 0, but they are semantically different
// Empty string: the value IS an empty string
// Null value: the value does not exist (tombstone for compaction)
}
115 changes: 115 additions & 0 deletions tests/ReaderTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1045,5 +1045,120 @@ TEST(ReaderTest, testReaderWithZeroMessageListenerThreads) {
client.close();
}

TEST(ReaderTest, testReadCompactedWithNullValue) {
Client client(serviceUrl);

const std::string topicName =
"persistent://public/default/testReadCompactedWithNullValue-" + std::to_string(time(nullptr));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

// Send messages with keys
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1").build()));
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setPartitionKey("key2").setContent("value2").build()));
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setPartitionKey("key3").setContent("value3").build()));

// Send a tombstone (null value) for key2
auto tombstone = MessageBuilder().setPartitionKey("key2").setNullValue().build();
ASSERT_TRUE(tombstone.hasNullValue());
ASSERT_EQ(tombstone.getLength(), 0);
ASSERT_EQ(ResultOk, producer.send(tombstone));

// Update key1 with a new value
ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-updated").build()));

// Create a reader with readCompacted enabled to read all messages (before compaction runs)
ReaderConfiguration readerConf;
readerConf.setReadCompacted(true);
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

// Read all messages and verify we can detect null values
std::map<std::string, std::string> keyValues;
std::set<std::string> nullValueKeys;
int messageCount = 0;

bool hasMessageAvailable = false;
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
while (hasMessageAvailable) {
Message msg;
ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
messageCount++;

std::string key = msg.getPartitionKey();
if (msg.hasNullValue()) {
nullValueKeys.insert(key);
LOG_INFO("Received null value (tombstone) for key: " << key);
} else {
keyValues[key] = msg.getDataAsString();
LOG_INFO("Received message for key: " << key << ", value: " << msg.getDataAsString());
}

ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
}

// Verify we read all 5 messages
ASSERT_EQ(messageCount, 5) << "Expected to read 5 messages";

// Verify the null value message was received and detected
ASSERT_EQ(nullValueKeys.size(), 1) << "Expected exactly one null value message";
ASSERT_TRUE(nullValueKeys.count("key2") > 0) << "key2 should have a null value (tombstone)";

// Verify key1 has the latest value (value1-updated overwrites value1)
ASSERT_EQ(keyValues["key1"], "value1-updated") << "key1 should have the updated value";
ASSERT_EQ(keyValues["key3"], "value3") << "key3 should have value3";

producer.close();
reader.close();
client.close();
}

TEST(ReaderTest, testNullValueMessageProperties) {
Client client(serviceUrl);

const std::string topicName =
"persistent://public/default/testNullValueMessageProperties-" + std::to_string(time(nullptr));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

// Send a null value message with properties
auto tombstone = MessageBuilder()
.setPartitionKey("user-123")
.setNullValue()
.setProperty("reason", "account-deleted")
.setProperty("deleted-by", "admin")
.build();

ASSERT_TRUE(tombstone.hasNullValue());
ASSERT_EQ(tombstone.getPartitionKey(), "user-123");
ASSERT_EQ(tombstone.getProperty("reason"), "account-deleted");
ASSERT_EQ(tombstone.getProperty("deleted-by"), "admin");
ASSERT_EQ(tombstone.getLength(), 0);

ASSERT_EQ(ResultOk, producer.send(tombstone));

// Create a reader and verify the message
ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

Message msg;
ASSERT_EQ(ResultOk, reader.readNext(msg, 5000));

// Verify all properties are preserved
ASSERT_TRUE(msg.hasNullValue());
ASSERT_EQ(msg.getPartitionKey(), "user-123");
ASSERT_EQ(msg.getProperty("reason"), "account-deleted");
ASSERT_EQ(msg.getProperty("deleted-by"), "admin");
ASSERT_EQ(msg.getLength(), 0);

producer.close();
reader.close();
client.close();
}

INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));
Loading
Loading