Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ class PULSAR_PUBLIC Message {
*/
const std::string& getSchemaVersion() const;

/**
* Get the producer name which produced this message.
*
* @return the producer name or empty string if not available
*/
const std::string& getProducerName() const noexcept;

bool operator==(const Message& msg) const;

protected:
Expand Down
6 changes: 6 additions & 0 deletions include/pulsar/c/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ PULSAR_PUBLIC const char *pulsar_message_get_schemaVersion(pulsar_message_t *mes

PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message, const char *schemaVersion);

/**
* Returns the producer name which produced this message. The pointer points to an internal string, so the
* caller should not free it.
*/
PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message);

#ifdef __cplusplus
}
#endif
7 changes: 7 additions & 0 deletions lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,13 @@ uint64_t Message::getPublishTimestamp() const { return impl_ ? impl_->getPublish

uint64_t Message::getEventTimestamp() const { return impl_ ? impl_->getEventTimestamp() : 0ull; }

const std::string& Message::getProducerName() const noexcept {
if (!impl_) {
return emptyString;
}
return impl_->metadata.producer_name();
}

bool Message::operator==(const Message& msg) const { return getMessageId() == msg.getMessageId(); }

KeyValue Message::getKeyValueData() const { return KeyValue(impl_->keyValuePtr); }
Expand Down
4 changes: 4 additions & 0 deletions lib/c/c_Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,7 @@ const char *pulsar_message_get_schemaVersion(pulsar_message_t *message) {
int pulsar_message_has_schema_version(pulsar_message_t *message) {
return message->message.hasSchemaVersion();
}

const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
return message->message.getProducerName().c_str();
}
13 changes: 12 additions & 1 deletion tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,20 @@ TEST(BasicEndToEndTest, testProduceConsume) {
Message receivedMsg;
consumer.receive(receivedMsg);
ASSERT_EQ(content, receivedMsg.getDataAsString());
ASSERT_FALSE(receivedMsg.getProducerName().empty());
ASSERT_EQ(ResultOk, producer.close());

ProducerConfiguration conf;
conf.setProducerName("test-producer");
ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
producer.send(MessageBuilder().setContent("msg-2-content").build());
consumer.receive(receivedMsg);
ASSERT_EQ("msg-2-content", receivedMsg.getDataAsString());
ASSERT_EQ("test-producer", receivedMsg.getProducerName());
consumer.acknowledge(receivedMsg);
ASSERT_EQ(ResultOk, consumer.unsubscribe());
ASSERT_EQ(ResultOk, consumer.close());
ASSERT_EQ(ResultOk, producer.close());
ASSERT_EQ(ResultOk, consumer.close());
ASSERT_EQ(ResultOk, client.close());
}

Expand Down
1 change: 1 addition & 0 deletions tests/MessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ TEST(MessageTest, testMessageContents) {
ASSERT_NE(myContents.c_str(), (char*)msg.getData());
ASSERT_EQ(myContents, msg.getDataAsString());
ASSERT_EQ(std::string("mycontents").length(), msg.getLength());
ASSERT_TRUE(msg.getProducerName().empty());
}

TEST(MessageTest, testAllocatedContents) {
Expand Down
11 changes: 9 additions & 2 deletions tests/c/c_BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct receive_ctx {
pulsar_result result;
pulsar_consumer_t *consumer;
char *data;
char *producer_name;
std::promise<void> *promise;
};

Expand All @@ -57,6 +58,9 @@ static void receive_callback(pulsar_result async_result, pulsar_message_t *msg,
const char *data = (const char *)pulsar_message_get_data(msg);
receive_ctx->data = (char *)malloc(strlen(data) * sizeof(char) + 1);
strcpy(receive_ctx->data, data);
const char *producer_name = pulsar_message_get_producer_name(msg);
receive_ctx->producer_name = (char *)malloc(strlen(producer_name) * sizeof(char) + 1);
strcpy(receive_ctx->producer_name, producer_name);
}
receive_ctx->promise->set_value();
pulsar_message_free(msg);
Expand All @@ -71,6 +75,7 @@ TEST(c_BasicEndToEndTest, testAsyncProduceConsume) {
pulsar_client_t *client = pulsar_client_create(lookup_url, conf);

pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create();
pulsar_producer_configuration_set_producer_name(producer_conf, "test-producer");
pulsar_producer_t *producer;
pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer);
ASSERT_EQ(pulsar_result_Ok, result);
Expand Down Expand Up @@ -101,12 +106,14 @@ TEST(c_BasicEndToEndTest, testAsyncProduceConsume) {
// receive asynchronously
std::promise<void> receive_promise;
std::future<void> receive_future = receive_promise.get_future();
struct receive_ctx receive_ctx = {pulsar_result_UnknownError, consumer, NULL, &receive_promise};
struct receive_ctx receive_ctx = {pulsar_result_UnknownError, consumer, NULL, NULL, &receive_promise};
pulsar_consumer_receive_async(consumer, receive_callback, &receive_ctx);
receive_future.get();
ASSERT_EQ(pulsar_result_Ok, receive_ctx.result);
ASSERT_STREQ("test-producer", receive_ctx.producer_name);
ASSERT_STREQ(content, receive_ctx.data);
delete receive_ctx.data;
free(receive_ctx.data);
free(receive_ctx.producer_name);

ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_unsubscribe(consumer));
ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_close(consumer));
Expand Down
Loading