Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run!",
"modification": 6,
"modification": 7,
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run!",
"modification": 1,
"modification": 2,
}
2 changes: 0 additions & 2 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,6 @@ task validatesRunnerStreaming {
description "Validates Dataflow runner forcing streaming mode"
dependsOn(createLegacyWorkerValidatesRunnerTest(validatesRunnerStreamingConfig + [
name: 'validatesRunnerLegacyWorkerTestStreaming',
// Streaming appliance currently fails bundle finalizer tests.
excludedCategories: validatesRunnerStreamingConfig.excludedCategories + [ 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ],
]))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,14 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
watermarks,
processingContext,
drainMode,
appliedFinalizeIds,
getWorkStreamLatencies) ->
checkNotNull(computationStateCache)
.get(processingContext.computationId())
.ifPresent(
computationState -> {
memoryMonitor.waitForResources("GetWork");
streamingWorkScheduler.queueAppliedFinalizeIds(appliedFinalizeIds);
streamingWorkScheduler.scheduleWork(
computationState,
workItem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,16 @@ private void streamingEngineDispatchLoop(
drainMode,
workItem,
serializedWorkItemSize,
appliedFinalizeIds,
getWorkStreamLatencies) ->
computationStateFetcher
.apply(computationId)
.ifPresent(
computationState -> {
waitForResources.run();
if (!appliedFinalizeIds.isEmpty()) {
streamingWorkScheduler.queueAppliedFinalizeIds(appliedFinalizeIds);
}
streamingWorkScheduler.scheduleWork(
computationState,
workItem,
Expand Down Expand Up @@ -214,6 +218,12 @@ private void applianceDispatchLoop(Supplier<Windmill.GetWorkResponse> getWorkFn)
sleepUninterruptibly(backoff, TimeUnit.MILLISECONDS);
backoff = Math.min(1000, backoff * 2);
} while (isRunning.get());
ImmutableList<Long> appliedFinalizeIds =
ImmutableList.copyOf(
Preconditions.checkNotNull(workResponse).getAppliedFinalizeIdsList());
if (!appliedFinalizeIds.isEmpty()) {
streamingWorkScheduler.queueAppliedFinalizeIds(appliedFinalizeIds);
}
for (Windmill.ComputationWorkItems computationWork :
Preconditions.checkNotNull(workResponse).getWorkList()) {
String computationId = computationWork.getComputationId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ List<AssembledWorkItem> append(Windmill.StreamingGetWorkResponseChunk chunk) {
private Optional<AssembledWorkItem> flushToWorkItem() {
try {
workItemBuilder.mergeFrom(data);
workItemBuilder.addAllAppliedFinalizeIds(appliedFinalizeIds);
return Optional.of(
AssembledWorkItem.create(
workItemBuilder.build(),
Preconditions.checkNotNull(metadata),
workTimingInfosTracker.getLatencyAttributions(),
ImmutableList.copyOf(appliedFinalizeIds),
bufferedSize));
} catch (IOException e) {
LOG.error("Failed to parse work item from stream: ", e);
Expand Down Expand Up @@ -149,9 +149,10 @@ private static AssembledWorkItem create(
WorkItem workItem,
ComputationMetadata computationMetadata,
ImmutableList<LatencyAttribution> latencyAttributions,
ImmutableList<Long> appliedFinalizeIds,
long size) {
return new AutoValue_GetWorkResponseChunkAssembler_AssembledWorkItem(
workItem, computationMetadata, latencyAttributions, size);
workItem, computationMetadata, latencyAttributions, appliedFinalizeIds, size);
}

abstract WorkItem workItem();
Expand All @@ -160,6 +161,8 @@ private static AssembledWorkItem create(

abstract ImmutableList<LatencyAttribution> latencyAttributions();

abstract ImmutableList<Long> appliedFinalizeIds();

abstract long bufferedSize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) {
createWatermarks(workItem, metadata),
createProcessingContext(metadata.computationId()),
metadata.drainMode(),
assembledWorkItem.appliedFinalizeIds(),
assembledWorkItem.latencyAttributions());
budgetTracker.recordBudgetReceived(assembledWorkItem.bufferedSize());
GetWorkBudget extension = budgetTracker.computeBudgetExtension();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) {
assembledWorkItem.computationMetadata().drainMode(),
assembledWorkItem.workItem(),
assembledWorkItem.bufferedSize(),
assembledWorkItem.appliedFinalizeIds(),
assembledWorkItem.latencyAttributions());

// Record the fact that there are now fewer outstanding messages and bytes on the stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@ void receiveWork(
boolean drainMode,
Windmill.WorkItem workItem,
long serializedWorkItemSize,
ImmutableList<Long> appliedFinalizeIds,
ImmutableList<LatencyAttribution> getWorkStreamLatencies);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public interface WorkItemScheduler {
* @param watermarks processing watermarks for the workItem.
* @param processingContext for processing the workItem.
* @param drainMode whether the job is draining.
* @param appliedFinalizeIds Any applied finalize ids that should have their callbacks run.
* @param getWorkStreamLatencies Latencies per processing stage for the WorkItem for reporting
* back to Streaming Engine backend.
*/
Expand All @@ -45,5 +46,6 @@ void scheduleWork(
Watermarks watermarks,
Work.ProcessingContext processingContext,
boolean drainMode,
ImmutableList<Long> appliedFinalizeIds,
ImmutableList<LatencyAttribution> getWorkStreamLatencies);
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ public void scheduleWork(
work -> processWork(computationState, work, getWorkStreamLatencies)));
}

/** Adds any applied finalize ids to the commit finalizer to have their callbacks executed. */
public void queueAppliedFinalizeIds(ImmutableList<Long> appliedFinalizeIds) {
commitFinalizer.finalizeCommits(appliedFinalizeIds);
}

/**
* Executes the user DoFns processing {@link Work} then queues the {@link Commit}(s) to be sent to
* backing persistent store to mark that the {@link Work} has finished processing. May retry
Expand Down Expand Up @@ -246,7 +251,6 @@ private void processWork(ComputationState computationState, Work work) {
// Before any processing starts, call any pending OnCommit callbacks. Nothing that requires
// cleanup should be done before this, since we might exit early here.
commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList());
commitFinalizer.finalizeCommits(workItem.getAppliedFinalizeIdsList());
if (workItem.getSourceState().getOnlyFinalize()) {
Windmill.WorkItemCommitRequest.Builder outputBuilder = initializeOutputBuilder(key, workItem);
outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ public boolean awaitTermination(int time, TimeUnit unit) throws InterruptedExcep
computationWork.getDrainMode(),
workItem,
workItem.getSerializedSize(),
ImmutableList.of(),
ImmutableList.of(
LatencyAttribution.newBuilder()
.setState(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private static WorkItemScheduler noOpProcessWorkItemFn() {
watermarks,
processingContext,
drainMode,
appliedFinalizeIds,
getWorkStreamLatencies) -> {};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class WindmillStreamSenderTest {
watermarks,
processingContext,
drainMode,
appliedFinalizeIds,
getWorkStreamLatencies) -> {};
@Rule public transient Timeout globalTimeout = Timeout.seconds(600);
private ManagedChannel inProcessChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class GrpcDirectGetWorkStreamTest {
watermarks,
processingContext,
drainMode,
appliedFinalizeIds,
getWorkStreamLatencies) -> {};
private static final Windmill.JobHeader TEST_JOB_HEADER =
Windmill.JobHeader.newBuilder()
Expand Down Expand Up @@ -285,6 +286,7 @@ public void testConsumedWorkItem_computesAndSendsCorrectExtension() throws Inter
watermarks,
processingContext,
drainMode,
appliedFinalizeIds,
getWorkStreamLatencies) -> {
scheduledWorkItems.add(work);
});
Expand Down Expand Up @@ -334,6 +336,7 @@ public void testConsumedWorkItem_doesNotSendExtensionIfOutstandingBudgetHigh()
watermarks,
processingContext,
drainMode,
appliedFinalizeIds,
getWorkStreamLatencies) -> scheduledWorkItems.add(work));
Windmill.WorkItem workItem =
Windmill.WorkItem.newBuilder()
Expand Down Expand Up @@ -372,6 +375,7 @@ public void testConsumedWorkItems() throws InterruptedException {
watermarks,
processingContext,
drainMode,
appliedFinalizeIds,
getWorkStreamLatencies) -> {
scheduledWorkItems.add(work);
});
Expand Down Expand Up @@ -416,6 +420,7 @@ public void testConsumedWorkItems_itemsSplitAcrossResponses() throws Interrupted
watermarks,
processingContext,
drainMode,
appliedFinalizeIds,
getWorkStreamLatencies) -> {
scheduledWorkItems.add(work);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ public void onCompleted() {
boolean drainMode,
WorkItem workItem,
long serializedWorkItemSize,
ImmutableList<Long> appliedFinalizeIds,
ImmutableList<LatencyAttribution> getWorkStreamLatencies) -> {
latch.countDown();
assertEquals(inputDataWatermark, new Instant(18));
Expand Down Expand Up @@ -474,6 +475,7 @@ public void onCompleted() {
boolean drainMode,
WorkItem workItem,
long serializedWorkItemSize,
ImmutableList<Long> appliedFinalizeIds,
ImmutableList<LatencyAttribution> getWorkStreamLatencies) -> {
assertEquals(inputDataWatermark, new Instant(18));
assertEquals(synchronizedProcessingTime, new Instant(17));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,7 @@ message WorkItem {
// present, this field includes metadata associated with any hot key.
optional HotKeyInfo hot_key_info = 11;

repeated int64 applied_finalize_ids = 16;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reserve it, just in case something is already depending on it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I also triggered a run of the post submits, which should test bundle finalizers on appliance and engine. Let's see how they do.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is best practice for protos anyway, field numbers should never be re-used even if the field is deprecated


reserved 12, 13, 14, 15;
reserved 12, 13, 14, 15, 16;
}

message ComputationWorkItems {
Expand Down Expand Up @@ -479,6 +477,8 @@ message GetWorkRequest {

message GetWorkResponse {
repeated ComputationWorkItems work = 1;
// Finalize ids associated with work that this worker successfully applied.
repeated int64 applied_finalize_ids = 2 [packed = true];
}

// GetData
Expand Down
Loading