Skip to content

MINOR: follow-up of KAFKA-20317#21838

Open
frankvicky wants to merge 5 commits intoapache:trunkfrom
frankvicky:20317-followup
Open

MINOR: follow-up of KAFKA-20317#21838
frankvicky wants to merge 5 commits intoapache:trunkfrom
frankvicky:20317-followup

Conversation

@frankvicky
Copy link
Copy Markdown
Contributor

@frankvicky frankvicky commented Mar 20, 2026

@github-actions github-actions Bot added triage PRs from the community streams and removed triage PRs from the community labels Mar 20, 2026
Copy link
Copy Markdown
Collaborator

@DL1231 DL1231 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch.

*/
static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
if (aggregationWithHeaders == null) {
static byte[] rawAggregation(final byte[] valueWithHeaders) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KAFKA-20249 moved rawAggregation() and headers() to Utils.java with performance improvements, please reuse them

@Override
public void put(final Windowed<Bytes> sessionKey, final byte[] aggregateWithHeader) {
store.put(sessionKey, AggregationWithHeadersDeserializer.rawAggregation(aggregateWithHeader));
store.put(sessionKey, ValueWithHeadersDeserializer.rawAggregation(aggregateWithHeader));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

*/
static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
if (aggregationWithHeaders == null) {
static byte[] rawAggregation(final byte[] valueWithHeaders) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: The class was renamed from AggregationWithHeaders* to ValueWithHeaders*, but the static method is still called rawAggregation(). Should this be rawValue() for consistency?
Same for the AggregationWithHeaders wrapper class itself, could be renamed to ValueWithHeaders.

}

private <AGG> Serde<AggregationWithHeaders<AGG>> createAggregationWithHeadersSerde(final Serde<AGG> aggSerde) {
private <AGG> Serde<AggregationWithHeaders<AGG>> createValueWithHeadersSerde(final Serde<AGG> aggSerde) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AGG -> V

private <AGG> Serde<AggregationWithHeaders<AGG>> createValueWithHeadersSerde(final Serde<AGG> aggSerde) {
return new Serde<>() {
@Override
public Serializer<AggregationWithHeaders<AGG>> serializer() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

Copy link
Copy Markdown
Contributor

@aliehsaeedii aliehsaeedii left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @frankvicky. LGTM with minor fixes needed. Also it's nice to have a PR description.

public AGG aggregation() {
return aggregation;
public V aggregation() {
return value;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The internal field is renamed to value, but the public getter is still aggregation(). It may result into confusion.

* otherwise {@code null} is returned
*/
public static <AGG> AggregationWithHeaders<AGG> make(final AGG aggregation, final Headers headers) {
public static <AGG> ValueWithHeaders<AGG> make(final AGG aggregation, final Headers headers) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be consistent cold we change the var name from aggregation to value as well? Also updating the javadocs correspondingly.

return "AggregationWithHeaders{" +
"aggregation=" + aggregation +
return "ValueWithHeaders{" +
"aggregation=" + value +
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"aggregation=" + value +
"value=" + value +

@mjsax mjsax added the kip Requires or implements a KIP label Mar 27, 2026
@chia7712
Copy link
Copy Markdown
Member

chia7712 commented Apr 2, 2026

@frankvicky please fix conflicts

Copy link
Copy Markdown
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems my original idea of renaming might have been a bad one... Seeing the large splash radius of it on this PR, makes we wonder if we should just keep the names as-is? (Sorry about this back and forth...)

return false;
}
final ReadOnlySessionStore<String, AggregationWithHeaders<String>> store = processorRef.get().store();
final ReadOnlySessionStore<String, ValueWithHeaders<String>> store = processorRef.get().store();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AggregationWithHeaders is a public class added via KIP-1271 -- do we really want to rename it?

My original comment was about AggregationWithHeadersSerializer/AggregationWithHeadersDeserializer/AggregationWithHeadersSerde (I know that I did say "related classes"; sorry for the confusion)

Would it be too odd to only rename the serde classes? Or would we want to update the KIP? Or disregard my original comment (might not have been a good suggestion after all...)

\cc @aliehsaeedii @bbejeck

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be confusing if we only rename the serde classes 🤔
I am open to other options

@Override
public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>> fetch(final K keyFrom,
public KeyValueIterator<Windowed<K>, ValueWithHeaders<AGG>> fetch(final K keyFrom,
final K keyTo) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aliehsaeedii -- that's why the current formatting is bad...

We would need to fix the indention here, too. That's why I prefer to either make a method definition a single line:

public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>> fetch(final K keyFrom, final K keyTo) {

or, it the line becomes too long, move every parameter into it's own line:

public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>> fetch(
    final K keyFrom,
    final K keyTo
) {

to avoid all this indention non-sense.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants