-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[Dataflow Streaming] Add a job setting to limit value size in windmill state cache #38458
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
2ec6939
59a7c3d
af26fb1
e551790
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -75,9 +75,18 @@ public class WindmillStateCache implements StatusDataProvider { | |
| private final ConcurrentMap<WindmillComputationKey, ForKey> keyIndex; | ||
| private final long workerCacheBytes; // Copy workerCacheMb and convert to bytes. | ||
| private final boolean supportMapViaMultimap; | ||
|
|
||
| WindmillStateCache(long sizeMb, boolean supportMapViaMultimap) { | ||
| private final long defaultMaxCachedValueBytes; | ||
| private final boolean enableHistogram; | ||
| private volatile long maxCachedValueBytesOverride = -1L; | ||
|
|
||
| WindmillStateCache( | ||
| long sizeMb, | ||
| boolean supportMapViaMultimap, | ||
| long maxCachedValueBytes, | ||
| boolean enableHistogram) { | ||
| this.workerCacheBytes = sizeMb * MEGABYTES; | ||
| this.defaultMaxCachedValueBytes = maxCachedValueBytes; | ||
| this.enableHistogram = enableHistogram; | ||
| int stateCacheConcurrencyLevel = | ||
| Math.max(STATE_CACHE_CONCURRENCY_LEVEL, Runtime.getRuntime().availableProcessors()); | ||
| this.stateCache = | ||
|
|
@@ -99,22 +108,48 @@ public interface Builder { | |
|
|
||
| Builder setSupportMapViaMultimap(boolean supportMapViaMultimap); | ||
|
|
||
| Builder setMaxCachedValueBytes(long maxCachedValueBytes); | ||
|
|
||
| Builder setEnableHistogram(boolean enableHistogram); | ||
|
|
||
| WindmillStateCache build(); | ||
| } | ||
|
|
||
| public static Builder builder() { | ||
| return new AutoBuilder_WindmillStateCache_Builder().setSupportMapViaMultimap(false); | ||
| return new AutoBuilder_WindmillStateCache_Builder() | ||
| .setSupportMapViaMultimap(false) | ||
| .setMaxCachedValueBytes(Long.MAX_VALUE) | ||
| .setEnableHistogram(true); | ||
| } | ||
|
|
||
| public void setMaxCachedValueBytesOverride(long limit) { | ||
| this.maxCachedValueBytesOverride = limit; | ||
| } | ||
|
|
||
| private long getMaxCachedValueBytesLimit() { | ||
| long override = maxCachedValueBytesOverride; | ||
| return override >= 0 ? override : defaultMaxCachedValueBytes; | ||
| } | ||
|
|
||
| private EntryStats calculateEntryStats() { | ||
| EntryStats stats = new EntryStats(); | ||
| BiConsumer<StateId, StateCacheEntry> consumer = | ||
| (stateId, stateCacheEntry) -> { | ||
| stats.entries++; | ||
| stats.idWeight += stateId.getWeight(); | ||
| stats.entryWeight += stateCacheEntry.getWeight(); | ||
| long idWeight = stateId.getWeight(); | ||
| stats.idWeight += idWeight; | ||
| long entryWeight = stateCacheEntry.getWeight(); | ||
| stats.entryWeight += entryWeight; | ||
| stats.entryValues += stateCacheEntry.values.size(); | ||
| stats.maxEntryValues = Math.max(stats.maxEntryValues, stateCacheEntry.values.size()); | ||
| if (enableHistogram) { | ||
| stats.addKeyWeight(idWeight); | ||
| stats.addEntryWeight(entryWeight); | ||
| stateCacheEntry.values.forEach( | ||
| (encodedAddress, weightedValue) -> { | ||
|
Comment on lines
+146
to
+149
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is executed once every (10m) debug capture report. I think computing the histograms here is better than the overhead for keeping it in sync during the processing. Added an experiment to disable the histogram stats if needed. Any other options? |
||
| stats.addValueWeight(weightedValue.weight); | ||
| }); | ||
|
Comment on lines
+148
to
+151
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Iterating over all values in every cache entry to calculate the weight distribution increases the complexity of |
||
| } | ||
| }; | ||
| stateCache.asMap().forEach(consumer); | ||
| return stats; | ||
|
|
@@ -138,27 +173,60 @@ public ForComputation forComputation(String computation) { | |
| return new ForComputation(computation); | ||
| } | ||
|
|
||
| private static String formatHistogram(long[] histogram) { | ||
| return String.format( | ||
| "[<128B:%d, <256B:%d, <512B:%d, <1KB:%d, <10KB:%d, <1MB:%d, >=1MB:%d]", | ||
| histogram[0], | ||
| histogram[1], | ||
| histogram[2], | ||
| histogram[3], | ||
| histogram[4], | ||
| histogram[5], | ||
| histogram[6]); | ||
| } | ||
|
|
||
| /** Print summary statistics of the cache to the given {@link PrintWriter}. */ | ||
| @Override | ||
| public void appendSummaryHtml(PrintWriter response) { | ||
| response.println("Cache Stats: <br><table>"); | ||
| response.println( | ||
| "<tr><th>Hit Ratio</th><th>Evictions</th><th>Entries</th>" | ||
| + "<th>Entry Values</th><th>Max Entry Values</th>" | ||
| + "<th>Id Weight</th><th>Entry Weight</th><th>Max Weight</th><th>Keys</th>" | ||
| + "</tr><tr>"); | ||
| CacheStats cacheStats = stateCache.stats(); | ||
| EntryStats entryStats = calculateEntryStats(); | ||
| response.println("<td>" + cacheStats.hitRate() + "</td>"); | ||
| response.println("<td>" + cacheStats.evictionCount() + "</td>"); | ||
| response.println("<td>" + entryStats.entries + "(" + stateCache.size() + " inc. weak) </td>"); | ||
| response.println("<td>" + entryStats.entryValues + "</td>"); | ||
| response.println("<td>" + entryStats.maxEntryValues + "</td>"); | ||
| response.println("<td>" + entryStats.idWeight / MEGABYTES + "MB</td>"); | ||
| response.println("<td>" + entryStats.entryWeight / MEGABYTES + "MB</td>"); | ||
| response.println("<td>" + getMaxWeight() / MEGABYTES + "MB</td>"); | ||
| response.println("<td>" + keyIndex.size() + "</td>"); | ||
| response.println("</tr></table><br>"); | ||
|
|
||
| response.println("<tr><th>Hit Ratio</th><td>" + cacheStats.hitRate() + "</td></tr>"); | ||
| response.println("<tr><th>Evictions</th><td>" + cacheStats.evictionCount() + "</td></tr>"); | ||
| response.println( | ||
| "<tr><th>Entries</th><td>" | ||
| + entryStats.entries | ||
| + " (" | ||
| + stateCache.size() | ||
| + " inc. weak)</td></tr>"); | ||
| response.println("<tr><th>Entry Values</th><td>" + entryStats.entryValues + "</td></tr>"); | ||
| response.println( | ||
| "<tr><th>Max Entry Values</th><td>" + entryStats.maxEntryValues + "</td></tr>"); | ||
| response.println( | ||
| "<tr><th>Id Weight</th><td>" + entryStats.idWeight / MEGABYTES + "MB</td></tr>"); | ||
| response.println( | ||
| "<tr><th>Entry Weight</th><td>" + entryStats.entryWeight / MEGABYTES + "MB</td></tr>"); | ||
| response.println("<tr><th>Max Weight</th><td>" + getMaxWeight() / MEGABYTES + "MB</td></tr>"); | ||
| response.println("<tr><th>Keys</th><td>" + keyIndex.size() + "</td></tr>"); | ||
| response.println( | ||
| "<tr><th>Value Size Limit</th><td>" + getMaxCachedValueBytesLimit() + " bytes</td></tr>"); | ||
| if (enableHistogram) { | ||
| response.println( | ||
| "<tr><th>Entry Weight Dist</th><td>" | ||
| + formatHistogram(entryStats.entryWeightHistogram) | ||
| + "</td></tr>"); | ||
| response.println( | ||
| "<tr><th>Value Weight Dist</th><td>" | ||
| + formatHistogram(entryStats.valueWeightHistogram) | ||
| + "</td></tr>"); | ||
| response.println( | ||
| "<tr><th>Key Weight Dist</th><td>" | ||
| + formatHistogram(entryStats.keyWeightHistogram) | ||
| + "</td></tr>"); | ||
| } | ||
|
|
||
| response.println("</table><br>"); | ||
| } | ||
|
|
||
| public BaseStatusServlet statusServlet() { | ||
|
|
@@ -180,6 +248,31 @@ private static class EntryStats { | |
| long entryWeight; | ||
| long entryValues; | ||
| long maxEntryValues; | ||
| long[] entryWeightHistogram = new long[7]; | ||
| long[] valueWeightHistogram = new long[7]; | ||
| long[] keyWeightHistogram = new long[7]; | ||
|
|
||
| void addEntryWeight(long weight) { | ||
| entryWeightHistogram[getBucket(weight)]++; | ||
| } | ||
|
|
||
| void addValueWeight(long weight) { | ||
| valueWeightHistogram[getBucket(weight)]++; | ||
| } | ||
|
|
||
| void addKeyWeight(long weight) { | ||
| keyWeightHistogram[getBucket(weight)]++; | ||
| } | ||
|
|
||
| private int getBucket(long weight) { | ||
| if (weight < 128) return 0; | ||
| if (weight < 256) return 1; | ||
| if (weight < 512) return 2; | ||
| if (weight < 1024) return 3; | ||
| if (weight < 10 * 1024) return 4; | ||
| if (weight < 1024 * 1024) return 5; | ||
| return 6; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -413,7 +506,15 @@ public <T extends State> void put( | |
| } | ||
|
|
||
| public void persist() { | ||
| localCache.forEach(stateCache::put); | ||
| long limit = WindmillStateCache.this.getMaxCachedValueBytesLimit(); | ||
| localCache.forEach( | ||
| (id, entry) -> { | ||
| if (entry.getWeight() <= limit) { | ||
| stateCache.put(id, entry); | ||
|
Comment on lines
+512
to
+513
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The limit is compared against |
||
| } else { | ||
| stateCache.invalidate(id); | ||
| } | ||
| }); | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.