Skip to content

[fix][broker] Prevent timed-out producer creation from racing with retry#25460

Merged
merlimat merged 1 commit intoapache:masterfrom
Denovo1998:fix_timed_out_producer_creation_from_racing_with_retry
Apr 3, 2026
Merged

[fix][broker] Prevent timed-out producer creation from racing with retry#25460
merlimat merged 1 commit intoapache:masterfrom
Denovo1998:fix_timed_out_producer_creation_from_racing_with_retry

Conversation

@Denovo1998
Copy link
Copy Markdown
Contributor

https://github.com/Denovo1998/pulsar/actions/runs/23939130687/job/69821936858

org.apache.pulsar.broker.service.ServerCnxTest#testCreateProducerTimeout

2026-04-03T17:42:26,799 - INFO  - [TestNG-method=testCreateProducerTimeout-1:ServerCnx] - [embedded] Closed producer before its creation was completed. producerId=1
2026-04-03T17:42:26,800 - INFO  - [metadata-store-worker-OrderedExecutor-0-0:BrokerService] - [persistent://prop/ns-abc/successTopic] Finished loading from other concurrent loading task (latency: 0 ms)
2026-04-03T17:42:26,803 - WARN  - [broker-topic-workers-OrderedExecutor-2-0:PersistentTopic] - [persistent://prop/ns-abc/successTopic] No replication clusters configured
2026-04-03T17:42:26,803 - INFO  - [broker-topic-workers-OrderedExecutor-2-0:BrokerService] - Created topic persistent://prop/ns-abc/successTopic - dedup is disabled (latency: 3 ms)
2026-04-03T17:42:26,817 - INFO  - [TestNG-method=testCreateProducerTimeout-1:ServerCnx] - [embedded] Cleared producer created after timeout on client side Producer{topic=PersistentTopic{topic=persistent://prop/ns-abc/successTopic}, client=[id: 0xembedded, L:embedded - R:embedded] [SR:-, state:Connected], producerName=my-producer, producerId=1}
2026-04-03T17:42:26,817 - WARN  - [TestNG-method=testCreateProducerTimeout-1:ServerCnx] - [embedded] Failed to add producer to topic persistent://prop/ns-abc/successTopic: producerId=1, Producer with name 'my-producer' is already connected to topic 'persistent://prop/ns-abc/successTopic'
2026-04-03T17:42:26,817 - ERROR - [TestNG-method=testCreateProducerTimeout-1:ServerCnxTest] - org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'my-producer' is already connected to topic 'persistent://prop/ns-abc/successTopic'

Expected :class org.apache.pulsar.common.api.proto.CommandProducerSuccess
Actual   :class org.apache.pulsar.common.api.proto.CommandError
<Click to see difference>

java.lang.AssertionError: expected [class org.apache.pulsar.common.api.proto.CommandProducerSuccess] but found [class org.apache.pulsar.common.api.proto.CommandError]
	at org.testng.Assert.fail(Assert.java:110)
	at org.testng.Assert.failNotEquals(Assert.java:1577)
	at org.testng.Assert.assertEqualsImpl(Assert.java:149)
	at org.testng.Assert.assertEquals(Assert.java:131)
	at org.testng.Assert.assertEquals(Assert.java:643)
	at org.apache.pulsar.broker.service.ServerCnxTest.testCreateProducerTimeout(ServerCnxTest.java:2099)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Motivation

PR #25352 changed several ServerCnx producer creation callbacks to run asynchronously on ctx.executor(). That change fixed one class of async callback races, but it also introduced a new race in the client-timeout / retry path for producer creation.

When a client-side createProducer request times out, the client can immediately send a CloseProducer followed by another createProducer request with the same producer id and producer name. After handleCloseProducer() completes the first producerFuture exceptionally, the delayed async callbacks introduced by #25352 can still continue and eventually invoke buildProducerAndAddTopic() for that stale request.

Before this change, the stale request could still call topic.addProducer() and temporarily register a producer on the topic. If the retry request reached topic.addProducer() in the same window, it could fail with NamingException: Producer with name ... is already connected, even though the original request had already timed out on the client side.

Time Thread timed-out create #1 Thread retry create #2 State/Comments
1 handleProducer() stores producerFuture for producer id 1 The first create request starts before topic opening completes.
2 Topic opening is still pending, so no producer has been added to the topic yet.
3 handleCloseProducer() completes the first producerFuture exceptionally The client-side timeout is translated into a server-side close-before-create-completes path.
4 handleProducer() receives a retry with the same producer id and producer name The retry removes the failed future from the connection map and starts a new create flow.
5 The delayed async callbacks introduced by #25352 continue and reach buildProducerAndAddTopic() The retry create is also progressing toward buildProducerAndAddTopic() Both flows can now run on ctx.executor().
6 Before this fix, the stale request still executed topic.addProducer() The stale producer could be inserted into AbstractTopic.producers even though the client had already timed out.
7 The retry executes topic.addProducer() If the stale request won the insertion race, the retry observed the existing producer name.
8 topic.addProducer() fails with NamingException The retry fails with Producer with name ... is already connected.
9 The stale request later notices its producerFuture was already failed and clears itself Cleanup happens too late to help the retry that already failed.
10 Final issue: producer recreation after timeout becomes flaky and the wrong request can temporarily own the topic producer slot.

Modifications

  • Add an early guard in ServerCnx.buildProducerAndAddTopic() to skip producer construction and topic registration when the corresponding producerFuture has already been completed exceptionally by the timeout/close path.
  • Remove the stale producer future from the connection map before returning, so the abandoned request cannot continue to compete with the retrying request for the same producer id.
  • Keep the existing duplicate-name behavior for real active producers unchanged; only stale timed-out create requests are ignored before topic.addProducer() is reached.
  • Verify the fix with ServerCnxTest.testCreateProducerTimeout and ServerCnxTest.testCreateProducerTimeoutThenCreateSameNamedProducerShouldFail.

Verifying this change

  • Make sure that the change passes the CI checks.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: Denovo1998#26

@merlimat merlimat merged commit 56442c2 into apache:master Apr 3, 2026
46 of 47 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/broker doc-not-needed Your PR changes do not impact docs ready-to-test

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants