Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples;

import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.BatchElements;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// beam-playground:
// name: BatchElements
// description: Demonstration of BatchElements transform usage.
// multifile: false
// default_example: false
// context_line: 47
// categories:
// - Core Transforms
// complexity: BASIC
// tags:
// - transforms
// - batch

public class BatchElementsExample {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

It is recommended to use PipelineOptionsFactory.fromArgs(args).withValidation().create() instead of PipelineOptionsFactory.create(). This allows users to pass command-line arguments, such as --runner, to configure the pipeline execution when running the example.

Suggested change
PipelineOptions options = PipelineOptionsFactory.create();
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();


Pipeline pipeline = Pipeline.create(options);

// [START main_section]
// Create input

PCollection<String> inputs =
pipeline.apply(Create.of("apple", "strawberry", "orange", "peach", "cherry", "pear"));

// Create Batch Config
BatchElements.BatchConfig config =
BatchElements.BatchConfig.builder().withMinBatchSize(2).withMaxBatchSize(4).build();
// Batch Elements
PCollection<List<String>> result = inputs.apply(BatchElements.withConfig(config));
// [END main_section]
result.apply(ParDo.of(new LogOutput()));
pipeline.run();
}

static class LogOutput extends DoFn<List<String>, String> {
private static final Logger LOG = LoggerFactory.getLogger(LogOutput.class);

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
List<String> batch = c.element();

LOG.info("Batch Contents: {}", batch);

for (String element : batch) {
c.output(element);
}
}
Comment on lines +71 to +79
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In modern Beam, it is preferred to use parameter injection (e.g., @Element and OutputReceiver) instead of the ProcessContext object. Additionally, the throws Exception clause is unnecessary here as no checked exceptions are thrown in the method body.

    public void processElement(@Element List<String> batch, OutputReceiver<String> receiver) {
      LOG.info("Batch Contents: {}", batch);

      for (String element : batch) {
        receiver.output(element);
      }
    }

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.transforms;

import static java.util.Collections.singleton;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
Expand All @@ -27,6 +29,7 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;

Expand Down Expand Up @@ -568,7 +571,11 @@ public void processElement(

try (BatchSizeEstimator.Stopwatch sw = estimator.recordTime(targetBatch.size)) {

receiver.outputWithTimestamp(targetBatch.elements, targetWindow.maxTimestamp());
receiver.outputWindowedValue(
targetBatch.elements,
targetWindow.maxTimestamp(),
singleton(targetWindow),
PaneInfo.NO_FIRING);
Comment on lines +574 to +578
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The OutputReceiver interface in the Apache Beam Java SDK does not currently have an outputWindowedValue method. This method is available on DoFn.ProcessContext, but using ProcessContext is generally deprecated in favor of parameter injection. If the intention is to explicitly set the window and pane info (which is necessary if the batch is being emitted for a window other than the one currently in context), you may need to use ProcessContext for this specific DoFn, or ensure that the OutputReceiver interface is updated to support this method. As it stands, this code will likely fail to compile.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

batches.remove(targetWindow);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
---
title: "BatchElements"
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# BatchElements

BatchElements transform groups individual elements into batches before processing them downstream.
The transform takes a `PCollection<T>` as input and produces a `PCollection<List<T>>`, where each output element is a batch containing multiple input elements.
Batch sizes are chosen dynamically between the configured minimum and maximum values by measuring the execution time of downstream operations.

Batching is performed per window. Each emitted batch belongs to the same window as its input elements.
Comment on lines +18 to +24
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe include some motivation behind BatchElements? This is a good excerpt:

* <p>This transform is designed to precede operations whose processing cost is of the form:
*
* <pre>
* time = fixed_cost + num_elements * per_element_cost
* </pre>
*
* <p>When the per-element cost is significantly smaller than the fixed cost, batching multiple
* elements together can amortize that fixed cost and improve overall throughput.


## Examples

{{< playground height="700px" >}}
{{< playground_snippet language="java" path="SDK_JAVA_BatchElements" show="main_section" >}}
{{< /playground >}}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ limitations under the License.
<tr><td><a href="/documentation/transforms/java/aggregation/groupbykey">GroupByKey</a></td><td>Takes a keyed collection of elements and produces a collection where each element
consists of a key and all values associated with that key.</td></tr>
<tr><td><a href="/documentation/transforms/java/aggregation/groupintobatches">GroupIntoBatches</a></td><td>Batches values associated with keys into <code>Iterable</code> batches of some size. Each batch contains elements associated with a specific key.</td></tr>
<tr><td><a href="/documentation/transforms/java/aggregation/batchelements">BatchElements</a></td><td>Groups individual elements into batches to amortize fixed processing costs, using dynamically estimated batch sizes.</td></tr>
<tr><td><a href="/documentation/transforms/java/aggregation/hllcount">HllCount</a></td><td>Estimates the number of distinct elements and creates re-aggregatable sketches using the HyperLogLog++ algorithm.</td></tr>
<tr><td><a href="/documentation/transforms/java/aggregation/latest">Latest</a></td><td>Selects the latest element within each aggregation according to the implicit timestamp.</td></tr>
<tr><td><a href="/documentation/transforms/java/aggregation/max">Max</a></td><td>Outputs the maximum element within each aggregation.</td></tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@
<li><a href="/documentation/transforms/java/aggregation/distinct/">Distinct</a></li>
<li><a href="/documentation/transforms/java/aggregation/groupbykey/">GroupByKey</a></li>
<li><a href="/documentation/transforms/java/aggregation/groupintobatches/">GroupIntoBatches</a></li>
<li><a href="/documentation/transforms/java/aggregation/batchelements/">BatchElements</a></li>
<li><a href="/documentation/transforms/java/aggregation/hllcount/">HllCount</a></li>
<li><a href="/documentation/transforms/java/aggregation/latest/">Latest</a></li>
<li><a href="/documentation/transforms/java/aggregation/max/">Max</a></li>
Expand Down
Loading