Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 12 additions & 38 deletions streams/src/main/java/org/apache/kafka/streams/state/Stores.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) throws IllegalArgumentException {
return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, false);
return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, RocksDbWindowBytesStoreSupplier.WindowStoreTypes.DEFAULT_WINDOW_STORE);
}

/**
Expand Down Expand Up @@ -331,7 +331,7 @@ public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final St
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) throws IllegalArgumentException {
return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, true);
return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE);
}

/**
Expand All @@ -348,28 +348,14 @@ public static WindowBytesStoreSupplier persistentTimestampedWindowStoreWithHeade
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) throws IllegalArgumentException {
Objects.requireNonNull(name, "name cannot be null");
final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix);

final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);

return new RocksDbWindowBytesStoreSupplier(
name,
retentionMs,
defaultSegmentInterval,
windowSizeMs,
retainDuplicates,
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS);
return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS);
}

private static WindowBytesStoreSupplier persistentWindowStore(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates,
final boolean timestampedStore) {
final RocksDbWindowBytesStoreSupplier.WindowStoreTypes storeType) {
Objects.requireNonNull(name, "name cannot be null");
final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
Expand All @@ -378,38 +364,26 @@ private static WindowBytesStoreSupplier persistentWindowStore(final String name,

final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);

return persistentWindowStore(name, retentionMs, windowSizeMs, retainDuplicates, defaultSegmentInterval, timestampedStore);
}

private static WindowBytesStoreSupplier persistentWindowStore(final String name,
final long retentionPeriod,
final long windowSize,
final boolean retainDuplicates,
final long segmentInterval,
final boolean timestampedStore) {
Objects.requireNonNull(name, "name cannot be null");
if (retentionPeriod < 0L) {
if (retentionMs < 0L) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
}
if (windowSize < 0L) {
if (windowSizeMs < 0L) {
throw new IllegalArgumentException("windowSize cannot be negative");
}
if (segmentInterval < 1L) {
throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We compute segmentInterval ourselves -- cannot be smaller than 1L.

}
if (windowSize > retentionPeriod) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

windowSize > retentionPeriod validation was dropped.
Note that RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create() still keeps this exact validation (line 66-69), so it seems like an unintentional omission.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ups. Deleted too much -- only wanted to delete the segmentInterval check.

if (windowSizeMs > retentionMs) {
throw new IllegalArgumentException("The retention period of the window store "
+ name + " must be no smaller than its window size. Got size=["
+ windowSize + "], retention=[" + retentionPeriod + "]");
}

return new RocksDbWindowBytesStoreSupplier(
name,
retentionPeriod,
segmentInterval,
windowSize,
retentionMs,
defaultSegmentInterval,
windowSizeMs,
retainDuplicates,
timestampedStore);
storeType
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ public enum WindowStoreTypes {
private final boolean retainDuplicates;
private final WindowStoreTypes windowStoreType;

public static RocksDbIndexedTimeOrderedWindowBytesStoreSupplier create(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates,
final boolean hasIndex,
final boolean withHeaders) {
public static RocksDbIndexedTimeOrderedWindowBytesStoreSupplier create(
final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates,
final boolean hasIndex,
final boolean withHeaders
) {
Objects.requireNonNull(name, "name cannot be null");
final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
Expand All @@ -60,28 +62,41 @@ public static RocksDbIndexedTimeOrderedWindowBytesStoreSupplier create(final Str
if (windowSizeMs < 0L) {
throw new IllegalArgumentException("windowSize cannot be negative");
}
if (defaultSegmentInterval < 1L) {
throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check can never execute as we compute defaultSegmentInterval above ourselves.

}
if (windowSizeMs > retentionMs) {
throw new IllegalArgumentException("The retention period of the window store "
+ name + " must be no smaller than its window size. Got size=["
+ windowSizeMs + "], retention=[" + retentionMs + "]");
}

return new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(name, retentionMs,
defaultSegmentInterval, windowSizeMs, retainDuplicates, hasIndex, withHeaders);
return new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
name,
retentionMs,
defaultSegmentInterval,
windowSizeMs,
retainDuplicates,
hasIndex,
withHeaders
);
}

public RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(final String name,
final long retentionPeriod,
final long segmentInterval,
final long windowSize,
final boolean retainDuplicates,
final boolean withIndex,
final boolean withHeaders) {
this(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates,
determineStoreType(withIndex, withHeaders));
// for testing only
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
final String name,
final long retentionPeriod,
final long segmentInterval,
final long windowSize,
final boolean retainDuplicates,
final boolean withIndex,
final boolean withHeaders
) {
this(
name,
retentionPeriod,
segmentInterval,
windowSize,
retainDuplicates,
determineStoreType(withIndex, withHeaders)
);
}

private static WindowStoreTypes determineStoreType(final boolean withIndex, final boolean withHeaders) {
Expand All @@ -94,12 +109,14 @@ private static WindowStoreTypes determineStoreType(final boolean withIndex, fina
}
}

public RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(final String name,
final long retentionPeriod,
final long segmentInterval,
final long windowSize,
final boolean retainDuplicates,
final WindowStoreTypes windowStoreType) {
private RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
final String name,
final long retentionPeriod,
final long segmentInterval,
final long windowSize,
final boolean retainDuplicates,
final WindowStoreTypes windowStoreType
) {
this.name = name;
this.retentionPeriod = retentionPeriod;
this.segmentInterval = segmentInterval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,14 @@ public enum WindowStoreTypes {
private final boolean retainDuplicates;
private final WindowStoreTypes windowStoreType;

public RocksDbWindowBytesStoreSupplier(final String name,
final long retentionPeriod,
final long segmentInterval,
final long windowSize,
final boolean retainDuplicates,
final boolean returnTimestampedStore) {
this(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates,
returnTimestampedStore
? WindowStoreTypes.TIMESTAMPED_WINDOW_STORE
: WindowStoreTypes.DEFAULT_WINDOW_STORE);
}

public RocksDbWindowBytesStoreSupplier(final String name,
final long retentionPeriod,
final long segmentInterval,
final long windowSize,
final boolean retainDuplicates,
final WindowStoreTypes windowStoreType) {
public RocksDbWindowBytesStoreSupplier(
final String name,
final long retentionPeriod,
final long segmentInterval,
final long windowSize,
final boolean retainDuplicates,
final WindowStoreTypes windowStoreType
) {
this.name = name;
this.retentionPeriod = retentionPeriod;
this.segmentInterval = segmentInterval;
Expand Down
Loading