Skip to content

Commit 91aa2ba

Browse files
committed
Add new APIs to carry error messages when creating producer, consumer or reader
1 parent 7d1002a commit 91aa2ba

23 files changed

Lines changed: 589 additions & 22 deletions

include/pulsar/Client.h

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,15 @@
3636

3737
#include <memory>
3838
#include <string>
39+
#include <variant>
3940

4041
namespace pulsar {
4142
typedef std::function<void(Result, Producer)> CreateProducerCallback;
43+
typedef std::function<void(std::variant<Producer, Error>)> CreateProducerCallbackV2;
4244
typedef std::function<void(Result, Consumer)> SubscribeCallback;
45+
typedef std::function<void(std::variant<Consumer, Error>)> SubscribeCallbackV2;
4346
typedef std::function<void(Result, Reader)> ReaderCallback;
47+
typedef std::function<void(std::variant<Reader, Error>)> ReaderCallbackV2;
4448
typedef std::function<void(Result, TableView)> TableViewCallback;
4549
typedef std::function<void(Result, const std::vector<std::string>&)> GetPartitionsCallback;
4650
typedef std::function<void(Result)> CloseCallback;
@@ -108,7 +112,9 @@ class PULSAR_PUBLIC Client {
108112
* @return ResultOk if the producer has been successfully created
109113
* @return ResultError if there was an error
110114
*/
111-
Result createProducer(const std::string& topic, const ProducerConfiguration& conf, Producer& producer);
115+
[[deprecated("use createProducerV2 instead")]] Result createProducer(const std::string& topic,
116+
const ProducerConfiguration& conf,
117+
Producer& producer);
112118

113119
/**
114120
* Asynchronously create a producer with the default ProducerConfiguration for publishing on a specific
@@ -118,7 +124,18 @@ class PULSAR_PUBLIC Client {
118124
* @param callback the callback that is triggered when the producer is created successfully or not
119125
* @param callback Callback function that is invoked when the operation is completed
120126
*/
121-
void createProducerAsync(const std::string& topic, const CreateProducerCallback& callback);
127+
[[deprecated("use createProducerAsyncV2 instead")]] void createProducerAsync(
128+
const std::string& topic, const CreateProducerCallback& callback);
129+
130+
std::variant<Producer, Error> createProducerV2(const std::string& topic);
131+
132+
std::variant<Producer, Error> createProducerV2(const std::string& topic,
133+
const ProducerConfiguration& conf);
134+
135+
void createProducerAsyncV2(const std::string& topic, const CreateProducerCallbackV2& callback);
136+
137+
void createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf,
138+
const CreateProducerCallbackV2& callback);
122139

123140
/**
124141
* Asynchronously create a producer with the customized ProducerConfiguration for publishing on a specific
@@ -151,6 +168,11 @@ class PULSAR_PUBLIC Client {
151168
Result subscribe(const std::string& topic, const std::string& subscriptionName,
152169
const ConsumerConfiguration& conf, Consumer& consumer);
153170

171+
std::variant<Consumer, Error> subscribeV2(const std::string& topic, const std::string& subscriptionName);
172+
173+
std::variant<Consumer, Error> subscribeV2(const std::string& topic, const std::string& subscriptionName,
174+
const ConsumerConfiguration& conf);
175+
154176
/**
155177
* Asynchronously subscribe to a given topic and subscription combination with the default
156178
* ConsumerConfiguration
@@ -163,6 +185,9 @@ class PULSAR_PUBLIC Client {
163185
void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
164186
const SubscribeCallback& callback);
165187

188+
void subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName,
189+
const SubscribeCallbackV2& callback);
190+
166191
/**
167192
* Asynchronously subscribe to a given topic and subscription combination with the customized
168193
* ConsumerConfiguration
@@ -176,6 +201,9 @@ class PULSAR_PUBLIC Client {
176201
void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
177202
const ConsumerConfiguration& conf, const SubscribeCallback& callback);
178203

204+
void subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName,
205+
const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback);
206+
179207
/**
180208
* Subscribe to multiple topics under the same namespace.
181209
*
@@ -197,6 +225,13 @@ class PULSAR_PUBLIC Client {
197225
Result subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
198226
const ConsumerConfiguration& conf, Consumer& consumer);
199227

228+
std::variant<Consumer, Error> subscribeV2(const std::vector<std::string>& topics,
229+
const std::string& subscriptionName);
230+
231+
std::variant<Consumer, Error> subscribeV2(const std::vector<std::string>& topics,
232+
const std::string& subscriptionName,
233+
const ConsumerConfiguration& conf);
234+
200235
/**
201236
* Asynchronously subscribe to a list of topics and subscription combination using the default
202237
ConsumerConfiguration
@@ -210,6 +245,9 @@ class PULSAR_PUBLIC Client {
210245
void subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
211246
const SubscribeCallback& callback);
212247

248+
void subscribeAsyncV2(const std::vector<std::string>& topics, const std::string& subscriptionName,
249+
const SubscribeCallbackV2& callback);
250+
213251
/**
214252
* Asynchronously subscribe to a list of topics and subscription combination using the customized
215253
* ConsumerConfiguration
@@ -223,6 +261,9 @@ class PULSAR_PUBLIC Client {
223261
void subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
224262
const ConsumerConfiguration& conf, const SubscribeCallback& callback);
225263

264+
void subscribeAsyncV2(const std::vector<std::string>& topics, const std::string& subscriptionName,
265+
const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback);
266+
226267
/**
227268
* Subscribe to multiple topics, which match given regexPattern, under the same namespace.
228269
*/
@@ -291,6 +332,9 @@ class PULSAR_PUBLIC Client {
291332
Result createReader(const std::string& topic, const MessageId& startMessageId,
292333
const ReaderConfiguration& conf, Reader& reader);
293334

335+
std::variant<Reader, Error> createReaderV2(const std::string& topic, const MessageId& startMessageId,
336+
const ReaderConfiguration& conf);
337+
294338
/**
295339
* Asynchronously create a topic reader with the customized ReaderConfiguration for reading messages from
296340
* the specified topic.
@@ -320,6 +364,9 @@ class PULSAR_PUBLIC Client {
320364
void createReaderAsync(const std::string& topic, const MessageId& startMessageId,
321365
const ReaderConfiguration& conf, const ReaderCallback& callback);
322366

367+
void createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId,
368+
const ReaderConfiguration& conf, const ReaderCallbackV2& callback);
369+
323370
/**
324371
* Create a table view with given {@code TableViewConfiguration} for specified topic.
325372
*

include/pulsar/Result.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
#include <cstdint>
2525
#include <iosfwd>
26+
#include <string>
2627

2728
namespace pulsar {
2829

@@ -101,6 +102,12 @@ enum Result : int8_t
101102
PULSAR_PUBLIC const char* strResult(Result result);
102103

103104
PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, pulsar::Result result);
105+
106+
struct Error {
107+
Result result;
108+
std::string message;
109+
};
110+
104111
} // namespace pulsar
105112

106113
#endif /* ERROR_HPP_ */

lib/Client.cc

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,31 @@ void Client::createProducerAsync(const std::string& topic, const CreateProducerC
6363
createProducerAsync(topic, ProducerConfiguration(), callback);
6464
}
6565

66+
std::variant<Producer, Error> Client::createProducerV2(const std::string& topic) {
67+
return createProducerV2(topic, ProducerConfiguration());
68+
}
69+
70+
std::variant<Producer, Error> Client::createProducerV2(const std::string& topic,
71+
const ProducerConfiguration& conf) {
72+
Promise<bool, std::variant<Producer, Error> > promise;
73+
createProducerAsyncV2(topic, conf,
74+
[promise](std::variant<Producer, Error> result) { promise.setValue(result); });
75+
Future<bool, std::variant<Producer, Error> > future = promise.getFuture();
76+
77+
std::variant<Producer, Error> result;
78+
future.get(result);
79+
return result;
80+
}
81+
82+
void Client::createProducerAsyncV2(const std::string& topic, const CreateProducerCallbackV2& callback) {
83+
createProducerAsyncV2(topic, ProducerConfiguration(), callback);
84+
}
85+
86+
void Client::createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf,
87+
const CreateProducerCallbackV2& callback) {
88+
impl_->createProducerAsyncV2(topic, conf, callback);
89+
}
90+
6691
void Client::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf,
6792
const CreateProducerCallback& callback) {
6893
impl_->createProducerAsync(topic, conf, callback);
@@ -81,17 +106,46 @@ Result Client::subscribe(const std::string& topic, const std::string& subscripti
81106
return future.get(consumer);
82107
}
83108

109+
std::variant<Consumer, Error> Client::subscribeV2(const std::string& topic,
110+
const std::string& subscriptionName) {
111+
return subscribeV2(topic, subscriptionName, ConsumerConfiguration());
112+
}
113+
114+
std::variant<Consumer, Error> Client::subscribeV2(const std::string& topic,
115+
const std::string& subscriptionName,
116+
const ConsumerConfiguration& conf) {
117+
Promise<bool, std::variant<Consumer, Error> > promise;
118+
subscribeAsyncV2(topic, subscriptionName, conf,
119+
[promise](std::variant<Consumer, Error> result) { promise.setValue(result); });
120+
Future<bool, std::variant<Consumer, Error> > future = promise.getFuture();
121+
122+
std::variant<Consumer, Error> result;
123+
future.get(result);
124+
return result;
125+
}
126+
84127
void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName,
85128
const SubscribeCallback& callback) {
86129
subscribeAsync(topic, subscriptionName, ConsumerConfiguration(), callback);
87130
}
88131

132+
void Client::subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName,
133+
const SubscribeCallbackV2& callback) {
134+
subscribeAsyncV2(topic, subscriptionName, ConsumerConfiguration(), callback);
135+
}
136+
89137
void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName,
90138
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
91139
LOG_INFO("Subscribing on Topic :" << topic);
92140
impl_->subscribeAsync(topic, subscriptionName, conf, callback);
93141
}
94142

143+
void Client::subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName,
144+
const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback) {
145+
LOG_INFO("Subscribing on Topic :" << topic);
146+
impl_->subscribeAsyncV2(topic, subscriptionName, conf, callback);
147+
}
148+
95149
Result Client::subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
96150
Consumer& consumer) {
97151
return subscribe(topics, subscriptionName, ConsumerConfiguration(), consumer);
@@ -106,16 +160,44 @@ Result Client::subscribe(const std::vector<std::string>& topics, const std::stri
106160
return future.get(consumer);
107161
}
108162

163+
std::variant<Consumer, Error> Client::subscribeV2(const std::vector<std::string>& topics,
164+
const std::string& subscriptionName) {
165+
return subscribeV2(topics, subscriptionName, ConsumerConfiguration());
166+
}
167+
168+
std::variant<Consumer, Error> Client::subscribeV2(const std::vector<std::string>& topics,
169+
const std::string& subscriptionName,
170+
const ConsumerConfiguration& conf) {
171+
Promise<bool, std::variant<Consumer, Error> > promise;
172+
subscribeAsyncV2(topics, subscriptionName, conf,
173+
[promise](std::variant<Consumer, Error> result) { promise.setValue(result); });
174+
Future<bool, std::variant<Consumer, Error> > future = promise.getFuture();
175+
176+
std::variant<Consumer, Error> result;
177+
future.get(result);
178+
return result;
179+
}
180+
109181
void Client::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
110182
const SubscribeCallback& callback) {
111183
subscribeAsync(topics, subscriptionName, ConsumerConfiguration(), callback);
112184
}
113185

186+
void Client::subscribeAsyncV2(const std::vector<std::string>& topics, const std::string& subscriptionName,
187+
const SubscribeCallbackV2& callback) {
188+
subscribeAsyncV2(topics, subscriptionName, ConsumerConfiguration(), callback);
189+
}
190+
114191
void Client::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
115192
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
116193
impl_->subscribeAsync(topics, subscriptionName, conf, callback);
117194
}
118195

196+
void Client::subscribeAsyncV2(const std::vector<std::string>& topics, const std::string& subscriptionName,
197+
const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback) {
198+
impl_->subscribeAsyncV2(topics, subscriptionName, conf, callback);
199+
}
200+
119201
Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName,
120202
Consumer& consumer) {
121203
return subscribeWithRegex(regexPattern, subscriptionName, ConsumerConfiguration(), consumer);
@@ -149,11 +231,28 @@ Result Client::createReader(const std::string& topic, const MessageId& startMess
149231
return future.get(reader);
150232
}
151233

234+
std::variant<Reader, Error> Client::createReaderV2(const std::string& topic, const MessageId& startMessageId,
235+
const ReaderConfiguration& conf) {
236+
Promise<bool, std::variant<Reader, Error> > promise;
237+
createReaderAsyncV2(topic, startMessageId, conf,
238+
[promise](std::variant<Reader, Error> result) { promise.setValue(result); });
239+
Future<bool, std::variant<Reader, Error> > future = promise.getFuture();
240+
241+
std::variant<Reader, Error> result;
242+
future.get(result);
243+
return result;
244+
}
245+
152246
void Client::createReaderAsync(const std::string& topic, const MessageId& startMessageId,
153247
const ReaderConfiguration& conf, const ReaderCallback& callback) {
154248
impl_->createReaderAsync(topic, startMessageId, conf, callback);
155249
}
156250

251+
void Client::createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId,
252+
const ReaderConfiguration& conf, const ReaderCallbackV2& callback) {
253+
impl_->createReaderAsyncV2(topic, startMessageId, conf, callback);
254+
}
255+
157256
Result Client::createTableView(const std::string& topic, const TableViewConfiguration& conf,
158257
TableView& tableView) {
159258
Promise<Result, TableView> promise;

lib/ClientConnection.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1711,7 +1711,11 @@ void ClientConnection::handleError(const proto::CommandError& error) {
17111711
pendingRequests_.erase(it);
17121712
lock.unlock();
17131713

1714-
request->fail(result);
1714+
ResponseData data;
1715+
if (error.has_message()) {
1716+
data.errorMessage = error.message();
1717+
}
1718+
request->fail(result, data);
17151719
} else {
17161720
auto it = pendingGetLastMessageIdRequests_.find(error.request_id());
17171721
if (it != pendingGetLastMessageIdRequests_.end()) {

lib/ClientConnection.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,10 @@ class CommandSuccess;
113113
// Data returned on the request operation. Mostly used on create-producer command
114114
struct ResponseData {
115115
std::string producerName;
116-
int64_t lastSequenceId;
116+
int64_t lastSequenceId = -1L;
117117
std::string schemaVersion;
118118
optional<uint64_t> topicEpoch;
119+
std::string errorMessage;
119120
};
120121

121122
typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;

0 commit comments

Comments
 (0)