Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ public final class StreamingDataflowWorker {
// Experiment to make the monitor within BoundedQueueExecutor fair.
public static final String BOUNDED_QUEUE_EXECUTOR_USE_FAIR_MONITOR_EXPERIMENT =
"windmill_bounded_queue_executor_use_fair_monitor";
// Do not use. Experiment guarding multi-key bundles. The feature is a work in progress and is
// incomplete.
private static final String UNSTABLE_ENABLE_MULTI_KEY_BUNDLE = "unstable_enable_multi_key_bundle";

private final WindmillStateCache stateCache;
private AtomicReference<StreamingWorkerStatusPages> statusPages = new AtomicReference<>();
Expand Down Expand Up @@ -1017,14 +1020,17 @@ private static JobHeader createJobHeader(DataflowWorkerHarnessOptions options, l
/**
 * Builds the {@link BoundedQueueExecutor} used for work units.
 *
 * <p>Two experiments are read from {@code options} and forwarded to the executor as flags:
 * {@link #BOUNDED_QUEUE_EXECUTOR_USE_FAIR_MONITOR_EXPERIMENT} (passed as {@code useFairMonitor})
 * and {@code UNSTABLE_ENABLE_MULTI_KEY_BUNDLE} (passed as {@code useKeyGroupWorkQueue}). The
 * thread, bundle and byte limits come from {@code chooseMaxThreads}, {@code
 * chooseMaxBundlesOutstanding} and {@code chooseMaxBytesOutstanding}. Worker threads are daemon
 * threads named {@code DataflowWorkUnits-%d}.
 *
 * @param options harness options supplying the experiments and the executor limits
 * @return the newly constructed executor
 */
private static BoundedQueueExecutor createWorkUnitExecutor(DataflowWorkerHarnessOptions options) {
boolean useFairMonitor =
DataflowRunner.hasExperiment(options, BOUNDED_QUEUE_EXECUTOR_USE_FAIR_MONITOR_EXPERIMENT);
boolean useKeyGroupWorkQueue =
DataflowRunner.hasExperiment(options, UNSTABLE_ENABLE_MULTI_KEY_BUNDLE);
return new BoundedQueueExecutor(
chooseMaxThreads(options),
THREAD_EXPIRATION_TIME_SEC,
TimeUnit.SECONDS,
chooseMaxBundlesOutstanding(options),
chooseMaxBytesOutstanding(options),
new ThreadFactoryBuilder().setNameFormat("DataflowWorkUnits-%d").setDaemon(true).build(),
useFairMonitor);
useFairMonitor,
useKeyGroupWorkQueue);
}

public static void main(String[] args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker.streaming;

import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
Expand Down Expand Up @@ -62,16 +63,24 @@ public void run(BoundedQueueExecutorWorkHandle handle) {
}
}

/** Returns the {@link WorkId} of the underlying work, delegating to {@code work().id()}. */
public final WorkId id() {
public WorkId id() {
return work().id();
}

/**
 * Returns the {@link Windmill.WorkItem} of the underlying work, delegating to {@code
 * work().getWorkItem()}.
 */
public final Windmill.WorkItem getWorkItem() {
public Windmill.WorkItem getWorkItem() {
return work().getWorkItem();
}

/** Renders this instance as {@code ExecutableWork{<id>}}, where {@code <id>} is {@link #id()}. */
@Override
public String toString() {
  StringBuilder rendered = new StringBuilder("ExecutableWork{");
  rendered.append(id());
  rendered.append("}");
  return rendered.toString();
}

/** Returns the computation id reported by the underlying work. */
public String getComputationId() {
  final String computationId = work().getComputationId();
  return computationId;
}

/**
 * Returns the key group reported by the underlying work, or an empty {@link Optional} when the
 * work reports none.
 */
public Optional<Work.KeyGroup> getKeyGroup() {
  final Optional<Work.KeyGroup> keyGroup = work().getKeyGroup();
  return keyGroup;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.IntSummaryStatistics;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -52,6 +53,7 @@
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

Expand All @@ -74,6 +76,7 @@ public final class Work implements RefreshableWork {
private final Instant startTime;
private final Map<LatencyAttribution.State, Duration> totalDurationPerState;
private final WorkId id;
private final Optional<KeyGroup> keyGroup;
private final String latencyTrackingId;
private final long serializedWorkItemSize;
private volatile TimedState currentState;
Expand Down Expand Up @@ -101,6 +104,11 @@ private Work(
// keyUniverse inside EnumMap every time.
this.totalDurationPerState = new EnumMap<>(EMPTY_ENUM_MAP);
this.id = WorkId.of(workItem);
this.keyGroup =
workItem.hasKeyGroup()
? Optional.of(
KeyGroup.create(workItem.getKeyGroup().getHigh(), workItem.getKeyGroup().getLow()))
: Optional.empty();
this.latencyTrackingId =
Long.toHexString(workItem.getShardingKey())
+ '-'
Expand Down Expand Up @@ -383,6 +391,14 @@ private boolean isCommitPending() {
abstract Instant startTime();
}

/** Returns the computation id held by this work's processing context. */
public String getComputationId() {
  final String computationId = processingContext.computationId();
  return computationId;
}

/**
 * Returns the key group captured when this work was constructed; empty when the work item did not
 * carry one.
 */
public Optional<KeyGroup> getKeyGroup() {
  return this.keyGroup;
}

@AutoValue
public abstract static class ProcessingContext {

Expand Down Expand Up @@ -416,4 +432,48 @@ private Optional<KeyedGetDataResponse> fetchKeyedState(KeyedGetDataRequest reque
return Optional.ofNullable(getDataClient().getStateData(computationId(), request));
}
}

/**
 * Immutable value holding the {@code high} and {@code low} long components of a key group.
 *
 * <p>Two instances are equal exactly when both components match, so instances are safe to use as
 * hash-map keys. Instances are obtained through {@link #create(long, long)}.
 */
public static final class KeyGroup {
  private final long high;
  private final long low;

  private KeyGroup(long high, long low) {
    this.high = high;
    this.low = low;
  }

  /** Creates a key group from its {@code high} and {@code low} components. */
  public static KeyGroup create(long high, long low) {
    return new KeyGroup(high, low);
  }

  /** Returns the {@code high} component. */
  public long high() {
    return high;
  }

  /** Returns the {@code low} component. */
  public long low() {
    return low;
  }

  /** Returns true only for another {@code KeyGroup} with identical components. */
  @Override
  public boolean equals(@Nullable Object o) {
    if (o == this) {
      return true;
    }
    if (o instanceof KeyGroup) {
      KeyGroup that = (KeyGroup) o;
      return that.high == high && that.low == low;
    }
    return false;
  }

  /** Hash derived from both components, consistent with {@link #equals(Object)}. */
  @Override
  public int hashCode() {
    return Objects.hash(high, low);
  }

  /** Renders as {@code KeyGroup{high=<high>, low=<low>}}. */
  @Override
  public String toString() {
    return new StringBuilder("KeyGroup{")
        .append("high=")
        .append(high)
        .append(", low=")
        .append(low)
        .append('}')
        .toString();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
Expand All @@ -29,6 +30,7 @@
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.dataflow.worker.streaming.BoundedQueueExecutorWorkHandle;
import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
Expand Down Expand Up @@ -85,7 +87,8 @@ public BoundedQueueExecutor(
int maximumElementsOutstanding,
long maximumBytesOutstanding,
ThreadFactory threadFactory,
boolean useFairMonitor) {
boolean useFairMonitor,
boolean useKeyGroupWorkQueue) {
this.maximumPoolSize = initialMaximumPoolSize;
monitor = new Monitor(useFairMonitor);
executor =
Expand All @@ -94,7 +97,7 @@ public BoundedQueueExecutor(
initialMaximumPoolSize,
keepAliveTime,
unit,
new LinkedBlockingQueue<>(),
useKeyGroupWorkQueue ? new KeyGroupWorkQueue() : new LinkedBlockingQueue<>(),
threadFactory) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
Expand Down Expand Up @@ -313,7 +316,7 @@ public synchronized void close() {
}
}

private static final class QueuedWork implements Runnable {
static final class QueuedWork implements Runnable {

private final ExecutableWork work;
private final BoundedQueueExecutorWorkHandleImpl handle;
Expand Down Expand Up @@ -378,6 +381,23 @@ BoundedQueueExecutorWorkHandleImpl createBudgetHandle(int elements, long bytes)
return new BoundedQueueExecutorWorkHandleImpl(elements, bytes);
}

/**
 * Polls queued work for a specific computationId and keyGroup.
 *
 * <p>Work can only be polled when the executor's queue is a {@code KeyGroupWorkQueue}; with any
 * other queue this returns empty. When work is found, its handle is merged into {@code handle}
 * before the work is returned.
 *
 * @param computationId computation to poll work for
 * @param keyGroup key group to poll work for
 * @param handle handle that receives the polled work's handle via {@code merge}; must be a {@code
 *     BoundedQueueExecutorWorkHandleImpl}
 * @return the polled work, or empty when the queue is not a {@code KeyGroupWorkQueue} or it
 *     returned no work
 * @throws IllegalArgumentException if {@code handle} is not a {@code
 *     BoundedQueueExecutorWorkHandleImpl}
 */
public Optional<ExecutableWork> pollWork(
    String computationId, Work.KeyGroup keyGroup, BoundedQueueExecutorWorkHandle handle) {
  // The handle type is validated first, even if the queue turns out not to support polling.
  checkArgument(handle instanceof BoundedQueueExecutorWorkHandleImpl);
  BoundedQueueExecutorWorkHandleImpl callerHandle = (BoundedQueueExecutorWorkHandleImpl) handle;
  if (executor.getQueue() instanceof KeyGroupWorkQueue) {
    KeyGroupWorkQueue keyGroupQueue = (KeyGroupWorkQueue) executor.getQueue();
    QueuedWork polled = keyGroupQueue.pollWork(computationId, keyGroup);
    if (polled != null) {
      callerHandle.merge(polled.getHandle());
      return Optional.of(polled.getWork());
    }
  }
  return Optional.empty();
}

private void decrementCounters(int elements, long bytes) {
// All threads queue decrements and one thread grabs the monitor and updates
// counters. We do this to reduce contention on monitor which is locked by
Expand Down
Loading
Loading