Skip to content
Closed
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
org.gradle.caching=true
org.gradle.parallel=true
org.gradle.caching=false
org.gradle.parallel=false
org.gradle.daemon=false
org.gradle.configureondemand=true
org.gradle.jvmargs=-Xss10240k
Expand Down
6 changes: 5 additions & 1 deletion runners/direct-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ dependencies {
dependOnProjectsAndConfigs.each {
// For projects producing shadowjar, use the packaged jar as dependency to
// handle redirected packages from it
implementation project(path: it.key, configuration: it.value)
if (it.value != null) {
implementation project(path: it.key, configuration: it.value)
} else {
implementation project(it.key)
}
}
shadow library.java.vendored_grpc_1_69_0
shadow library.java.joda_time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,26 @@
*/
package org.apache.beam.runners.dataflow.worker;

import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;

/** Indicates that the key token was invalid when data was attempted to be fetched. */
public class KeyTokenInvalidException extends RuntimeException {
private final @Nullable ShardedKey shardedKey;

public KeyTokenInvalidException(String key) {
super("Unable to fetch data due to token mismatch for key " + key);
this.shardedKey = null;
}

public KeyTokenInvalidException(ShardedKey shardedKey, String key) {
super("Unable to fetch data due to token mismatch for key " + key);
this.shardedKey = shardedKey;
}

public Optional<ShardedKey> getShardedKey() {
return Optional.ofNullable(shardedKey);
}

/** Returns whether an exception was caused by a {@link KeyTokenInvalidException}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ public final class StreamingDataflowWorker {
// Experiment make the monitor within BoundedQueueExecutor fair
public static final String BOUNDED_QUEUE_EXECUTOR_USE_FAIR_MONITOR_EXPERIMENT =
"windmill_bounded_queue_executor_use_fair_monitor";
public static final String ENABLE_KEY_GROUP_WORK_QUEUE_EXPERIMENT =
"unstable_enable_multi_key_bundle";

private final WindmillStateCache stateCache;
private AtomicReference<StreamingWorkerStatusPages> statusPages = new AtomicReference<>();
Expand Down Expand Up @@ -1017,14 +1019,17 @@ private static JobHeader createJobHeader(DataflowWorkerHarnessOptions options, l
private static BoundedQueueExecutor createWorkUnitExecutor(DataflowWorkerHarnessOptions options) {
boolean useFairMonitor =
DataflowRunner.hasExperiment(options, BOUNDED_QUEUE_EXECUTOR_USE_FAIR_MONITOR_EXPERIMENT);
boolean useKeyGroupWorkQueue =
DataflowRunner.hasExperiment(options, ENABLE_KEY_GROUP_WORK_QUEUE_EXPERIMENT);
return new BoundedQueueExecutor(
chooseMaxThreads(options),
THREAD_EXPIRATION_TIME_SEC,
TimeUnit.SECONDS,
chooseMaxBundlesOutstanding(options),
chooseMaxBytesOutstanding(options),
new ThreadFactoryBuilder().setNameFormat("DataflowWorkUnits-%d").setDaemon(true).build(),
useFairMonitor);
useFairMonitor,
useKeyGroupWorkQueue);
}

public static void main(String[] args) throws Exception {
Expand Down
Loading
Loading