-
Notifications
You must be signed in to change notification settings - Fork 15.1k
MINOR: simplify window-byte-store-supplier #21829
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -299,7 +299,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, | |
| final Duration retentionPeriod, | ||
| final Duration windowSize, | ||
| final boolean retainDuplicates) throws IllegalArgumentException { | ||
| return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, false); | ||
| return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, RocksDbWindowBytesStoreSupplier.WindowStoreTypes.DEFAULT_WINDOW_STORE); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -331,7 +331,7 @@ public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final St | |
| final Duration retentionPeriod, | ||
| final Duration windowSize, | ||
| final boolean retainDuplicates) throws IllegalArgumentException { | ||
| return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, true); | ||
| return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -348,28 +348,14 @@ public static WindowBytesStoreSupplier persistentTimestampedWindowStoreWithHeade | |
| final Duration retentionPeriod, | ||
| final Duration windowSize, | ||
| final boolean retainDuplicates) throws IllegalArgumentException { | ||
| Objects.requireNonNull(name, "name cannot be null"); | ||
| final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); | ||
| final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix); | ||
| final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize"); | ||
| final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix); | ||
|
|
||
| final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L); | ||
|
|
||
| return new RocksDbWindowBytesStoreSupplier( | ||
| name, | ||
| retentionMs, | ||
| defaultSegmentInterval, | ||
| windowSizeMs, | ||
| retainDuplicates, | ||
| RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS); | ||
| return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS); | ||
| } | ||
|
|
||
| private static WindowBytesStoreSupplier persistentWindowStore(final String name, | ||
| final Duration retentionPeriod, | ||
| final Duration windowSize, | ||
| final boolean retainDuplicates, | ||
| final boolean timestampedStore) { | ||
| final RocksDbWindowBytesStoreSupplier.WindowStoreTypes storeType) { | ||
| Objects.requireNonNull(name, "name cannot be null"); | ||
| final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); | ||
| final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix); | ||
|
|
@@ -378,38 +364,26 @@ private static WindowBytesStoreSupplier persistentWindowStore(final String name, | |
|
|
||
| final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L); | ||
|
|
||
| return persistentWindowStore(name, retentionMs, windowSizeMs, retainDuplicates, defaultSegmentInterval, timestampedStore); | ||
| } | ||
|
|
||
| private static WindowBytesStoreSupplier persistentWindowStore(final String name, | ||
| final long retentionPeriod, | ||
| final long windowSize, | ||
| final boolean retainDuplicates, | ||
| final long segmentInterval, | ||
| final boolean timestampedStore) { | ||
| Objects.requireNonNull(name, "name cannot be null"); | ||
| if (retentionPeriod < 0L) { | ||
| if (retentionMs < 0L) { | ||
| throw new IllegalArgumentException("retentionPeriod cannot be negative"); | ||
| } | ||
| if (windowSize < 0L) { | ||
| if (windowSizeMs < 0L) { | ||
| throw new IllegalArgumentException("windowSize cannot be negative"); | ||
| } | ||
| if (segmentInterval < 1L) { | ||
| throw new IllegalArgumentException("segmentInterval cannot be zero or negative"); | ||
| } | ||
| if (windowSize > retentionPeriod) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ups. Deleted too much -- only wanted to delete the |
||
| if (windowSizeMs > retentionMs) { | ||
| throw new IllegalArgumentException("The retention period of the window store " | ||
| + name + " must be no smaller than its window size. Got size=[" | ||
| + windowSize + "], retention=[" + retentionPeriod + "]"); | ||
| } | ||
|
|
||
| return new RocksDbWindowBytesStoreSupplier( | ||
| name, | ||
| retentionPeriod, | ||
| segmentInterval, | ||
| windowSize, | ||
| retentionMs, | ||
| defaultSegmentInterval, | ||
| windowSizeMs, | ||
| retainDuplicates, | ||
| timestampedStore); | ||
| storeType | ||
| ); | ||
| } | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,12 +40,14 @@ public enum WindowStoreTypes { | |
| private final boolean retainDuplicates; | ||
| private final WindowStoreTypes windowStoreType; | ||
|
|
||
| public static RocksDbIndexedTimeOrderedWindowBytesStoreSupplier create(final String name, | ||
| final Duration retentionPeriod, | ||
| final Duration windowSize, | ||
| final boolean retainDuplicates, | ||
| final boolean hasIndex, | ||
| final boolean withHeaders) { | ||
| public static RocksDbIndexedTimeOrderedWindowBytesStoreSupplier create( | ||
| final String name, | ||
| final Duration retentionPeriod, | ||
| final Duration windowSize, | ||
| final boolean retainDuplicates, | ||
| final boolean hasIndex, | ||
| final boolean withHeaders | ||
| ) { | ||
| Objects.requireNonNull(name, "name cannot be null"); | ||
| final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); | ||
| final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix); | ||
|
|
@@ -60,28 +62,41 @@ public static RocksDbIndexedTimeOrderedWindowBytesStoreSupplier create(final Str | |
| if (windowSizeMs < 0L) { | ||
| throw new IllegalArgumentException("windowSize cannot be negative"); | ||
| } | ||
| if (defaultSegmentInterval < 1L) { | ||
| throw new IllegalArgumentException("segmentInterval cannot be zero or negative"); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This check can never execute as we compute |
||
| } | ||
| if (windowSizeMs > retentionMs) { | ||
| throw new IllegalArgumentException("The retention period of the window store " | ||
| + name + " must be no smaller than its window size. Got size=[" | ||
| + windowSizeMs + "], retention=[" + retentionMs + "]"); | ||
| } | ||
|
|
||
| return new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(name, retentionMs, | ||
| defaultSegmentInterval, windowSizeMs, retainDuplicates, hasIndex, withHeaders); | ||
| return new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier( | ||
| name, | ||
| retentionMs, | ||
| defaultSegmentInterval, | ||
| windowSizeMs, | ||
| retainDuplicates, | ||
| hasIndex, | ||
| withHeaders | ||
| ); | ||
| } | ||
|
|
||
| public RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(final String name, | ||
| final long retentionPeriod, | ||
| final long segmentInterval, | ||
| final long windowSize, | ||
| final boolean retainDuplicates, | ||
| final boolean withIndex, | ||
| final boolean withHeaders) { | ||
| this(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates, | ||
| determineStoreType(withIndex, withHeaders)); | ||
| // for testing only | ||
| RocksDbIndexedTimeOrderedWindowBytesStoreSupplier( | ||
| final String name, | ||
| final long retentionPeriod, | ||
| final long segmentInterval, | ||
| final long windowSize, | ||
| final boolean retainDuplicates, | ||
| final boolean withIndex, | ||
| final boolean withHeaders | ||
| ) { | ||
| this( | ||
| name, | ||
| retentionPeriod, | ||
| segmentInterval, | ||
| windowSize, | ||
| retainDuplicates, | ||
| determineStoreType(withIndex, withHeaders) | ||
| ); | ||
| } | ||
|
|
||
| private static WindowStoreTypes determineStoreType(final boolean withIndex, final boolean withHeaders) { | ||
|
|
@@ -94,12 +109,14 @@ private static WindowStoreTypes determineStoreType(final boolean withIndex, fina | |
| } | ||
| } | ||
|
|
||
| public RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(final String name, | ||
| final long retentionPeriod, | ||
| final long segmentInterval, | ||
| final long windowSize, | ||
| final boolean retainDuplicates, | ||
| final WindowStoreTypes windowStoreType) { | ||
| private RocksDbIndexedTimeOrderedWindowBytesStoreSupplier( | ||
| final String name, | ||
| final long retentionPeriod, | ||
| final long segmentInterval, | ||
| final long windowSize, | ||
| final boolean retainDuplicates, | ||
| final WindowStoreTypes windowStoreType | ||
| ) { | ||
| this.name = name; | ||
| this.retentionPeriod = retentionPeriod; | ||
| this.segmentInterval = segmentInterval; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We compute
segmentIntervalourselves -- cannot be smaller than1L.