Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,13 @@ public boolean isSuccessorTo(Producer other) {

public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, int batchSize,
boolean isChunked, boolean isMarker, Position position) {
if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, position)) {
publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker, position);
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, headersAndPayload.readableBytes(),
batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, position, messagePublishContext)) {
publishMessageToTopic(headersAndPayload, messagePublishContext);
} else {
messagePublishContext.recycle();
Comment on lines +214 to +217
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A related lifecycle optimization: after topic.publishMessage(...) / topic.publishTxnMessage(...) returns, the cached MessageMetadata appears no longer needed by the current publish flow. PersistentTopic consumes it synchronously for max-delivery-delay validation and deduplication before asyncAddEntry/transaction append. The later async completion path and messageProduced interceptor don't receive headersAndPayload, so keeping the parsed MessageMetadata attached to MessagePublishContext until recycle extends its lifetime to the full in-flight write duration.

If we add clearMessageMetadata() for the interceptor correctness issue above, it would be useful to also clear it after the topic publish call returns, preferably in a finally block, so high-throughput or high-latency writes don't retain parsed metadata objects longer than necessary.

}
}

Expand All @@ -223,14 +228,18 @@ public void publishMessage(long producerId, long lowestSequenceId, long highestS
});
return;
}
if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize, position)) {
publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked,
isMarker, position);
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, position, messagePublishContext)) {
publishMessageToTopic(headersAndPayload, messagePublishContext);
} else {
messagePublishContext.recycle();
}
}

public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, int batchSize,
Position position) {
public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload,
Position position, PublishContext publishContext) {
Comment thread
dao-jun marked this conversation as resolved.
if (!isShadowTopic && position != null) {
cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError,
Expand Down Expand Up @@ -266,10 +275,7 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
}

if (topic.isEncryptionRequired()) {

headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
MessageMetadata msgMetadata = publishContext.getMessageMetadata(headersAndPayload);
int encryptionKeysCount = msgMetadata.getEncryptionKeysCount();
// Check whether the message is encrypted or not
if (encryptionKeysCount < 1) {
Expand All @@ -283,7 +289,7 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
}
}

startPublishOperation((int) batchSize, headersAndPayload.readableBytes());
startPublishOperation((int) publishContext.getNumberOfMessages(), headersAndPayload.readableBytes());
return true;
}

Expand All @@ -292,26 +298,9 @@ private boolean isSupportsReplDedupByLidAndEid() {
return cnx.isClientSupportsReplDedupByLidAndEid() && topic.isPersistent();
}

private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, int batchSize, boolean isChunked,
boolean isMarker, Position position) {
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, headersAndPayload.readableBytes(),
batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
}
topic.publishMessage(headersAndPayload, messagePublishContext);
}

private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId,
int batchSize, boolean isChunked, boolean isMarker, Position position) {
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
private void publishMessageToTopic(ByteBuf headersAndPayload, MessagePublishContext messagePublishContext) {
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR body says the cached metadata is invalidated after BrokerInterceptor.onMessagePublish(...), but I don't see that in the final diff.

This changes behavior when metadata has already been cached before the interceptor runs. For example, on an encryption-required topic, checkAndStartPublish(...) calls publishContext.getMessageMetadata(...). If an interceptor then mutates the message metadata in headersAndPayload, PersistentTopic.isExceedMaximumDeliveryDelay(...) and MessageDeduplication will reuse the pre-interceptor cached metadata. In current master those later checks reparse headersAndPayload after the interceptor, so they observe the post-interceptor metadata.

Could we add a MessagePublishContext#clearMessageMetadata()/invalidateMessageMetadata() and call it immediately after BrokerInterceptor.onMessagePublish(...) in both the normal publish path and publishTxnMessage?


If multiple interceptors are executed in series and the first interceptor modifies metadata, and the second interceptor reads metadata through publishContext.getMessageMetadata(...), then clearing the cache only after the entire BrokerInterceptors returns cannot solve the visibility within the interceptor chain?

}
topic.publishMessage(headersAndPayload, messagePublishContext);
}
Expand Down Expand Up @@ -412,6 +401,7 @@ private static final class MessagePublishContext implements PublishContext, Runn
private long originalHighestSequenceId;

private long entryTimestamp;
private MessageMetadata messageMetadata;

@Override
public long getLedgerId() {
Expand Down Expand Up @@ -467,6 +457,15 @@ public long getHighestSequenceId() {
return highestSequenceId;
}

@Override
public MessageMetadata getMessageMetadata(ByteBuf headersAndPayload) {
if (messageMetadata == null) {
messageMetadata = new MessageMetadata();
Commands.peekMessageMetadata(headersAndPayload, messageMetadata);
}
return messageMetadata;
}

@Override
public void setOriginalProducerName(String originalProducerName) {
this.originalProducerName = originalProducerName;
Expand Down Expand Up @@ -629,11 +628,14 @@ static MessagePublishContext get(Producer producer, long sequenceId, int msgSize
callback.chunked = chunked;
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.highestSequenceId = -1L;
callback.originalHighestSequenceId = -1L;
callback.startTimeNs = startTimeNs;
callback.isMarker = isMarker;
callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid;
callback.ledgerId = position == null ? -1 : position.getLedgerId();
callback.entryId = position == null ? -1 : position.getEntryId();
callback.messageMetadata = null;
if (callback.propertyMap != null) {
callback.propertyMap.clear();
}
Expand All @@ -651,12 +653,14 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long
callback.batchSize = batchSize;
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.originalHighestSequenceId = -1L;
callback.startTimeNs = startTimeNs;
callback.chunked = chunked;
callback.isMarker = isMarker;
callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid;
callback.ledgerId = position == null ? -1 : position.getLedgerId();
callback.entryId = position == null ? -1 : position.getEntryId();
callback.messageMetadata = null;
if (callback.propertyMap != null) {
callback.propertyMap.clear();
}
Expand Down Expand Up @@ -703,6 +707,7 @@ public void recycle() {
startTimeNs = -1L;
chunked = false;
isMarker = false;
messageMetadata = null;
if (propertyMap != null) {
propertyMap.clear();
}
Expand Down Expand Up @@ -868,16 +873,16 @@ public void checkEncryption() {

public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId,
ByteBuf headersAndPayload, int batchSize, boolean isChunked, boolean isMarker) {
if (!checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, null)) {
return;
}
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, highSequenceId,
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null,
cnx.isClientSupportsReplDedupByLidAndEid());
if (!checkAndStartPublish(producerId, sequenceId, headersAndPayload, null, messagePublishContext)) {
messagePublishContext.recycle();
return;
}
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext);
}
topic.publishTxnMessage(txnID, headersAndPayload, messagePublishContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2193,19 +2193,20 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
}

private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
log.debug()
.attr("producerId", send.getProducerId())
.attr("sendSequenceId", send.getSequenceId())
.attr("producerName", msgMetadata.getProducerName())
.attr("metadataSequenceId", msgMetadata.getSequenceId())
.attr("readableBytes", headersAndPayload.readableBytes())
.attr("partitionKey", msgMetadata.hasPartitionKey() ? msgMetadata.getPartitionKey() : null)
.attr("orderingKey", msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null)
.attr("uncompressedSize", msgMetadata.getUncompressedSize())
.log("Received send message request");
log.debug(event -> {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
event.attr("producerId", send.getProducerId())
.attr("sendSequenceId", send.getSequenceId())
.attr("producerName", msgMetadata.getProducerName())
.attr("metadataSequenceId", msgMetadata.getSequenceId())
.attr("readableBytes", headersAndPayload.readableBytes())
.attr("partitionKey", msgMetadata.hasPartitionKey() ? msgMetadata.getPartitionKey() : null)
.attr("orderingKey", msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null)
.attr("uncompressedSize", msgMetadata.getUncompressedSize())
.log("Received send message request");
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -85,6 +87,12 @@ default long getOriginalSequenceId() {
default void setMetadataFromEntryData(ByteBuf entryData) {
}

default MessageMetadata getMessageMetadata(ByteBuf headersAndPayload) {
MessageMetadata messageMetadata = new MessageMetadata();
Commands.peekMessageMetadata(headersAndPayload, messageMetadata);
return messageMetadata;
}

default long getHighestSequenceId() {
return -1L;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,7 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
public MessageDupStatus isDuplicateReplV1(PublishContext publishContext, ByteBuf headersAndPayload) {
// Message is coming from replication, we need to use the original producer name and sequence id
// for the purpose of deduplication and not rely on the "replicator" name.
int readerIndex = headersAndPayload.readerIndex();
MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.readerIndex(readerIndex);
MessageMetadata md = publishContext.getMessageMetadata(headersAndPayload);

String producerName = md.getProducerName();
long sequenceId = md.getSequenceId();
Expand All @@ -333,9 +331,7 @@ private void setContextPropsIfRepl(PublishContext publishContext, ByteBuf header
if (Producer.isRemoteOrShadow(publishContext.getProducerName(), replicatorPrefix)) {
// Message is coming from replication, we need to use the replication's producer name, source cluster's
// ledger id and entry id for the purpose of deduplication.
int readerIndex = headersAndPayload.readerIndex();
MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.readerIndex(readerIndex);
MessageMetadata md = publishContext.getMessageMetadata(headersAndPayload);

List<KeyValue> kvPairList = md.getPropertiesList();
for (KeyValue kvPair : kvPairList) {
Expand Down Expand Up @@ -448,9 +444,7 @@ public MessageDupStatus isDuplicateNormal(PublishContext publishContext, ByteBuf
long chunkID = -1;
long totalChunk = -1;
if (publishContext.isChunked()) {
int readerIndex = headersAndPayload.readerIndex();
MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.readerIndex(readerIndex);
MessageMetadata md = publishContext.getMessageMetadata(headersAndPayload);
chunkID = md.getChunkId();
totalChunk = md.getNumChunksFromMsg();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont
decrementPendingWriteOpsAndCheck();
return;
}
if (isExceedMaximumDeliveryDelay(headersAndPayload)) {
if (isExceedMaximumDeliveryDelay(headersAndPayload, publishContext)) {
publishContext.completed(
new NotAllowedException(
String.format("Exceeds max allowed delivery delay of %s milliseconds",
Expand Down Expand Up @@ -4737,7 +4737,7 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
decrementPendingWriteOpsAndCheck();
return;
}
if (isExceedMaximumDeliveryDelay(headersAndPayload)) {
if (isExceedMaximumDeliveryDelay(headersAndPayload, publishContext)) {
publishContext.completed(
new NotAllowedException(
String.format("Exceeds max allowed delivery delay of %s milliseconds",
Expand Down Expand Up @@ -4978,13 +4978,11 @@ public Optional<TopicName> getShadowSourceTopic() {
return Optional.ofNullable(shadowSourceTopic);
}

protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {
protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload, PublishContext publishContext) {
Comment thread
dao-jun marked this conversation as resolved.
if (isDelayedDeliveryEnabled()) {
long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis();
if (maxDeliveryDelayInMs > 0) {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
MessageMetadata msgMetadata = publishContext.getMessageMetadata(headersAndPayload);
return msgMetadata.hasDeliverAtTime()
&& msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs;
}
Expand Down
Loading