Skip to content

Commit 495c7c0

Browse files
authored
Fix Deadlock risk (#79)
1 parent 442cc55 commit 495c7c0

3 files changed

Lines changed: 93 additions & 31 deletions

File tree

src/rmq/rmqa/rmqa_producerimpl.cpp

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -48,39 +48,51 @@ void actionConfirmOnThreadPool(
4848
const rmqt::ConfirmResponse& confirmResponse,
4949
const bsl::shared_ptr<ProducerImpl::SharedState>& sharedState)
5050
{
51-
bslmt::LockGuard<bslmt::Mutex> guard(&(sharedState->mutex));
51+
rmqp::Producer::ConfirmationCallback callback;
52+
bsl::optional<rmqt::Future<>::Pair> waitForConfirmsFuture;
5253

53-
if (!(sharedState->isValid)) {
54-
BALL_LOG_ERROR << "Received publisher confirmation for message "
55-
<< message.guid() << " after closing the producer";
56-
return;
57-
}
54+
{
55+
bslmt::LockGuard<bslmt::Mutex> guard(&(sharedState->mutex));
5856

59-
ProducerImpl::CallbackMap::iterator it =
60-
sharedState->callbackMap.find(message.guid());
61-
62-
if (it == sharedState->callbackMap.end()) {
63-
BALL_LOG_FATAL
64-
<< "Failed to find Producer callback to invoke for message: "
65-
<< message.guid()
66-
<< ". Received duplicate confirm? The outstanding "
67-
"message limit will likely be affected for the lifetime of this "
68-
"Producer instance.";
69-
return;
70-
}
57+
if (!(sharedState->isValid)) {
58+
BALL_LOG_ERROR << "Received publisher confirmation for message "
59+
<< message.guid() << " after closing the producer";
60+
return;
61+
}
62+
63+
ProducerImpl::CallbackMap::iterator it =
64+
sharedState->callbackMap.find(message.guid());
65+
66+
if (it == sharedState->callbackMap.end()) {
67+
BALL_LOG_ERROR
68+
<< "Failed to find Producer callback to invoke for message: "
69+
<< message.guid()
70+
<< ". Received duplicate confirm? The outstanding "
71+
"message limit will likely be affected for the lifetime of "
72+
"this "
73+
"Producer instance.";
74+
return;
75+
}
76+
77+
BALL_LOG_TRACE << confirmResponse << " for " << message;
78+
79+
callback.swap(it->second);
7180

72-
BALL_LOG_TRACE << confirmResponse << " for " << message;
81+
sharedState->callbackMap.erase(it);
7382

74-
sharedState->outstandingMessagesCap.post();
83+
sharedState->outstandingMessagesCap.post();
7584

76-
it->second(message, routingKey, confirmResponse);
85+
if (sharedState->callbackMap.size() == 0 &&
86+
sharedState->waitForConfirmsFuture) {
87+
waitForConfirmsFuture = sharedState->waitForConfirmsFuture;
88+
sharedState->waitForConfirmsFuture.reset();
89+
}
90+
}
7791

78-
sharedState->callbackMap.erase(it);
92+
callback(message, routingKey, confirmResponse);
7993

80-
if (sharedState->callbackMap.size() == 0 &&
81-
sharedState->waitForConfirmsFuture) {
82-
sharedState->waitForConfirmsFuture->first(rmqt::Result<>());
83-
sharedState->waitForConfirmsFuture.reset();
94+
if (waitForConfirmsFuture) {
95+
waitForConfirmsFuture->first(rmqt::Result<>());
8496
}
8597
}
8698

@@ -98,7 +110,7 @@ void handleConfirmOnEventLoop(
98110
sharedState));
99111

100112
if (rc != 0) {
101-
BALL_LOG_FATAL
113+
BALL_LOG_ERROR
102114
<< "Couldn't enqueue thread pool job for message confirm: "
103115
<< message.guid() << " (return code " << rc
104116
<< "). Application will NEVER be informed of confirm";

src/rmq/rmqp/rmqp_producer.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ class Producer {
6565
/// completed. For example, an application which consumes from one queue
6666
/// and produces to another should send the acknowledgement to the first
6767
/// queue once the ConfirmationCallback is invoked from the publish.
68+
///
69+
/// Callbacks are invoked on a thread pool thread without holding any
70+
/// internal locks. It is safe to call `send` or `trySend` from within a
71+
/// ConfirmationCallback. Multiple callbacks may execute concurrently on
72+
/// different thread pool threads.
6873
typedef bsl::function<void(const rmqt::Message&,
6974
const bsl::string& routingKey,
7075
const rmqt::ConfirmResponse&)>
@@ -210,13 +215,18 @@ class Producer {
210215

211216
/// \brief Wait for all outstanding publisher confirms to arrive.
212217
///
213-
/// This method allows
218+
/// Blocks the calling thread until every previously sent message has
219+
/// received a publisher confirm from the broker (or the timeout expires).
220+
/// The ConfirmationCallback for the last confirm to arrive will have
221+
/// completed before this method returns; callbacks running concurrently on
222+
/// other threads may still be executing. Messages sent from within a callback
223+
/// are not covered by an in-progress `waitForConfirms` call; call it again to wait for those.
214224
///
215225
/// \param timeout How long to wait for. If timeout is 0, the method will
216-
/// wait for confirms indefinitely.
226+
/// wait for confirms indefinitely.
217227
///
218-
/// \return true if all outstanding confirms have arrived.
219-
/// \return false if waiting timed out.
228+
/// \return A default-constructed result if all outstanding confirms have
229+
/// arrived, or an error result if waiting timed out.
220230
virtual rmqt::Result<>
221231
waitForConfirms(const bsls::TimeInterval& timeout) = 0;
222232

src/tests/rmqa/rmqa_producerimpl.t.cpp

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,46 @@ TEST_P(ProducerImplUpdateTopology, UpdateCallbackFromTwoThreadsAtOnce)
772772
EXPECT_TRUE(future2.blockResult());
773773
}
774774

775+
TEST_P(ProducerImplMaxOutstandingTests, SendFromConfirmCallbackDoesNotDeadlock)
776+
{
777+
// Regression: calling send() from within a ConfirmationCallback must not
778+
// deadlock even when maxOutstandingConfirms is 1. Before the fix the
779+
// callback was invoked while the internal mutex was held, so re-entering
780+
// send() (which acquires the same mutex) would deadlock.
781+
782+
bsl::shared_ptr<rmqa::ProducerImpl> producer(d_factory->create(
783+
1, d_exchange, d_mockSendChannel, d_threadPool, d_eventLoop));
784+
785+
rmqt::Message msg1 = newMessage();
786+
rmqt::Message msg2 = newMessage();
787+
const rmqt::ConfirmResponse ack(rmqt::ConfirmResponse::ACK);
788+
789+
// Send the first message — this should succeed immediately.
790+
EXPECT_THAT(producer->send(msg1, d_queue->name(), d_callback, d_timeout),
791+
Eq(rmqp::Producer::SENDING));
792+
793+
// When msg1 is confirmed, send msg2 from inside the callback. The
794+
// outstanding slot freed by msg1's confirm must be available before the
795+
// callback executes, otherwise send() would block forever (deadlock with
796+
// the old code).
797+
EXPECT_CALL(*d_mockCallback, onConfirm(msg1, _, ack))
798+
.WillOnce(InvokeWithoutArgs([&]() {
799+
EXPECT_THAT(
800+
producer->send(msg2, d_queue->name(), d_callback, d_timeout),
801+
Eq(rmqp::Producer::SENDING));
802+
}));
803+
804+
d_injectConfirm(msg1, d_queue->name(), ack);
805+
d_threadPool.drain();
806+
807+
d_threadPool.start();
808+
809+
// Now confirm msg2 to leave the producer in a clean state.
810+
EXPECT_CALL(*d_mockCallback, onConfirm(msg2, _, ack));
811+
d_injectConfirm(msg2, d_queue->name(), ack);
812+
d_threadPool.drain();
813+
}
814+
775815
class TracingProducerImplTests : public ProducerImplMaxOutstandingTests {
776816
public:
777817
};

0 commit comments

Comments
 (0)