Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 39 additions & 27 deletions src/rmq/rmqa/rmqa_producerimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,39 +48,51 @@ void actionConfirmOnThreadPool(
const rmqt::ConfirmResponse& confirmResponse,
const bsl::shared_ptr<ProducerImpl::SharedState>& sharedState)
{
bslmt::LockGuard<bslmt::Mutex> guard(&(sharedState->mutex));
rmqp::Producer::ConfirmationCallback callback;
bsl::optional<rmqt::Future<>::Pair> waitForConfirmsFuture;

if (!(sharedState->isValid)) {
BALL_LOG_ERROR << "Received publisher confirmation for message "
<< message.guid() << " after closing the producer";
return;
}
{
bslmt::LockGuard<bslmt::Mutex> guard(&(sharedState->mutex));

ProducerImpl::CallbackMap::iterator it =
sharedState->callbackMap.find(message.guid());

if (it == sharedState->callbackMap.end()) {
BALL_LOG_FATAL
<< "Failed to find Producer callback to invoke for message: "
<< message.guid()
<< ". Received duplicate confirm? The outstanding "
"message limit will likely be affected for the lifetime of this "
"Producer instance.";
return;
}
if (!(sharedState->isValid)) {
BALL_LOG_ERROR << "Received publisher confirmation for message "
<< message.guid() << " after closing the producer";
return;
}

ProducerImpl::CallbackMap::iterator it =
sharedState->callbackMap.find(message.guid());

if (it == sharedState->callbackMap.end()) {
BALL_LOG_ERROR
<< "Failed to find Producer callback to invoke for message: "
<< message.guid()
<< ". Received duplicate confirm? The outstanding "
"message limit will likely be affected for the lifetime of "
"this "
"Producer instance.";
return;
}

BALL_LOG_TRACE << confirmResponse << " for " << message;

callback.swap(it->second);

BALL_LOG_TRACE << confirmResponse << " for " << message;
sharedState->callbackMap.erase(it);

sharedState->outstandingMessagesCap.post();
sharedState->outstandingMessagesCap.post();

it->second(message, routingKey, confirmResponse);
if (sharedState->callbackMap.size() == 0 &&
sharedState->waitForConfirmsFuture) {
waitForConfirmsFuture = sharedState->waitForConfirmsFuture;
sharedState->waitForConfirmsFuture.reset();
}
}

sharedState->callbackMap.erase(it);
callback(message, routingKey, confirmResponse);

if (sharedState->callbackMap.size() == 0 &&
sharedState->waitForConfirmsFuture) {
sharedState->waitForConfirmsFuture->first(rmqt::Result<>());
sharedState->waitForConfirmsFuture.reset();
if (waitForConfirmsFuture) {
waitForConfirmsFuture->first(rmqt::Result<>());
}
}

Expand All @@ -98,7 +110,7 @@ void handleConfirmOnEventLoop(
sharedState));

if (rc != 0) {
BALL_LOG_FATAL
BALL_LOG_ERROR
<< "Couldn't enqueue thread pool job for message confirm: "
<< message.guid() << " (return code " << rc
<< "). Application will NEVER be informed of confirm";
Expand Down
18 changes: 14 additions & 4 deletions src/rmq/rmqp/rmqp_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ class Producer {
/// completed. For example, an application which consumes from one queue
/// and produces to another should send the acknowledgement to the first
/// queue once the ConfirmationCallback is invoked from the publish.
///
/// Callbacks are invoked on a thread pool thread without holding any
/// internal locks. It is safe to call `send` or `trySend` from within a
/// ConfirmationCallback. Multiple callbacks may execute concurrently on
/// different thread pool threads.
typedef bsl::function<void(const rmqt::Message&,
const bsl::string& routingKey,
const rmqt::ConfirmResponse&)>
Expand Down Expand Up @@ -210,13 +215,18 @@ class Producer {

/// \brief Wait for all outstanding publisher confirms to arrive.
///
/// This method allows
/// Blocks the calling thread until every previously sent message has
/// received a publisher confirm from the broker (or the timeout expires).
/// All corresponding ConfirmationCallbacks will have completed before
/// this method returns. Note that messages sent from within a
/// ConfirmationCallback will not be covered by an in-progress
/// `waitForConfirms` call; a subsequent call is needed to wait for those.
///
/// \param timeout How long to wait for. If timeout is 0, the method will
/// wait for confirms indefinitely.
/// wait for confirms indefinitely.
///
/// \return true if all outstanding confirms have arrived.
/// \return false if waiting timed out.
/// \return A default-constructed result if all outstanding confirms have
/// arrived, or an error result if waiting timed out.
virtual rmqt::Result<>
waitForConfirms(const bsls::TimeInterval& timeout) = 0;

Expand Down
40 changes: 40 additions & 0 deletions src/tests/rmqa/rmqa_producerimpl.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,46 @@ TEST_P(ProducerImplUpdateTopology, UpdateCallbackFromTwoThreadsAtOnce)
EXPECT_TRUE(future2.blockResult());
}

TEST_P(ProducerImplMaxOutstandingTests, SendFromConfirmCallbackDoesNotDeadlock)
{
// Regression: calling send() from within a ConfirmationCallback must not
// deadlock even when maxOutstandingConfirms is 1. Before the fix the
// callback was invoked while the internal mutex was held, so re-entering
// send() (which acquires the same mutex) would deadlock.

bsl::shared_ptr<rmqa::ProducerImpl> producer(d_factory->create(
1, d_exchange, d_mockSendChannel, d_threadPool, d_eventLoop));

rmqt::Message msg1 = newMessage();
rmqt::Message msg2 = newMessage();
const rmqt::ConfirmResponse ack(rmqt::ConfirmResponse::ACK);

// Send the first message — this should succeed immediately.
EXPECT_THAT(producer->send(msg1, d_queue->name(), d_callback, d_timeout),
Eq(rmqp::Producer::SENDING));

// When msg1 is confirmed, send msg2 from inside the callback. The
// outstanding slot freed by msg1's confirm must be available before the
// callback executes, otherwise send() would block forever (deadlock with
// the old code).
EXPECT_CALL(*d_mockCallback, onConfirm(msg1, _, ack))
.WillOnce(InvokeWithoutArgs([&]() {
EXPECT_THAT(
producer->send(msg2, d_queue->name(), d_callback, d_timeout),
Eq(rmqp::Producer::SENDING));
}));

d_injectConfirm(msg1, d_queue->name(), ack);
d_threadPool.drain();

d_threadPool.start();

// Now confirm msg2 to leave the producer in a clean state.
EXPECT_CALL(*d_mockCallback, onConfirm(msg2, _, ack));
d_injectConfirm(msg2, d_queue->name(), ack);
d_threadPool.drain();
}

class TracingProducerImplTests : public ProducerImplMaxOutstandingTests {
public:
};
Expand Down
Loading