Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,16 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {

void setIsWindmillServiceDirectPathEnabled(boolean isWindmillServiceDirectPathEnabled);

/**
* The maximum size of cached values in bytes. Values larger than this limit will not be cached by
* the windmill state cache
*/
@Description("The maximum size of cached values in bytes.")
Comment thread
arunpandianp marked this conversation as resolved.
@Default.Long(Long.MAX_VALUE)
Long getMaxWindmillStateCacheValueBytes();

void setMaxWindmillStateCacheValueBytes(Long value);

/**
* Factory for creating local Windmill address. Reads from system property 'windmill.hostport' for
* backwards compatibility.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.WeightedSemaphore;
import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig.Fetcher;
import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher;
Expand Down Expand Up @@ -113,6 +114,7 @@
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.auth.MoreCallCredentials;
Expand Down Expand Up @@ -633,6 +635,10 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
WindmillStateCache.builder()
.setSizeMb(options.getWorkerCacheMb())
.setSupportMapViaMultimap(options.isEnableStreamingEngine())
.setMaxCachedValueBytes(options.getMaxWindmillStateCacheValueBytes())
.setEnableHistogram(
!ExperimentalOptions.hasExperiment(
options, "disable_windmill_user_state_cache_histogram"))
.build();

GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder =
Expand All @@ -651,6 +657,15 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
windmillStateCache::forComputation,
ID_GENERATOR));

Fetcher configFetcher = configFetcherComputationStateCacheAndWindmillClient.configFetcher();
configFetcher
.getGlobalConfigHandle()
.registerConfigObserver(
config -> {
windmillStateCache.setMaxCachedValueBytesOverride(
config.userWorkerJobSettings().getMaxCachedValueBytes());
});
Comment thread
arunpandianp marked this conversation as resolved.
Comment thread
arunpandianp marked this conversation as resolved.

ComputationStateCache computationStateCache =
configFetcherComputationStateCacheAndWindmillClient.computationStateCache();
WindmillServerStub windmillServer =
Expand Down Expand Up @@ -689,7 +704,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
return new StreamingDataflowWorker(
windmillServer,
clientId,
configFetcherComputationStateCacheAndWindmillClient.configFetcher(),
configFetcher,
computationStateCache,
windmillStateCache,
workExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,18 @@ public class WindmillStateCache implements StatusDataProvider {
private final ConcurrentMap<WindmillComputationKey, ForKey> keyIndex;
private final long workerCacheBytes; // Copy workerCacheMb and convert to bytes.
private final boolean supportMapViaMultimap;

WindmillStateCache(long sizeMb, boolean supportMapViaMultimap) {
private final long defaultMaxCachedValueBytes;
private final boolean enableHistogram;
private volatile long maxCachedValueBytesOverride = -1L;

WindmillStateCache(
long sizeMb,
boolean supportMapViaMultimap,
long maxCachedValueBytes,
boolean enableHistogram) {
this.workerCacheBytes = sizeMb * MEGABYTES;
this.defaultMaxCachedValueBytes = maxCachedValueBytes;
this.enableHistogram = enableHistogram;
int stateCacheConcurrencyLevel =
Math.max(STATE_CACHE_CONCURRENCY_LEVEL, Runtime.getRuntime().availableProcessors());
this.stateCache =
Expand All @@ -99,22 +108,48 @@ public interface Builder {

Builder setSupportMapViaMultimap(boolean supportMapViaMultimap);

Builder setMaxCachedValueBytes(long maxCachedValueBytes);

Builder setEnableHistogram(boolean enableHistogram);

WindmillStateCache build();
}

/**
 * Returns a {@link Builder} preloaded with the defaults: map-via-multimap disabled, no cached
 * value size limit ({@link Long#MAX_VALUE}), and histogram statistics enabled.
 */
public static Builder builder() {
  return new AutoBuilder_WindmillStateCache_Builder()
      .setSupportMapViaMultimap(false)
      .setMaxCachedValueBytes(Long.MAX_VALUE)
      .setEnableHistogram(true);
}

/**
 * Dynamically overrides the maximum weighed size of an entry that may be cached. A negative
 * value clears the override so the builder-supplied default applies again.
 */
public void setMaxCachedValueBytesOverride(long newLimit) {
  // Volatile write; picked up by subsequent persist() calls without further synchronization.
  this.maxCachedValueBytesOverride = newLimit;
}

/**
 * Returns the effective cached-value size limit: the dynamic override when one has been set
 * (any non-negative value), otherwise the default configured at construction time.
 */
private long getMaxCachedValueBytesLimit() {
  // Read the volatile field once so the decision and the returned value are consistent.
  final long dynamicLimit = maxCachedValueBytesOverride;
  if (dynamicLimit < 0) {
    return defaultMaxCachedValueBytes;
  }
  return dynamicLimit;
}

private EntryStats calculateEntryStats() {
EntryStats stats = new EntryStats();
BiConsumer<StateId, StateCacheEntry> consumer =
(stateId, stateCacheEntry) -> {
stats.entries++;
stats.idWeight += stateId.getWeight();
stats.entryWeight += stateCacheEntry.getWeight();
long idWeight = stateId.getWeight();
stats.idWeight += idWeight;
long entryWeight = stateCacheEntry.getWeight();
stats.entryWeight += entryWeight;
stats.entryValues += stateCacheEntry.values.size();
stats.maxEntryValues = Math.max(stats.maxEntryValues, stateCacheEntry.values.size());
if (enableHistogram) {
stats.addKeyWeight(idWeight);
stats.addEntryWeight(entryWeight);
stateCacheEntry.values.forEach(
(encodedAddress, weightedValue) -> {
Comment on lines +146 to +149
Copy link
Copy Markdown
Contributor Author

@arunpandianp arunpandianp May 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is executed once every (10m) debug capture report. I think computing the histograms here is better than the overhead for keeping it in sync during the processing. Added an experiment to disable the histogram stats if needed. Any other options?

stats.addValueWeight(weightedValue.weight);
});
Comment on lines +148 to +151
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Iterating over all values in every cache entry to calculate the weight distribution increases the complexity of calculateEntryStats from $O(\text{entries})$ to $O(\text{total values})$. For large caches with many values per entry (e.g., large BagState or MapState), this could cause noticeable delays when accessing the worker's status page. Since this is for a debug page, it might be acceptable, but consider if the performance impact has been evaluated for very large states.

}
};
stateCache.asMap().forEach(consumer);
return stats;
Expand All @@ -138,27 +173,60 @@ public ForComputation forComputation(String computation) {
return new ForComputation(computation);
}

/**
 * Renders a 7-bucket weight histogram as a bracketed, comma-separated list, e.g.
 * {@code [<128B:0, <256B:1, ...]}. Bucket order matches {@code EntryStats.getBucket}.
 */
private static String formatHistogram(long[] histogram) {
  final String[] bucketLabels = {"<128B", "<256B", "<512B", "<1KB", "<10KB", "<1MB", ">=1MB"};
  StringBuilder rendered = new StringBuilder("[");
  for (int i = 0; i < bucketLabels.length; i++) {
    if (i > 0) {
      rendered.append(", ");
    }
    rendered.append(bucketLabels[i]).append(':').append(histogram[i]);
  }
  return rendered.append(']').toString();
}

/**
 * Print summary statistics of the cache to the given {@link PrintWriter} as an HTML table with
 * one label/value row per statistic. Histogram rows are emitted only when histogram collection
 * is enabled.
 */
@Override
public void appendSummaryHtml(PrintWriter response) {
  response.println("Cache Stats: <br><table>");
  CacheStats cacheStats = stateCache.stats();
  // Single pass over the cache; O(total values) when histograms are enabled.
  EntryStats entryStats = calculateEntryStats();

  response.println("<tr><th>Hit Ratio</th><td>" + cacheStats.hitRate() + "</td></tr>");
  response.println("<tr><th>Evictions</th><td>" + cacheStats.evictionCount() + "</td></tr>");
  // stateCache.size() may also count weakly-referenced entries not seen by the walk.
  response.println(
      "<tr><th>Entries</th><td>"
          + entryStats.entries
          + " ("
          + stateCache.size()
          + " inc. weak)</td></tr>");
  response.println("<tr><th>Entry Values</th><td>" + entryStats.entryValues + "</td></tr>");
  response.println(
      "<tr><th>Max Entry Values</th><td>" + entryStats.maxEntryValues + "</td></tr>");
  response.println(
      "<tr><th>Id Weight</th><td>" + entryStats.idWeight / MEGABYTES + "MB</td></tr>");
  response.println(
      "<tr><th>Entry Weight</th><td>" + entryStats.entryWeight / MEGABYTES + "MB</td></tr>");
  response.println("<tr><th>Max Weight</th><td>" + getMaxWeight() / MEGABYTES + "MB</td></tr>");
  response.println("<tr><th>Keys</th><td>" + keyIndex.size() + "</td></tr>");
  response.println(
      "<tr><th>Value Size Limit</th><td>" + getMaxCachedValueBytesLimit() + " bytes</td></tr>");
  if (enableHistogram) {
    response.println(
        "<tr><th>Entry Weight Dist</th><td>"
            + formatHistogram(entryStats.entryWeightHistogram)
            + "</td></tr>");
    response.println(
        "<tr><th>Value Weight Dist</th><td>"
            + formatHistogram(entryStats.valueWeightHistogram)
            + "</td></tr>");
    response.println(
        "<tr><th>Key Weight Dist</th><td>"
            + formatHistogram(entryStats.keyWeightHistogram)
            + "</td></tr>");
  }

  response.println("</table><br>");
}

public BaseStatusServlet statusServlet() {
Expand All @@ -180,6 +248,31 @@ private static class EntryStats {
long entryWeight;
long entryValues;
long maxEntryValues;
long[] entryWeightHistogram = new long[7];
long[] valueWeightHistogram = new long[7];
long[] keyWeightHistogram = new long[7];

/** Tallies a whole cache entry's weighed size into the entry-weight histogram. */
void addEntryWeight(long weight) {
  entryWeightHistogram[getBucket(weight)]++;
}

/** Tallies a single cached value's weighed size into the value-weight histogram. */
void addValueWeight(long weight) {
  valueWeightHistogram[getBucket(weight)]++;
}

/** Tallies a state id (key) weighed size into the key-weight histogram. */
void addKeyWeight(long weight) {
  keyWeightHistogram[getBucket(weight)]++;
}

/**
 * Maps a weight in bytes to one of 7 histogram buckets with exclusive upper bounds
 * 128B, 256B, 512B, 1KB, 10KB, 1MB; anything >= 1MB lands in the final bucket.
 *
 * <p>Made static (it reads no instance state) and table-driven, with braces per house style.
 */
private static int getBucket(long weight) {
  // Exclusive upper bounds for buckets 0..5; bucket 6 is the >= 1MB overflow bucket.
  final long[] upperBounds = {128, 256, 512, 1024, 10 * 1024, 1024 * 1024};
  for (int bucket = 0; bucket < upperBounds.length; bucket++) {
    if (weight < upperBounds[bucket]) {
      return bucket;
    }
  }
  return upperBounds.length;
}
}

/**
Expand Down Expand Up @@ -413,7 +506,15 @@ public <T extends State> void put(
}

public void persist() {
localCache.forEach(stateCache::put);
long limit = WindmillStateCache.this.getMaxCachedValueBytesLimit();
localCache.forEach(
(id, entry) -> {
if (entry.getWeight() <= limit) {
stateCache.put(id, entry);
Comment on lines +512 to +513
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The limit is compared against entry.getWeight(), which includes the overhead of the StateCacheEntry and StateId (approximately 136 bytes). This means that if a user sets a small limit (e.g., 100 bytes), no values will ever be cached. The documentation in DataflowStreamingPipelineOptions says "maximum size of cached values", which might lead users to believe it only applies to the payload. Consider clarifying the documentation or adjusting the logic to only account for the value size if that was the intent.

} else {
stateCache.invalidate(id);
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,68 @@ public void testMaxWeight() throws Exception {
assertEquals(400 * MEGABYTES, cache.getMaxWeight());
}

/**
 * Verifies the dynamic max-cached-value-bytes override is enforced at persist() time: an entry
 * over the limit is never published, raising the limit allows caching, and shrinking a cached
 * entry's value past the limit evicts it on the next persist.
 */
@Test
public void testMaxCachedValueBytes() throws Exception {
  cache.setMaxCachedValueBytesOverride(
      100); // Set limit to 100 bytes, per cache entry overhead is 136.

  WindmillStateCache.ForKeyAndFamily keyCache =
      cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 1L).forFamily(STATE_FAMILY);

  TestStateTag tag1 = new TestStateTag("tag1");
  TestStateTag tag2 = new TestStateTag("tag2");

  // Entry weight (value + ~136B overhead) exceeds the 100-byte limit.
  putInCache(keyCache, StateNamespaces.global(), tag1, new TestState("g1"), 10);
  keyCache.persist();

  // It should not be in global cache because it's too large.
  keyCache =
      cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 2L).forFamily(STATE_FAMILY);
  assertEquals(Optional.empty(), getFromCache(keyCache, StateNamespaces.global(), tag1));

  // Now set limit larger.
  cache.setMaxCachedValueBytesOverride(1000);

  putInCache(keyCache, StateNamespaces.global(), tag2, new TestState("g2"), 10);
  keyCache.persist();

  // It should be in global cache.
  keyCache =
      cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 3L).forFamily(STATE_FAMILY);
  assertEquals(
      Optional.of(new TestState("g2")), getFromCache(keyCache, StateNamespaces.global(), tag2));

  // Now update it to be larger than limit.
  putInCache(keyCache, StateNamespaces.global(), tag2, new TestState("g2_large"), 2000);
  keyCache.persist();

  // It should be removed from global cache (persist invalidates oversized entries).
  keyCache =
      cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 4L).forFamily(STATE_FAMILY);
  assertEquals(Optional.empty(), getFromCache(keyCache, StateNamespaces.global(), tag2));
}

/**
 * Verifies that a cache built with histograms disabled omits the weight-distribution rows from
 * its HTML status summary.
 */
@Test
public void testDisableHistogram() throws Exception {
  WindmillStateCache noHistogramCache =
      WindmillStateCache.builder().setSizeMb(400).setEnableHistogram(false).build();
  WindmillStateCache.ForKeyAndFamily keyCache =
      noHistogramCache
          .forComputation(COMPUTATION)
          .forKey(COMPUTATION_KEY, 0L, 1L)
          .forFamily(STATE_FAMILY);

  // Cache one small value so the summary has real content to render.
  TestStateTag tag = new TestStateTag("tag1");
  putInCache(keyCache, StateNamespaces.global(), tag, new TestState("g1"), 2);
  keyCache.persist();

  // Render the status page into a string and check no distribution row appears.
  java.io.StringWriter writer = new java.io.StringWriter();
  noHistogramCache.appendSummaryHtml(new java.io.PrintWriter(writer));

  org.junit.Assert.assertFalse(writer.toString().contains("Entry Weight Dist"));
}

/** Verifies that values are cached in the appropriate namespaces. */
@Test
public void testInvalidation() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,8 @@ message UserWorkerRunnerV1Settings {
optional ConnectivityType connectivity_type = 4
[default = CONNECTIVITY_TYPE_DEFAULT];

optional int64 max_cached_value_bytes = 5 [default = -1];

reserved 1, 2;
}

Expand Down
Loading