Conversation
| */ | ||
| static byte[] rawAggregation(final byte[] aggregationWithHeaders) { | ||
| if (aggregationWithHeaders == null) { | ||
| static byte[] rawAggregation(final byte[] valueWithHeaders) { |
There was a problem hiding this comment.
KAFKA-20249 moved rawAggregation() and headers() to Utils.java with performance improvements, please reuse them
| @Override | ||
| public void put(final Windowed<Bytes> sessionKey, final byte[] aggregateWithHeader) { | ||
| store.put(sessionKey, AggregationWithHeadersDeserializer.rawAggregation(aggregateWithHeader)); | ||
| store.put(sessionKey, ValueWithHeadersDeserializer.rawAggregation(aggregateWithHeader)); |
| */ | ||
| static byte[] rawAggregation(final byte[] aggregationWithHeaders) { | ||
| if (aggregationWithHeaders == null) { | ||
| static byte[] rawAggregation(final byte[] valueWithHeaders) { |
There was a problem hiding this comment.
Nit: The class was renamed from AggregationWithHeaders* to ValueWithHeaders*, but the static method is still called rawAggregation(). Should this be rawValue() for consistency?
Same for the AggregationWithHeaders wrapper class itself, could be renamed to ValueWithHeaders.
| } | ||
|
|
||
| private <AGG> Serde<AggregationWithHeaders<AGG>> createAggregationWithHeadersSerde(final Serde<AGG> aggSerde) { | ||
| private <AGG> Serde<AggregationWithHeaders<AGG>> createValueWithHeadersSerde(final Serde<AGG> aggSerde) { |
| private <AGG> Serde<AggregationWithHeaders<AGG>> createValueWithHeadersSerde(final Serde<AGG> aggSerde) { | ||
| return new Serde<>() { | ||
| @Override | ||
| public Serializer<AggregationWithHeaders<AGG>> serializer() { |
aliehsaeedii
left a comment
There was a problem hiding this comment.
Thanks @frankvicky. LGTM with minor fixes needed. Also it's nice to have a PR description.
| public AGG aggregation() { | ||
| return aggregation; | ||
| public V aggregation() { | ||
| return value; |
There was a problem hiding this comment.
The internal field is renamed to value, but the public getter is still aggregation(). It may result into confusion.
| * otherwise {@code null} is returned | ||
| */ | ||
| public static <AGG> AggregationWithHeaders<AGG> make(final AGG aggregation, final Headers headers) { | ||
| public static <AGG> ValueWithHeaders<AGG> make(final AGG aggregation, final Headers headers) { |
There was a problem hiding this comment.
To be consistent cold we change the var name from aggregation to value as well? Also updating the javadocs correspondingly.
| return "AggregationWithHeaders{" + | ||
| "aggregation=" + aggregation + | ||
| return "ValueWithHeaders{" + | ||
| "aggregation=" + value + |
There was a problem hiding this comment.
| "aggregation=" + value + | |
| "value=" + value + |
|
@frankvicky please fix conflicts |
mjsax
left a comment
There was a problem hiding this comment.
Seems my original idea of renaming might have been a bad one... Seeing the large splash radius of it on this PR, makes we wonder if we should just keep the names as-is? (Sorry about this back and forth...)
| return false; | ||
| } | ||
| final ReadOnlySessionStore<String, AggregationWithHeaders<String>> store = processorRef.get().store(); | ||
| final ReadOnlySessionStore<String, ValueWithHeaders<String>> store = processorRef.get().store(); |
There was a problem hiding this comment.
AggregationWithHeaders is a public class added via KIP-1271 -- do we really want to rename it?
My original comment was about AggregationWithHeadersSerializer/AggregationWithHeadersDeserializer/AggregationWithHeadersSerde (I know that I did say "related classes"; sorry for the confusion)
Would it be too odd to only rename the serde classes? Or would we want to update the KIP? Or disregard my original comment (might not have been a good suggestion after all...)
There was a problem hiding this comment.
I think it would be confusing if we only rename the serde classes 🤔
I am open to other options
| @Override | ||
| public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>> fetch(final K keyFrom, | ||
| public KeyValueIterator<Windowed<K>, ValueWithHeaders<AGG>> fetch(final K keyFrom, | ||
| final K keyTo) { |
There was a problem hiding this comment.
@aliehsaeedii -- that's why the current formatting is bad...
We would need to fix the indention here, too. That's why I prefer to either make a method definition a single line:
public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>> fetch(final K keyFrom, final K keyTo) {
or, it the line becomes too long, move every parameter into it's own line:
public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>> fetch(
final K keyFrom,
final K keyTo
) {
to avoid all this indention non-sense.
ref:
#21780 (review)