Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 0 additions & 141 deletions lib/AckGroupingTracker.cc

This file was deleted.

40 changes: 15 additions & 25 deletions lib/AckGroupingTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,17 @@
#include <pulsar/MessageId.h>
#include <pulsar/Result.h>

#include <cstdint>
#include <functional>
#include <set>

#include "ProtoApiEnums.h"

namespace pulsar {

class ClientConnection;
using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
using ResultCallback = std::function<void(Result)>;
class ConsumerImpl;
using ConsumerImplPtr = std::shared_ptr<ConsumerImpl>;
using ConsumerImplWeakPtr = std::weak_ptr<ConsumerImpl>;

/**
* @class AckGroupingTracker
Expand All @@ -42,19 +41,12 @@ using ResultCallback = std::function<void(Result)>;
*/
class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracker> {
public:
AckGroupingTracker(std::function<ClientConnectionPtr()> connectionSupplier,
std::function<uint64_t()> requestIdSupplier, uint64_t consumerId, bool waitResponse)
: connectionSupplier_(std::move(connectionSupplier)),
requestIdSupplier_(std::move(requestIdSupplier)),
consumerId_(consumerId),
waitResponse_(waitResponse) {}

virtual ~AckGroupingTracker() = default;

/**
* Start tracking the ACK requests.
*/
virtual void start() {}
virtual void start(const ConsumerImplPtr& consumer) { consumer_ = consumer; }

/**
* Since ACK requests are grouped and delayed, we need to do some best-effort duplicate check to
Expand All @@ -72,7 +64,9 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
* @param[in] callback the callback that is triggered when the message is acknowledged
*/
virtual void addAcknowledge(const MessageId& msgId, const ResultCallback& callback) {
callback(ResultOk);
if (callback) {
callback(ResultOk);
}
}

/**
Expand All @@ -81,7 +75,9 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
* @param[in] callback the callback that is triggered when the messages are acknowledged
*/
virtual void addAcknowledgeList(const MessageIdList& msgIds, const ResultCallback& callback) {
callback(ResultOk);
if (callback) {
callback(ResultOk);
}
}

/**
Expand All @@ -90,7 +86,9 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
* @param[in] callback the callback that is triggered when the message is acknowledged
*/
virtual void addAcknowledgeCumulative(const MessageId& msgId, const ResultCallback& callback) {
callback(ResultOk);
if (callback) {
callback(ResultOk);
}
}

/**
Expand All @@ -99,18 +97,10 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
*/
virtual void flushAndClean() {}

protected:
void doImmediateAck(const MessageId& msgId, const ResultCallback& callback,
CommandAck_AckType ackType) const;
void doImmediateAck(const std::set<MessageId>& msgIds, const ResultCallback& callback) const;

private:
const std::function<ClientConnectionPtr()> connectionSupplier_;
const std::function<uint64_t()> requestIdSupplier_;
const uint64_t consumerId_;
virtual void close() {}

protected:
const bool waitResponse_;
ConsumerImplWeakPtr consumer_;

}; // class AckGroupingTracker

Expand Down
29 changes: 22 additions & 7 deletions lib/AckGroupingTrackerDisabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,41 @@

#include "AckGroupingTrackerDisabled.h"

#include "ProtoApiEnums.h"
#include "ConsumerImpl.h"

namespace pulsar {

void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId, const ResultCallback& callback) {
doImmediateAck(msgId, callback, CommandAck_AckType_Individual);
auto consumer = consumer_.lock();
if (consumer && !consumer->isClosingOrClosed()) {
consumer->doImmediateAck(msgId, callback, CommandAck_AckType_Individual);
} else if (callback) {
callback(ResultAlreadyClosed);
}
}

void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& msgIds,
const ResultCallback& callback) {
std::set<MessageId> msgIdSet;
for (auto&& msgId : msgIds) {
msgIdSet.emplace(msgId);
auto consumer = consumer_.lock();
if (consumer && !consumer->isClosingOrClosed()) {
std::set<MessageId> uniqueMsgIds(msgIds.begin(), msgIds.end());
for (auto&& msgId : msgIds) {
uniqueMsgIds.insert(msgId);
}
consumer->doImmediateAck(uniqueMsgIds, callback);
} else if (callback) {
callback(ResultAlreadyClosed);
}
doImmediateAck(msgIdSet, callback);
}

void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId,
const ResultCallback& callback) {
doImmediateAck(msgId, callback, CommandAck_AckType_Cumulative);
auto consumer = consumer_.lock();
if (consumer && !consumer->isClosingOrClosed()) {
consumer->doImmediateAck(msgId, callback, CommandAck_AckType_Cumulative);
} else if (callback) {
callback(ResultAlreadyClosed);
}
}

} // namespace pulsar
Loading