KAFKA-20224: Refactor Streams iterator adapters to reduce duplication#21830
KAFKA-20224: Refactor Streams iterator adapters to reduce duplication#21830nileshkumar3 wants to merge 3 commits intoapache:trunkfrom
Conversation
|
|
||
| class PlainToHeadersIteratorAdapter<K> implements KeyValueIterator<K, byte[]> { | ||
| private final KeyValueIterator<K, byte[]> innerIterator; | ||
| class PlainToHeadersIteratorAdapter<K> extends MappingKeyValueIteratorAdapter<K> { |
There was a problem hiding this comment.
Do we even need to keep these sub-classes? Or could the calling code just do newMappingKeyValueIteratorAdapter(...) instead?
There was a problem hiding this comment.
Good point. These subclasses no longer add any behavior beyond selecting the mapping function, so they could be removed and the call could instantiate MappingKeyValueIteratorAdapter directly. I kept them earlier here mainly to limit churn, but I cleaned this.
…ueIteratorAdapter factories Remove PlainToHeadersIteratorAdapter, TimestampedToHeadersIteratorAdapter, and SessionToHeadersIteratorAdapter. Call sites use plainToHeaders, timestampedToHeaders, and sessionToHeaders on MappingKeyValueIteratorAdapter. Window store iterator adapters extend MappingKeyValueIteratorAdapter directly. Update tests. Made-with: Cursor
frankvicky
left a comment
There was a problem hiding this comment.
Thanks for the PR
only some nits.
| } | ||
|
|
||
| /** | ||
| * Ensures backward compatibility between {@link org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders} |
There was a problem hiding this comment.
nit: replace full qualified with import
|
|
||
| /** | ||
| * Ensures backward compatibility between {@link org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders} | ||
| * and plain {@link org.apache.kafka.streams.state.KeyValueStore}: values are wrapped with empty headers |
There was a problem hiding this comment.
nit: replace full qualified with import
|
|
||
| /** | ||
| * Ensures backward compatibility between {@link org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders} | ||
| * and {@link org.apache.kafka.streams.state.TimestampedKeyValueStore}. |
There was a problem hiding this comment.
nit: replace full qualified with import
| * Ensures backward compatibility between {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} | ||
| * and {@link org.apache.kafka.streams.state.SessionStore}. |
There was a problem hiding this comment.
nit: replace full qualified with import
…gKeyValueIteratorAdapter.java
Thanks for reviewing, replaced with import. |
|
@mjsax @frankvicky thanks for the review, fixed all the comments. Let me know if anything else need to be changed. |
aliehsaeedii
left a comment
There was a problem hiding this comment.
Thanks @nileshkumar3 for the PR. LGTM.
Thanks @aliehsaeedii for reviewing. |
|
@mjsax just a gentle follow-up on this PR. I’ve addressed the previous feedback and cleaned up the implementation (including removing the subclasses). Would appreciate a review when you have time. Thanks! |
|
@mjsax No urgency — when it fits your queue, could you take another look at the PR ,I’ve incorporated the earlier review notes. Thank you. |
We had three almost-identical iterators that only wrapped the inner
iterator and ran a different HeadersBytesStore conversion on the value
bytes. I pulled that into a small MappingKeyValueIteratorAdapter that
takes a Function<byte[], byte[]> and left the three existing adapter
classes as thin subclasses so names and call sites stay the same.
Behavior should be unchanged; the usual ToHeaders iterator/store/window
tests still pass.