Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run!",
"modification": 1,
"modification": 2,
}

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run!",
"modification": 1,
"modification": 2,
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ public interface DoFnRunner<InputT extends @Nullable Object, OutputT extends @Nu
<KeyT extends @Nullable Object> void onWindowExpiration(
BoundedWindow window, Instant timestamp, KeyT key);

/**
* Performs per-key cleanup or processing after all elements and timers for a key have been
* processed.
*/
void finishKey();

/**
* Returns the underlying fn instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public void finishBundle() {
doFnRunner.finishBundle();
}

@Override
public void finishKey() {
doFnRunner.finishKey();
}

@Override
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
doFnRunner.onWindowExpiration(window, timestamp, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ public void finishBundle() {
}
}

@Override
public void finishKey() {}

@Override
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
invoker.invokeOnWindowExpiration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ public void finishBundle() {
doFnRunner.finishBundle();
}

@Override
public void finishKey() {
doFnRunner.finishKey();
}

@Override
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
doFnRunner.onWindowExpiration(window, timestamp, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ public void finishBundle() {
finished = true;
}

@Override
public void finishKey() {}

@Override
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ public void finishBundle() {
container.updateMetrics(stepName);
}

@Override
public void finishKey() {}

@Override
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
delegate.onWindowExpiration(window, timestamp, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,9 @@ public void finishBundle() {
}
}

@Override
public void finishKey() {}

@Override
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ public void finishBundle() {
Optional.ofNullable(finishBundleCallback).ifPresent(Runnable::run);
}

@Override
public void finishKey() {}

@Override
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,9 @@ public void finishBundle() {
wrappedRunner.finishBundle();
}

@Override
public void finishKey() {}

@Override
public <KeyT> void onWindowExpiration(
BoundedWindow window, Instant timestamp, KeyT key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ public void processTimers() throws Exception {
// Nothing.
}

@Override
public void finishKey() throws Exception {}

@Override
public void finishBundle() throws Exception {
receiver = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public void processTimers() throws Exception {
// The timers for the underlying ParDoFn are processed at the end of each element
}

@Override
public void finishKey() throws Exception {}

@Override
public void finishBundle() throws Exception {
underlyingParDoFn.finishBundle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ public void processElement(Object untypedElem) throws Exception {
@Override
public void processTimers() {}

@Override
public void finishKey() throws Exception {}

@Override
public void finishBundle() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ public void finishBundle() {
simpleRunner.finishBundle();
}

@Override
public void finishKey() {
simpleRunner.finishKey();
}

@Override
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
simpleRunner.onWindowExpiration(window, timestamp, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public void processTimers() throws Exception {
delegate.processTimers();
}

@Override
public void finishKey() throws Exception {
delegate.finishKey();
}

@Override
public void finishBundle() throws Exception {
delegate.finishBundle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ private void invokeProcessElement(WindowedValue<InputT> elem) {
@Override
public void finishBundle() {}

@Override
public void finishKey() {}

@Override
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ public void processTimers() throws Exception {
// it here to build a KeyedWorkItem
}

@Override
public void finishKey() throws Exception {
if (fnRunner != null) {
fnRunner.finishKey();
}
}

@Override
public void finishBundle() throws Exception {
checkState(fnRunner != null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ public void processElement(Object untypedElem) throws Exception {
@Override
public void processTimers() {}

@Override
public void finishKey() throws Exception {}

@Override
public void finishBundle() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ public void processElement(Object elem) throws Exception {
@Override
public void processTimers() {}

@Override
public void finishKey() throws Exception {}

@Override
public void finishBundle() throws Exception {
groupingTable.flush(receiver);
Expand Down Expand Up @@ -377,10 +380,14 @@ public void processElement(Object elem) throws Exception {
@Override
public void processTimers() {}

@Override
public void finishKey() throws Exception {
sideInputFetcher.persist();
}

@Override
public void finishBundle() throws Exception {
groupingTable.flush(receiver);
sideInputFetcher.persist();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,12 @@ public NativeReader<?> create(

@Override
public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {
return new PubsubReaderIterator(context.getWorkItem());
return new PubsubReaderIterator();
}

class PubsubReaderIterator extends WindmillReaderIteratorBase<T> {
protected PubsubReaderIterator(Windmill.WorkItem work) {
super(work, skipUndecodableElements);
}

@Override
public boolean advance() throws IOException {
if (context.workIsFailed()) {
return false;
}
return super.advance();
protected PubsubReaderIterator() {
super(context, skipUndecodableElements);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public void processElement(Object untypedElem) throws Exception {
@Override
public void processTimers() {}

@Override
public void finishKey() throws Exception {}

@Override
public void finishBundle() throws Exception {
this.receiver = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,13 @@ public void processTimers() throws Exception {
processTimers(TimerType.SYSTEM, stepContext, windowCoder);
}

@Override
public void finishKey() throws Exception {
if (fnRunner != null) {
fnRunner.finishKey();
}
}

private void processUserTimer(TimerData timer) throws Exception {
if (fnSignature.timerDeclarations().containsKey(timer.getTimerId())
|| fnSignature.timerFamilyDeclarations().containsKey(timer.getTimerFamilyId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ public <KeyT> void onTimer(
@Override
public void finishBundle() {
simpleDoFnRunner.finishBundle();
}

@Override
public void finishKey() {
simpleDoFnRunner.finishKey();
sideInputFetcher.persist();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkExecutor;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataId;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
Expand Down Expand Up @@ -157,6 +158,9 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
*/
private @Nullable UnboundedReader<?> activeReader;

private @Nullable WorkExecutor workExecutor;
private boolean finishKeyCalled = false;

public StreamingModeExecutionContext(
CounterFactory counterFactory,
String computationId,
Expand Down Expand Up @@ -240,9 +244,12 @@ public void start(
Work work,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
Windmill.WorkItemCommitRequest.Builder outputBuilder) {
Windmill.WorkItemCommitRequest.Builder outputBuilder,
WorkExecutor workExecutor) {
this.key = key;
this.work = work;
this.workExecutor = workExecutor;
this.finishKeyCalled = false;
this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey());
this.sideInputStateFetcher = sideInputStateFetcher;
StreamingGlobalConfig config = globalConfigHandle.getConfig();
Expand Down Expand Up @@ -270,6 +277,17 @@ public void start(
}
}

public void finishKey() {
checkState(!finishKeyCalled, "finishKey was already called");
checkNotNull(workExecutor, "workExecutor must be set before calling finishKey()");
try {
workExecutor.finishKey();
} catch (Exception e) {
throw new RuntimeException(e);
}
this.finishKeyCalled = true;
}
Comment on lines +280 to +289
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation of finishKey() throws an IllegalStateException if called more than once. However, iterators often call advance() multiple times after exhaustion, which would trigger multiple calls to this method. Making finishKey() idempotent is safer and prevents unexpected crashes in the worker. Additionally, providing a descriptive message when wrapping the exception improves debuggability.

  public void finishKey() {
    if (finishKeyCalled) {
      return;
    }
    checkNotNull(workExecutor, "workExecutor must be set before calling finishKey()");
    try {
      workExecutor.finishKey();
    } catch (Exception e) {
      throw new RuntimeException("Failed to finish key processing", e);
    }
    this.finishKeyCalled = true;
  }


/**
* Ensure that the processing time is greater than any fired processing time timers. Otherwise, a
* trigger could ignore the timer and orphan the window.
Expand Down Expand Up @@ -451,6 +469,7 @@ public void invalidateCache() {
}

public Map<Long, Pair<Instant, Runnable>> flushState() {
checkState(finishKeyCalled, "finishKey must be called before flushState");
Map<Long, Pair<Instant, Runnable>> callbacks = new HashMap<>();

for (StepContext stepContext : getAllStepContexts()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public void processElement(Object element) throws Exception {
@Override
public void processTimers() {}

@Override
public void finishKey() throws Exception {}

@Override
public void finishBundle() throws Exception {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ public <KeyT> void onTimer(
@Override
public void finishBundle() {
simpleDoFnRunner.finishBundle();
}

@Override
public void finishKey() {
simpleDoFnRunner.finishKey();
sideInputFetcher.persist();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ public void processElement(Object untypedElem) throws Exception {
@Override
public void processTimers() {}

@Override
public void finishKey() throws Exception {}

@Override
public void finishBundle() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,12 @@ public NativeReader<?> create(

@Override
public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {
return new UngroupedWindmillReaderIterator(context.getWorkItem());
return new UngroupedWindmillReaderIterator();
}

class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase<T> {
UngroupedWindmillReaderIterator(Windmill.WorkItem work) {
super(work, skipUndecodableElements);
}

@Override
public boolean advance() throws IOException {
if (context.workIsFailed()) {
return false;
}
return super.advance();
UngroupedWindmillReaderIterator() {
super(context, skipUndecodableElements);
}

@Override
Expand Down
Loading
Loading