-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[Dataflow Streaming] Prepare BoundedQueueExecutor for MultiKey bundles #38592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+363
−66
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
4753a8b
[Dataflow Streaming] Prepare BoundedQueueExecutor for MultiKey bundles
arunpandianp 0726da4
[Dataflow Streaming] Refactor BoundedQueueExecutor with handles
arunpandianp 32fa605
doc fix
arunpandianp d6e6ff3
fix comments
arunpandianp 0919494
Address comments
arunpandianp b189f43
spotless fix
arunpandianp 366df50
Address comments
arunpandianp File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
24 changes: 24 additions & 0 deletions
24
...ava/org/apache/beam/runners/dataflow/worker/streaming/BoundedQueueExecutorWorkHandle.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.runners.dataflow.worker.streaming; | ||
|
|
||
| /** | ||
| * A handle to use when requesting pulling more work from @BoundedQueueExecutor | ||
| * via @BoundedQueueExecutor.pollWork | ||
| */ | ||
| public interface BoundedQueueExecutorWorkHandle {} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,13 +17,20 @@ | |
| */ | ||
| package org.apache.beam.runners.dataflow.worker.util; | ||
|
|
||
| import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; | ||
| import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; | ||
|
|
||
| import java.util.concurrent.ConcurrentLinkedQueue; | ||
| import java.util.concurrent.LinkedBlockingQueue; | ||
| import java.util.concurrent.ThreadFactory; | ||
| import java.util.concurrent.ThreadPoolExecutor; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import javax.annotation.concurrent.GuardedBy; | ||
| import org.apache.beam.runners.dataflow.worker.streaming.BoundedQueueExecutorWorkHandle; | ||
| import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard; | ||
|
|
||
|
|
@@ -38,7 +45,19 @@ public class BoundedQueueExecutor { | |
|
|
||
| // Used to guard elementsOutstanding and bytesOutstanding. | ||
| private final Monitor monitor; | ||
| private final ConcurrentLinkedQueue<Long> decrementQueue = new ConcurrentLinkedQueue<>(); | ||
|
|
||
| private static class Budget { | ||
|
|
||
| final int elements; | ||
| final long bytes; | ||
|
|
||
| Budget(int elements, long bytes) { | ||
| this.elements = elements; | ||
| this.bytes = bytes; | ||
| } | ||
| } | ||
|
|
||
| private final ConcurrentLinkedQueue<Budget> decrementQueue = new ConcurrentLinkedQueue<>(); | ||
| private final Object decrementQueueDrainLock = new Object(); | ||
| private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false); | ||
| private int elementsOutstanding = 0; | ||
|
|
@@ -106,7 +125,7 @@ protected void afterExecute(Runnable r, Throwable t) { | |
|
|
||
| // Before adding a Work to the queue, check that there are enough bytes of space or no other | ||
| // outstanding elements of work. | ||
| public void execute(Runnable work, long workBytes) { | ||
| public void execute(ExecutableWork work, long workBytes) { | ||
| monitor.enterWhenUninterruptibly( | ||
| new Guard(monitor) { | ||
| @Override | ||
|
|
@@ -119,12 +138,18 @@ public boolean isSatisfied() { | |
| executeMonitorHeld(work, workBytes); | ||
| } | ||
|
|
||
| // Forcibly add something to the queue, ignoring the length limit. | ||
| public void forceExecute(Runnable work, long workBytes) { | ||
| // Forcibly add ExecutableWork to the queue, ignoring the limits. | ||
| public void forceExecute(ExecutableWork work, long workBytes) { | ||
| monitor.enter(); | ||
| executeMonitorHeld(work, workBytes); | ||
| } | ||
|
|
||
| /** Forcibly execute a Runnable callback with 0 bytes of size. */ | ||
| public void forceExecute(Runnable runnable) { | ||
| monitor.enter(); | ||
| executeMonitorHeld(runnable); | ||
| } | ||
|
|
||
| // Set the maximum/core pool size of the executor. | ||
| public synchronized void setMaximumPoolSize(int maximumPoolSize, int maximumElementsOutstanding) { | ||
| // For ThreadPoolExecutor, the maximum pool size should always greater than or equal to core | ||
|
|
@@ -221,8 +246,115 @@ public String summaryHtml() { | |
| } | ||
| } | ||
|
|
||
| private void executeMonitorHeld(Runnable work, long workBytes) { | ||
| /** | ||
| * A handle to use when requesting pulling more work from @BoundedQueueExecutor | ||
| * via @BoundedQueueExecutor.pollWork. A single handle aggregates all budgets from work pulled for | ||
| * inline execution and releases the budget after the multi work bundle is complete. | ||
| */ | ||
| final class BoundedQueueExecutorWorkHandleImpl | ||
| implements BoundedQueueExecutorWorkHandle, AutoCloseable { | ||
|
|
||
| @GuardedBy("this") | ||
| private int elements; | ||
|
arunpandianp marked this conversation as resolved.
|
||
|
|
||
| @GuardedBy("this") | ||
| private long bytes; | ||
|
|
||
| @GuardedBy("this") | ||
| private boolean closed = false; | ||
|
|
||
| private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) { | ||
| checkArgument(elements >= 0 && bytes >= 0); | ||
| this.elements = elements; | ||
| this.bytes = bytes; | ||
| } | ||
|
|
||
| /** | ||
| * Merges the budget from another handle into this handle. | ||
| * | ||
| * <p>This transfers the budget (elements and bytes) from the {@code other} handle to this | ||
| * handle, and marks the {@code other} handle as closed to prevent it from releasing the budget | ||
| * again if it is closed. | ||
| */ | ||
| public void merge(BoundedQueueExecutorWorkHandleImpl other) { | ||
| checkArgumentNotNull(other); | ||
| synchronized (this) { | ||
| Preconditions.checkState(!closed, "Cannot merge into a closed handle"); | ||
| synchronized (other) { | ||
| Preconditions.checkState(!other.closed, "Cannot merge a closed handle"); | ||
| this.elements += other.elements; | ||
| this.bytes += other.bytes; | ||
| other.closed = true; | ||
| other.elements = 0; | ||
| other.bytes = 0; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public synchronized boolean isClosed() { | ||
| return closed; | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| synchronized int elements() { | ||
| return elements; | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| synchronized long bytes() { | ||
| return bytes; | ||
| } | ||
|
|
||
| @Override | ||
| public synchronized void close() { | ||
| if (closed) return; | ||
| closed = true; | ||
| decrementCounters(this.elements, this.bytes); | ||
| } | ||
|
arunpandianp marked this conversation as resolved.
arunpandianp marked this conversation as resolved.
|
||
| } | ||
|
|
||
| private static final class QueuedWork implements Runnable { | ||
|
|
||
| private final ExecutableWork work; | ||
| private final BoundedQueueExecutorWorkHandleImpl handle; | ||
|
|
||
| public QueuedWork(ExecutableWork work, BoundedQueueExecutorWorkHandleImpl handle) { | ||
| this.work = work; | ||
| this.handle = handle; | ||
| } | ||
|
|
||
| public ExecutableWork getWork() { | ||
| return work; | ||
| } | ||
|
|
||
| public BoundedQueueExecutorWorkHandleImpl getHandle() { | ||
| return handle; | ||
| } | ||
|
|
||
| @Override | ||
| public void run() { | ||
| checkArgument(!handle.isClosed()); | ||
| try (handle) { | ||
| work.run(handle); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private void executeMonitorHeld(ExecutableWork work, long workBytes) { | ||
| ++elementsOutstanding; | ||
| bytesOutstanding += workBytes; | ||
| monitor.leave(); | ||
| BoundedQueueExecutorWorkHandleImpl handle = | ||
| new BoundedQueueExecutorWorkHandleImpl(1, workBytes); | ||
| try { | ||
|
arunpandianp marked this conversation as resolved.
|
||
| executor.execute(new QueuedWork(work, handle)); | ||
| } catch (Throwable t) { | ||
| handle.close(); | ||
| throw ExceptionUtils.safeWrapThrowableAsException(t); | ||
| } | ||
| } | ||
|
|
||
| private void executeMonitorHeld(Runnable work) { | ||
| ++elementsOutstanding; | ||
| monitor.leave(); | ||
|
|
||
|
|
@@ -232,21 +364,25 @@ private void executeMonitorHeld(Runnable work, long workBytes) { | |
| try { | ||
| work.run(); | ||
| } finally { | ||
| decrementCounters(workBytes); | ||
| decrementCounters(1, 0L); | ||
| } | ||
| }); | ||
| } catch (RuntimeException e) { | ||
| // If the execute() call threw an exception, decrement counters here. | ||
| decrementCounters(workBytes); | ||
| throw e; | ||
| } catch (Throwable t) { | ||
| decrementCounters(1, 0L); | ||
| throw ExceptionUtils.safeWrapThrowableAsException(t); | ||
| } | ||
| } | ||
|
|
||
| private void decrementCounters(long workBytes) { | ||
| @VisibleForTesting | ||
| BoundedQueueExecutorWorkHandleImpl createBudgetHandle(int elements, long bytes) { | ||
| return new BoundedQueueExecutorWorkHandleImpl(elements, bytes); | ||
| } | ||
|
|
||
| private void decrementCounters(int elements, long bytes) { | ||
| // All threads queue decrements and one thread grabs the monitor and updates | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. keep comment?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| // counters. We do this to reduce contention on monitor which is locked by | ||
| // GetWork thread | ||
| decrementQueue.add(workBytes); | ||
| decrementQueue.add(new Budget(elements, bytes)); | ||
| boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true); | ||
| if (submittedToExistingBatch) { | ||
| // There is already a thread about to drain the decrement queue | ||
|
|
@@ -265,12 +401,12 @@ private void decrementCounters(long workBytes) { | |
| long bytesToDecrement = 0; | ||
| int elementsToDecrement = 0; | ||
| while (true) { | ||
| Long pollResult = decrementQueue.poll(); | ||
| Budget pollResult = decrementQueue.poll(); | ||
| if (pollResult == null) { | ||
| break; | ||
| } | ||
| bytesToDecrement += pollResult; | ||
| ++elementsToDecrement; | ||
| bytesToDecrement += pollResult.bytes; | ||
| elementsToDecrement += pollResult.elements; | ||
| } | ||
| if (elementsToDecrement == 0) { | ||
| return; | ||
|
|
||
43 changes: 43 additions & 0 deletions
43
...ava/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ExceptionUtils.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.runners.dataflow.worker.util; | ||
|
|
||
| import javax.annotation.CheckReturnValue; | ||
| import org.apache.beam.sdk.annotations.Internal; | ||
|
|
||
| /** Utility methods for simplifying work with exceptions and throwables. */ | ||
| @Internal | ||
| public final class ExceptionUtils { | ||
|
|
||
| private ExceptionUtils() {} | ||
|
|
||
| /** | ||
| * Returns the {@code throwable} as-is if it is an instance of {@link RuntimeException} throws if | ||
| * it is an {@link Error}, or returns the {@code throwable} wrapped in a {@code RuntimeException}. | ||
| */ | ||
| @CheckReturnValue | ||
| public static RuntimeException safeWrapThrowableAsException(Throwable throwable) { | ||
| if (throwable instanceof RuntimeException) { | ||
| return (RuntimeException) throwable; | ||
| } else if (throwable instanceof Error) { | ||
| throw (Error) throwable; | ||
| } else { | ||
| return new RuntimeException(throwable); | ||
| } | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.