Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,23 @@ public static byte[] valueToBytes(TxnTransitMetadata txnMetadata,
.setPartitionIds(entry.getValue().stream().map(TopicPartition::partition).toList())).toList();
}

return MessageUtil.toVersionPrefixedBytes(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably also still inline.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — inlined back into the return statement.

transactionVersionLevel.transactionLogValueVersion(),
new TransactionLogValue()
short logValueVersion = transactionVersionLevel.transactionLogValueVersion();
TransactionLogValue value = new TransactionLogValue()
.setProducerId(txnMetadata.producerId())
.setProducerEpoch(txnMetadata.producerEpoch())
.setTransactionTimeoutMs(txnMetadata.txnTimeoutMs())
.setTransactionStatus(txnMetadata.txnState().id())
.setTransactionLastUpdateTimestampMs(txnMetadata.txnLastUpdateTimestamp())
.setTransactionStartTimestampMs(txnMetadata.txnStartTimestamp())
.setTransactionPartitions(transactionPartitions)
.setClientTransactionVersion(txnMetadata.clientTransactionVersion().featureLevel())
);
.setClientTransactionVersion(txnMetadata.clientTransactionVersion().featureLevel());

if (logValueVersion >= 1) {
value.setPreviousProducerId(txnMetadata.prevProducerId());
value.setNextProducerId(txnMetadata.nextProducerId());
}

return MessageUtil.toVersionPrefixedBytes(logValueVersion, value);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,62 @@ var record = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(
}
}

@Test
void shouldRoundTripPreviousAndNextProducerIds() {
var txnTransitMetadata = new TxnTransitMetadata(
200L, // producerId
100L, // prevProducerId
201L, // nextProducerId
(short) 5, // producerEpoch
(short) 4, // lastProducerEpoch
1000, // txnTimeoutMs
TransactionState.PREPARE_COMMIT,
new HashSet<>(Set.of(new TopicPartition("topic", 0))),
0L, // txnStartTimestamp
0L, // txnLastUpdateTimestamp
TV_2
);

// Serialize to bytes and deserialize the raw TransactionLogValue
var bytes = TransactionLog.valueToBytes(txnTransitMetadata, TV_2);
var buffer = wrap(bytes);
buffer.getShort(); // skip version prefix
var value = new TransactionLogValue(new ByteBufferAccessor(buffer), (short) 1);

assertEquals(200L, value.producerId());
assertEquals(100L, value.previousProducerId());
assertEquals(201L, value.nextProducerId());
}

@Test
void shouldNotPersistProducerIdsAtVersion0() {
// Version 0 is non-flexible, so tagged fields (previousProducerId,
// nextProducerId) cannot be written. They fall back to defaults (-1).
var txnTransitMetadata = new TxnTransitMetadata(
200L, // producerId
100L, // prevProducerId — not persisted at v0
201L, // nextProducerId — not persisted at v0
(short) 5, // producerEpoch
(short) 4, // lastProducerEpoch
1000, // txnTimeoutMs
TransactionState.PREPARE_COMMIT,
new HashSet<>(Set.of(new TopicPartition("topic", 0))),
0L, // txnStartTimestamp
0L, // txnLastUpdateTimestamp
TV_0
);

var record = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(
TransactionLog.keyToBytes("transactionalId"),
TransactionLog.valueToBytes(txnTransitMetadata, TV_0)
)).records().iterator().next();
var readResult = assertInstanceOf(TransactionLog.TxnRecord.class, TransactionLog.read(record.key(), record.value()));
var deserialized = readResult.metadata();

assertEquals(200L, deserialized.producerId());
assertEquals(-1L, deserialized.prevProducerId());
}

@Test
void testSerializeTransactionLogValueToHighestNonFlexibleVersion() {
var txnTransitMetadata = new TxnTransitMetadata(1L, 1L, 1L, (short) 1, (short) 1, 1000, TransactionState.COMPLETE_COMMIT, new HashSet<>(), 500L, 500L, TV_0);
Expand Down
Loading