Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export interface ProducerConfig {
schema?: SchemaInfo;
accessMode?: ProducerAccessMode;
batchingType?: ProducerBatchType;
messageRouter?: MessageRouter;
}

export class Producer {
Expand Down Expand Up @@ -176,6 +177,26 @@ export class MessageId {
toString(): string;
}

/**
* Metadata for a topic that the MessageRouter can use.
*/
export interface TopicMetadata {
numPartitions: number;
}

/**
* A custom message router interface that can be implemented by the user.
*/
export interface MessageRouter {
/**
* Choose a partition for the given message.
* @param message The message to be routed.
* @param topicMetadata Metadata for the topic.
* @returns The partition index to send the message to.
*/
getPartition(message: ProducerMessage, topicMetadata: TopicMetadata): number;
}

export interface SchemaInfo {
schemaType: SchemaType;
name?: string;
Expand Down
7 changes: 6 additions & 1 deletion src/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt
auto instanceContext = static_cast<ProducerNewInstanceContext *>(ctx);
auto deferred = instanceContext->deferred;
auto cClient = instanceContext->cClient;
auto producerConfig = instanceContext->producerConfig;
delete instanceContext;

if (result != pulsar_result_Ok) {
Expand All @@ -81,10 +82,11 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt

std::shared_ptr<pulsar_producer_t> cProducer(rawProducer, pulsar_producer_free);

deferred->Resolve([cProducer](const Napi::Env env) {
deferred->Resolve([cProducer, producerConfig](const Napi::Env env) {
Napi::Object obj = Producer::constructor.New({});
Producer *producer = Producer::Unwrap(obj);
producer->SetCProducer(cProducer);
producer->producerConfig = producerConfig;
return obj;
});
},
Expand All @@ -107,6 +109,9 @@ Napi::Value Producer::Send(const Napi::CallbackInfo &info) {
auto cMessage = Message::BuildMessage(info[0].As<Napi::Object>());
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ProducerSendContext(deferred, cMessage);

pulsar_message_set_property()


pulsar_producer_send_async(
this->cProducer.get(), cMessage.get(),
Expand Down
3 changes: 3 additions & 0 deletions src/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <napi.h>
#include <pulsar/c/client.h>
#include <pulsar/c/producer.h>
#include <memory>
#include "ProducerConfig.h"

class Producer : public Napi::ObjectWrap<Producer> {
public:
Expand All @@ -35,6 +37,7 @@ class Producer : public Napi::ObjectWrap<Producer> {

private:
std::shared_ptr<pulsar_producer_t> cProducer;
std::shared_ptr<ProducerConfig> producerConfig;
Napi::Value Send(const Napi::CallbackInfo &info);
Napi::Value Flush(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
Expand Down
77 changes: 77 additions & 0 deletions src/ProducerConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
#include "SchemaInfo.h"
#include "ProducerConfig.h"
#include "Message.h"
#include <future>
#include <map>
#include "pulsar/ProducerConfiguration.h"

Expand All @@ -42,6 +44,8 @@ static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled";
static const std::string CFG_ACCESS_MODE = "accessMode";
static const std::string CFG_BATCHING_TYPE = "batchingType";
static const std::string CFG_MESSAGE_ROUTER = "messageRouter";
static const std::string CFG_MESSAGE_ROUTER_GET_PARTITION_METHOD = "getPartition";

struct _pulsar_producer_configuration {
pulsar::ProducerConfiguration conf;
Expand Down Expand Up @@ -82,6 +86,46 @@ static std::map<std::string, pulsar::ProducerConfiguration::BatchingType> PRODUC
{"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching},
};

struct MessageRouterContext {
Napi::ThreadSafeFunction jsRouterFunction;
};

static int messageRouterTrampoline(pulsar_message_t *msg, pulsar_topic_metadata_t *topicMetadata,
void *ctx) {
printf("test1");
MessageRouterContext *context = static_cast<MessageRouterContext *>(ctx);
int numPartitions = pulsar_topic_metadata_get_num_partitions(topicMetadata);
std::promise<int> promise;
std::future<int> future = promise.get_future();
auto callback = [msg, numPartitions, &promise](Napi::Env env, Napi::Function jsCallback) {
printf("test2");
Napi::Object jsMessage = Message::NewInstance(Napi::Object::New(env),
std::shared_ptr<pulsar_message_t>(msg, [](pulsar_message_t*){}));
Napi::Object jsTopicMetadata = Napi::Object::New(env);
jsTopicMetadata.Set("numPartitions", Napi::Number::New(env, numPartitions));
try {
printf("test3");
Napi::Value result = jsCallback.Call({jsMessage, jsTopicMetadata});
if (result.IsNumber()) {
promise.set_value(result.As<Napi::Number>().Int32Value());
} else {
promise.set_value(numPartitions);
}
} catch (const Napi::Error& e) {
fprintf(stderr, "Error in custom message router: %s\n", e.what());
promise.set_value(numPartitions);
}
};

printf("test3 %d", numPartitions);
napi_status status = context->jsRouterFunction.BlockingCall(callback);
context->jsRouterFunction.Release();
if (status != napi_ok) {
return numPartitions;
}
return future.get();
}

ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
this->cProducerConfig = std::shared_ptr<pulsar_producer_configuration_t>(
pulsar_producer_configuration_create(), pulsar_producer_configuration_free);
Expand Down Expand Up @@ -224,6 +268,39 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
if (PRODUCER_BATCHING_TYPE.count(batchingType)) {
this->cProducerConfig.get()->conf.setBatchingType(PRODUCER_BATCHING_TYPE.at(batchingType));
}

if (producerConfig.Has(CFG_MESSAGE_ROUTER)) {
Napi::Value routerValue = producerConfig.Get(CFG_MESSAGE_ROUTER);
Napi::Function routerFunc;

// Case 1: User passed a function directly, e.g., messageRouter: (msg, meta) => 0
if (routerValue.IsFunction()) {
routerFunc = routerValue.As<Napi::Function>();
}
// Case 2: User passed an object, e.g., messageRouter: { getPartition: (msg, meta) => 0 }
else if (routerValue.IsObject()) {
Napi::Object jsRouter = routerValue.As<Napi::Object>();
if (jsRouter.Has(CFG_MESSAGE_ROUTER_GET_PARTITION_METHOD) &&
jsRouter.Get(CFG_MESSAGE_ROUTER_GET_PARTITION_METHOD).IsFunction()) {
routerFunc = jsRouter.Get(CFG_MESSAGE_ROUTER_GET_PARTITION_METHOD).As<Napi::Function>();
}
}

// If we found a valid function from either case, set it up.
if (routerFunc) {
auto context = new MessageRouterContext();
context->jsRouterFunction =
Napi::ThreadSafeFunction::New(producerConfig.Env(), routerFunc, "MessageRouterCallback", 0, 1);
this->routerContext.reset(context);
pulsar_producer_configuration_set_message_router(
this->cProducerConfig.get(), messageRouterTrampoline, this->routerContext.get());
} else {
Napi::TypeError::New(producerConfig.Env(), "The 'messageRouter' option must be a function, or an "
"object with a 'getPartition' method.")
.ThrowAsJavaScriptException();
return;
}
}
}

ProducerConfig::~ProducerConfig() {}
Expand Down
3 changes: 3 additions & 0 deletions src/ProducerConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <napi.h>
#include <pulsar/c/producer_configuration.h>

struct MessageRouterContext;

class ProducerConfig {
public:
ProducerConfig(const Napi::Object &producerConfig);
Expand All @@ -33,6 +35,7 @@ class ProducerConfig {
private:
std::shared_ptr<pulsar_producer_configuration_t> cProducerConfig;
std::string topic;
std::unique_ptr<MessageRouterContext> routerContext;
};

#endif
77 changes: 77 additions & 0 deletions tests/producer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/

const Pulsar = require('../index');
const httpRequest = require('./http_utils');

const adminUrl = 'http://localhost:8080';

(() => {
describe('Producer', () => {
Expand All @@ -27,6 +30,9 @@ const Pulsar = require('../index');
client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
operationTimeoutSeconds: 30,
log: (level, file, line, message) => {
console.log('[%s][%s:%d] %s', Pulsar.LogLevel.toString(level), file, line, message);
},
});
});

Expand Down Expand Up @@ -156,5 +162,76 @@ const Pulsar = require('../index');
await producer2.close();
});
});
describe('Message Routing', () => {
test('Custom Message Router', async () => {
// 1. Define a partitioned topic and a custom router
const targetPartition = 1;
const partitionedTopicName = `test-custom-router-${Date.now()}`;
const partitionedTopic = `persistent://public/default/${partitionedTopicName}`;
const numPartitions = 10;

// Use admin client to create a partitioned topic. This is more robust.
// Assuming 'adminUrl' and 'httpRequest' are available from your test setup.
const partitionedTopicAdminURL = `${adminUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`;
const createPartitionedTopicRes = await httpRequest(
partitionedTopicAdminURL, {
headers: {
'Content-Type': 'application/json', // Use application/json for REST API
},
data: numPartitions,
method: 'PUT',
},
);
// 204 No Content is success for PUT create
expect(createPartitionedTopicRes.statusCode).toBe(204);

// 2. Create a producer with the custom message router
const producer = await client.createProducer({
topic: partitionedTopic, // Note: For producer, use the base topic name
messageRouter: (message, topicMetadata) => {
console.log(`Custom router called. Total partitions: ${topicMetadata.numPartitions}, return partition: ${targetPartition}`);
// Always route to the target partition for this test
return targetPartition;
},
messageRoutingMode: 'CustomPartition',
});

// 3. Create a single consumer for the entire partitioned topic
const consumer = await client.subscribe({
topic: partitionedTopic,
subscription: 'test-sub',
subscriptionInitialPosition: 'Earliest',
});

// 4. Send 1000 messages in parallel for efficiency
console.log(`Sending messages to partitioned topic ${partitionedTopic}...`);
const numMessages = 1000;
for (let i = 0; i < numMessages; i += 1) {
console.log('before send message');
const msg = `message-${i}`;
producer.send({
data: Buffer.from(msg),
});
console.log('send message');
}
console.log('Send done.');
await producer.flush();
console.log(`Sent ${numMessages} messages.`);

// 5. Receive messages and assert they all come from the target partition
const receivedMessages = new Set();
const expectedPartitionName = `${partitionedTopic}-partition-${targetPartition}`;

for (let i = 0; i < numMessages; i += 1) {
const msg = await consumer.receive(10000);
expect(msg.getTopicName()).toBe(expectedPartitionName);
receivedMessages.add(msg.getData().toString());
await consumer.acknowledge(msg);
}
// Final assertion to ensure all unique messages were received
expect(receivedMessages.size).toBe(numMessages);
console.log(`Successfully received and verified ${receivedMessages.size} messages from ${expectedPartitionName}.`);
}, 30000);
});
});
})();
Loading