-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[improve][broker] Cache publish metadata and avoid repeated parsing in the broker send path #25555
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -208,8 +208,13 @@ public boolean isSuccessorTo(Producer other) { | |
|
|
||
| public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, int batchSize, | ||
| boolean isChunked, boolean isMarker, Position position) { | ||
| if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, position)) { | ||
| publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker, position); | ||
| MessagePublishContext messagePublishContext = | ||
| MessagePublishContext.get(this, sequenceId, headersAndPayload.readableBytes(), | ||
| batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid()); | ||
| if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, position, messagePublishContext)) { | ||
| publishMessageToTopic(headersAndPayload, messagePublishContext); | ||
| } else { | ||
| messagePublishContext.recycle(); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -223,14 +228,18 @@ public void publishMessage(long producerId, long lowestSequenceId, long highestS | |
| }); | ||
| return; | ||
| } | ||
| if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize, position)) { | ||
| publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked, | ||
| isMarker, position); | ||
| MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId, | ||
| highestSequenceId, headersAndPayload.readableBytes(), batchSize, | ||
| isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid()); | ||
| if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, position, messagePublishContext)) { | ||
| publishMessageToTopic(headersAndPayload, messagePublishContext); | ||
| } else { | ||
| messagePublishContext.recycle(); | ||
| } | ||
| } | ||
|
|
||
| public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, int batchSize, | ||
| Position position) { | ||
| public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, | ||
| Position position, PublishContext publishContext) { | ||
|
dao-jun marked this conversation as resolved.
|
||
| if (!isShadowTopic && position != null) { | ||
| cnx.execute(() -> { | ||
| cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError, | ||
|
|
@@ -266,10 +275,7 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he | |
| } | ||
|
|
||
| if (topic.isEncryptionRequired()) { | ||
|
|
||
| headersAndPayload.markReaderIndex(); | ||
| MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); | ||
| headersAndPayload.resetReaderIndex(); | ||
| MessageMetadata msgMetadata = publishContext.getMessageMetadata(headersAndPayload); | ||
| int encryptionKeysCount = msgMetadata.getEncryptionKeysCount(); | ||
| // Check whether the message is encrypted or not | ||
| if (encryptionKeysCount < 1) { | ||
|
|
@@ -283,7 +289,7 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he | |
| } | ||
| } | ||
|
|
||
| startPublishOperation((int) batchSize, headersAndPayload.readableBytes()); | ||
| startPublishOperation((int) publishContext.getNumberOfMessages(), headersAndPayload.readableBytes()); | ||
| return true; | ||
| } | ||
|
|
||
|
|
@@ -292,26 +298,9 @@ private boolean isSupportsReplDedupByLidAndEid() { | |
| return cnx.isClientSupportsReplDedupByLidAndEid() && topic.isPersistent(); | ||
| } | ||
|
|
||
| private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, int batchSize, boolean isChunked, | ||
| boolean isMarker, Position position) { | ||
| MessagePublishContext messagePublishContext = | ||
| MessagePublishContext.get(this, sequenceId, headersAndPayload.readableBytes(), | ||
| batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid()); | ||
| if (brokerInterceptor != null) { | ||
| brokerInterceptor | ||
| .onMessagePublish(this, headersAndPayload, messagePublishContext); | ||
| } | ||
| topic.publishMessage(headersAndPayload, messagePublishContext); | ||
| } | ||
|
|
||
| private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, | ||
| int batchSize, boolean isChunked, boolean isMarker, Position position) { | ||
| MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId, | ||
| highestSequenceId, headersAndPayload.readableBytes(), batchSize, | ||
| isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid()); | ||
| private void publishMessageToTopic(ByteBuf headersAndPayload, MessagePublishContext messagePublishContext) { | ||
| if (brokerInterceptor != null) { | ||
| brokerInterceptor | ||
| .onMessagePublish(this, headersAndPayload, messagePublishContext); | ||
| brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The PR body says the cached metadata is invalidated after BrokerInterceptor.onMessagePublish(...), but I don't see that in the final diff. This changes behavior when metadata has already been cached before the interceptor runs. For example, on an encryption-required topic, checkAndStartPublish(...) calls publishContext.getMessageMetadata(...). If an interceptor then mutates the message metadata in headersAndPayload, PersistentTopic.isExceedMaximumDeliveryDelay(...) and MessageDeduplication will reuse the pre-interceptor cached metadata. In current master, those later checks reparse headersAndPayload after the interceptor, so they observe the post-interceptor metadata. Could we add a MessagePublishContext#clearMessageMetadata()/invalidateMessageMetadata() and call it immediately after BrokerInterceptor.onMessagePublish(...) in both the normal publish path and publishTxnMessage? Follow-up question: if multiple interceptors are executed in series, and the first interceptor modifies the metadata while the second interceptor reads it through publishContext.getMessageMetadata(...), then clearing the cache only after the entire BrokerInterceptors call returns would not solve visibility within the interceptor chain, would it? |
||
| } | ||
| topic.publishMessage(headersAndPayload, messagePublishContext); | ||
| } | ||
|
|
@@ -412,6 +401,7 @@ private static final class MessagePublishContext implements PublishContext, Runn | |
| private long originalHighestSequenceId; | ||
|
|
||
| private long entryTimestamp; | ||
| private MessageMetadata messageMetadata; | ||
|
|
||
| @Override | ||
| public long getLedgerId() { | ||
|
|
@@ -467,6 +457,15 @@ public long getHighestSequenceId() { | |
| return highestSequenceId; | ||
| } | ||
|
|
||
| @Override | ||
| public MessageMetadata getMessageMetadata(ByteBuf headersAndPayload) { | ||
| if (messageMetadata == null) { | ||
| messageMetadata = new MessageMetadata(); | ||
| Commands.peekMessageMetadata(headersAndPayload, messageMetadata); | ||
| } | ||
| return messageMetadata; | ||
| } | ||
|
|
||
| @Override | ||
| public void setOriginalProducerName(String originalProducerName) { | ||
| this.originalProducerName = originalProducerName; | ||
|
|
@@ -629,11 +628,14 @@ static MessagePublishContext get(Producer producer, long sequenceId, int msgSize | |
| callback.chunked = chunked; | ||
| callback.originalProducerName = null; | ||
| callback.originalSequenceId = -1L; | ||
| callback.highestSequenceId = -1L; | ||
| callback.originalHighestSequenceId = -1L; | ||
| callback.startTimeNs = startTimeNs; | ||
| callback.isMarker = isMarker; | ||
| callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid; | ||
| callback.ledgerId = position == null ? -1 : position.getLedgerId(); | ||
| callback.entryId = position == null ? -1 : position.getEntryId(); | ||
| callback.messageMetadata = null; | ||
| if (callback.propertyMap != null) { | ||
| callback.propertyMap.clear(); | ||
| } | ||
|
|
@@ -651,12 +653,14 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long | |
| callback.batchSize = batchSize; | ||
| callback.originalProducerName = null; | ||
| callback.originalSequenceId = -1L; | ||
| callback.originalHighestSequenceId = -1L; | ||
| callback.startTimeNs = startTimeNs; | ||
| callback.chunked = chunked; | ||
| callback.isMarker = isMarker; | ||
| callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid; | ||
| callback.ledgerId = position == null ? -1 : position.getLedgerId(); | ||
| callback.entryId = position == null ? -1 : position.getEntryId(); | ||
| callback.messageMetadata = null; | ||
| if (callback.propertyMap != null) { | ||
| callback.propertyMap.clear(); | ||
| } | ||
|
|
@@ -703,6 +707,7 @@ public void recycle() { | |
| startTimeNs = -1L; | ||
| chunked = false; | ||
| isMarker = false; | ||
| messageMetadata = null; | ||
| if (propertyMap != null) { | ||
| propertyMap.clear(); | ||
| } | ||
|
|
@@ -868,16 +873,16 @@ public void checkEncryption() { | |
|
|
||
| public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId, | ||
| ByteBuf headersAndPayload, int batchSize, boolean isChunked, boolean isMarker) { | ||
| if (!checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, null)) { | ||
| return; | ||
| } | ||
| MessagePublishContext messagePublishContext = | ||
| MessagePublishContext.get(this, sequenceId, highSequenceId, | ||
| headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null, | ||
| cnx.isClientSupportsReplDedupByLidAndEid()); | ||
| if (!checkAndStartPublish(producerId, sequenceId, headersAndPayload, null, messagePublishContext)) { | ||
| messagePublishContext.recycle(); | ||
| return; | ||
| } | ||
| if (brokerInterceptor != null) { | ||
| brokerInterceptor | ||
| .onMessagePublish(this, headersAndPayload, messagePublishContext); | ||
| brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext); | ||
| } | ||
| topic.publishTxnMessage(txnID, headersAndPayload, messagePublishContext); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A related lifecycle optimization: after topic.publishMessage(...) / topic.publishTxnMessage(...) returns, the cached MessageMetadata appears no longer needed by the current publish flow. PersistentTopic consumes it synchronously for max-delivery-delay validation and deduplication before asyncAddEntry/transaction append. The later async completion path and messageProduced interceptor don't receive headersAndPayload, so keeping the parsed MessageMetadata attached to MessagePublishContext until recycle extends its lifetime to the full in-flight write duration.
If we add clearMessageMetadata() for the interceptor correctness issue above, it would be useful to also clear it after the topic publish call returns, preferably in a finally block, so high-throughput or high-latency writes don't retain parsed metadata objects longer than necessary.