Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.streaming;

/**
* A handle to use when requesting pulling more work from @BoundedQueueExecutor
* via @BoundedQueueExecutor.pollWork
*/
public interface BoundedQueueExecutorWorkHandle {}
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,49 @@
*/
package org.apache.beam.runners.dataflow.worker.streaming;

import com.google.auto.value.AutoValue;
import java.util.function.Consumer;
import java.util.Objects;
import java.util.function.BiConsumer;
import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;

/** {@link Work} instance and a processing function used to process the work. */
@AutoValue
public abstract class ExecutableWork implements Runnable {
public final class ExecutableWork {

public static ExecutableWork create(Work work, Consumer<Work> executeWorkFn) {
return new AutoValue_ExecutableWork(work, executeWorkFn);
private final Work work;
private final BiConsumer<Work, BoundedQueueExecutorWorkHandle> executeWorkFn;

private ExecutableWork(
Work work, BiConsumer<Work, BoundedQueueExecutorWorkHandle> executeWorkFn) {
this.work = Objects.requireNonNull(work);
this.executeWorkFn = Objects.requireNonNull(executeWorkFn);
}

public abstract Work work();
/**
* Creates an {@link ExecutableWork} instance.
*
* @param executeWorkFn The function executing the work. It'll be called along with a
* BoundedQueueExecutorWorkHandle. The handle needs to be passed to BoundedQueueExecutor when
* requesting more work to process inline.
*/
public static ExecutableWork create(
Work work, BiConsumer<Work, BoundedQueueExecutorWorkHandle> executeWorkFn) {
Comment thread
arunpandianp marked this conversation as resolved.
return new ExecutableWork(work, executeWorkFn);
}

public abstract Consumer<Work> executeWorkFn();
public Work work() {
return work;
}

@Override
public void run() {
executeWorkFn().accept(work());
public BiConsumer<Work, BoundedQueueExecutorWorkHandle> executeWorkFn() {
return executeWorkFn;
}

public void run(BoundedQueueExecutorWorkHandle handle) {
try {
executeWorkFn().accept(work(), handle);
} catch (Throwable t) {
throw ExceptionUtils.safeWrapThrowableAsException(t);
}
}

public final WorkId id() {
Expand All @@ -45,4 +69,9 @@ public final WorkId id() {
public final Windmill.WorkItem getWorkItem() {
return work().getWorkItem();
}

@Override
public String toString() {
return "ExecutableWork{" + id() + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@
*/
package org.apache.beam.runners.dataflow.worker.util;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.dataflow.worker.streaming.BoundedQueueExecutorWorkHandle;
import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;

Expand All @@ -38,7 +45,19 @@ public class BoundedQueueExecutor {

// Used to guard elementsOutstanding and bytesOutstanding.
private final Monitor monitor;
private final ConcurrentLinkedQueue<Long> decrementQueue = new ConcurrentLinkedQueue<>();

private static class Budget {

final int elements;
final long bytes;

Budget(int elements, long bytes) {
this.elements = elements;
this.bytes = bytes;
}
}

private final ConcurrentLinkedQueue<Budget> decrementQueue = new ConcurrentLinkedQueue<>();
private final Object decrementQueueDrainLock = new Object();
private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false);
private int elementsOutstanding = 0;
Expand Down Expand Up @@ -106,7 +125,7 @@ protected void afterExecute(Runnable r, Throwable t) {

// Before adding a Work to the queue, check that there are enough bytes of space or no other
// outstanding elements of work.
public void execute(Runnable work, long workBytes) {
public void execute(ExecutableWork work, long workBytes) {
monitor.enterWhenUninterruptibly(
new Guard(monitor) {
@Override
Expand All @@ -119,12 +138,18 @@ public boolean isSatisfied() {
executeMonitorHeld(work, workBytes);
}

// Forcibly add something to the queue, ignoring the length limit.
public void forceExecute(Runnable work, long workBytes) {
// Forcibly add ExecutableWork to the queue, ignoring the limits.
public void forceExecute(ExecutableWork work, long workBytes) {
monitor.enter();
executeMonitorHeld(work, workBytes);
}

/** Forcibly execute a Runnable callback with 0 bytes of size. */
public void forceExecute(Runnable runnable) {
monitor.enter();
executeMonitorHeld(runnable);
}

// Set the maximum/core pool size of the executor.
public synchronized void setMaximumPoolSize(int maximumPoolSize, int maximumElementsOutstanding) {
// For ThreadPoolExecutor, the maximum pool size should always greater than or equal to core
Expand Down Expand Up @@ -221,8 +246,115 @@ public String summaryHtml() {
}
}

private void executeMonitorHeld(Runnable work, long workBytes) {
/**
* A handle to use when requesting pulling more work from @BoundedQueueExecutor
* via @BoundedQueueExecutor.pollWork. A single handle aggregates all budgets from work pulled for
* inline execution and releases the budget after the multi work bundle is complete.
*/
final class BoundedQueueExecutorWorkHandleImpl
implements BoundedQueueExecutorWorkHandle, AutoCloseable {

@GuardedBy("this")
private int elements;
Comment thread
arunpandianp marked this conversation as resolved.

@GuardedBy("this")
private long bytes;

@GuardedBy("this")
private boolean closed = false;

private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
checkArgument(elements >= 0 && bytes >= 0);
this.elements = elements;
this.bytes = bytes;
}

/**
* Merges the budget from another handle into this handle.
*
* <p>This transfers the budget (elements and bytes) from the {@code other} handle to this
* handle, and marks the {@code other} handle as closed to prevent it from releasing the budget
* again if it is closed.
*/
public void merge(BoundedQueueExecutorWorkHandleImpl other) {
checkArgumentNotNull(other);
synchronized (this) {
Preconditions.checkState(!closed, "Cannot merge into a closed handle");
synchronized (other) {
Preconditions.checkState(!other.closed, "Cannot merge a closed handle");
this.elements += other.elements;
this.bytes += other.bytes;
other.closed = true;
other.elements = 0;
other.bytes = 0;
}
}
}

public synchronized boolean isClosed() {
return closed;
}

@VisibleForTesting
synchronized int elements() {
return elements;
}

@VisibleForTesting
synchronized long bytes() {
return bytes;
}

@Override
public synchronized void close() {
if (closed) return;
closed = true;
decrementCounters(this.elements, this.bytes);
}
Comment thread
arunpandianp marked this conversation as resolved.
Comment thread
arunpandianp marked this conversation as resolved.
}

private static final class QueuedWork implements Runnable {

private final ExecutableWork work;
private final BoundedQueueExecutorWorkHandleImpl handle;

public QueuedWork(ExecutableWork work, BoundedQueueExecutorWorkHandleImpl handle) {
this.work = work;
this.handle = handle;
}

public ExecutableWork getWork() {
return work;
}

public BoundedQueueExecutorWorkHandleImpl getHandle() {
return handle;
}

@Override
public void run() {
checkArgument(!handle.isClosed());
try (handle) {
work.run(handle);
}
}
}

private void executeMonitorHeld(ExecutableWork work, long workBytes) {
++elementsOutstanding;
bytesOutstanding += workBytes;
monitor.leave();
BoundedQueueExecutorWorkHandleImpl handle =
new BoundedQueueExecutorWorkHandleImpl(1, workBytes);
try {
Comment thread
arunpandianp marked this conversation as resolved.
executor.execute(new QueuedWork(work, handle));
} catch (Throwable t) {
handle.close();
throw ExceptionUtils.safeWrapThrowableAsException(t);
}
}

private void executeMonitorHeld(Runnable work) {
++elementsOutstanding;
monitor.leave();

Expand All @@ -232,21 +364,25 @@ private void executeMonitorHeld(Runnable work, long workBytes) {
try {
work.run();
} finally {
decrementCounters(workBytes);
decrementCounters(1, 0L);
}
});
} catch (RuntimeException e) {
// If the execute() call threw an exception, decrement counters here.
decrementCounters(workBytes);
throw e;
} catch (Throwable t) {
decrementCounters(1, 0L);
throw ExceptionUtils.safeWrapThrowableAsException(t);
}
}

private void decrementCounters(long workBytes) {
@VisibleForTesting
BoundedQueueExecutorWorkHandleImpl createBudgetHandle(int elements, long bytes) {
return new BoundedQueueExecutorWorkHandleImpl(elements, bytes);
}

private void decrementCounters(int elements, long bytes) {
// All threads queue decrements and one thread grabs the monitor and updates
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep comment?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// counters. We do this to reduce contention on monitor which is locked by
// GetWork thread
decrementQueue.add(workBytes);
decrementQueue.add(new Budget(elements, bytes));
boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true);
if (submittedToExistingBatch) {
// There is already a thread about to drain the decrement queue
Expand All @@ -265,12 +401,12 @@ private void decrementCounters(long workBytes) {
long bytesToDecrement = 0;
int elementsToDecrement = 0;
while (true) {
Long pollResult = decrementQueue.poll();
Budget pollResult = decrementQueue.poll();
if (pollResult == null) {
break;
}
bytesToDecrement += pollResult;
++elementsToDecrement;
bytesToDecrement += pollResult.bytes;
elementsToDecrement += pollResult.elements;
}
if (elementsToDecrement == 0) {
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.util;

import javax.annotation.CheckReturnValue;
import org.apache.beam.sdk.annotations.Internal;

/** Utility methods for simplifying work with exceptions and throwables. */
@Internal
public final class ExceptionUtils {

private ExceptionUtils() {}

/**
* Returns the {@code throwable} as-is if it is an instance of {@link RuntimeException} throws if
* it is an {@link Error}, or returns the {@code throwable} wrapped in a {@code RuntimeException}.
*/
@CheckReturnValue
public static RuntimeException safeWrapThrowableAsException(Throwable throwable) {
if (throwable instanceof RuntimeException) {
return (RuntimeException) throwable;
} else if (throwable instanceof Error) {
throw (Error) throwable;
} else {
return new RuntimeException(throwable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void finalizeCommits(Iterable<Long> finalizeIds) {
}
for (Runnable callback : callbacksToExecute) {
try {
finalizationExecutor.forceExecute(callback, 0);
finalizationExecutor.forceExecute(callback);
} catch (OutOfMemoryError oom) {
throw oom;
} catch (Throwable t) {
Expand Down
Loading
Loading