@@ -179,11 +179,11 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
       PCollection<Row> inputRows = input.getSinglePCollection();

       BigQueryIO.Write<Row> write = createStorageWriteApiTransform(inputRows.getSchema());
+      int numStreams = configuration.getNumStreams() == null ? 0 : configuration.getNumStreams();
Comment thread: stankiewicz marked this conversation as resolved.

       if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
         Long triggeringFrequency = configuration.getTriggeringFrequencySeconds();
         Boolean autoSharding = configuration.getAutoSharding();
-        int numStreams = configuration.getNumStreams() == null ? 0 : configuration.getNumStreams();

         boolean useAtLeastOnceSemantics =
             configuration.getUseAtLeastOnceSemantics() != null
@@ -197,12 +197,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
                       : Duration.standardSeconds(triggeringFrequency));
         }
         // set num streams if specified, otherwise default to autoSharding
-        if (numStreams > 0) {
-          write = write.withNumStorageWriteApiStreams(numStreams);
-        } else if (autoSharding == null || autoSharding) {
+        if (numStreams == 0 && (autoSharding == null || autoSharding)) {
           write = write.withAutoSharding();
         }
       }
+      if (numStreams > 0) {
+        write = write.withNumStorageWriteApiStreams(numStreams);
+      }

       Schema inputSchema = inputRows.getSchema();

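As a reading aid for the two `expand` hunks above, here is a minimal stand-alone sketch of how the revised branches appear to combine. The class `ShardingSketch`, its `Mode` enum, and the `choose` method are hypothetical names introduced only for illustration. The sketch does not call Beam and models nothing beyond the `numStreams` / `autoSharding` branching visible in the diff.

```java
/** Hypothetical stand-alone model of the branching in the diff; not Beam code. */
final class ShardingSketch {

  /** Which call, if any, the revised code would make on the write transform. */
  enum Mode {
    AUTO_SHARDING, // write.withAutoSharding()
    FIXED_STREAMS, // write.withNumStorageWriteApiStreams(numStreams)
    NEITHER // write is left as created
  }

  static Mode choose(boolean unbounded, Integer configuredNumStreams, Boolean autoSharding) {
    // A null numStreams is treated as 0, as in the diff.
    int numStreams = configuredNumStreams == null ? 0 : configuredNumStreams;

    // Auto-sharding is only considered inside the unbounded branch,
    // and only when no fixed stream count was requested.
    if (unbounded && numStreams == 0 && (autoSharding == null || autoSharding)) {
      return Mode.AUTO_SHARDING;
    }
    // After this change, a positive count applies to bounded and unbounded input alike.
    if (numStreams > 0) {
      return Mode.FIXED_STREAMS;
    }
    return Mode.NEITHER;
  }

  public static void main(String[] args) {
    System.out.println(choose(false, 4, null)); // bounded input, fixed count
    System.out.println(choose(true, null, null)); // unbounded input, nothing configured
    System.out.println(choose(true, 0, false)); // unbounded input, auto-sharding disabled
    System.out.println(choose(false, null, null)); // bounded input, nothing configured
  }
}
```

Under these assumptions, the only case whose outcome differs from the previous code is bounded input with a positive `numStreams`, which formerly fell outside the unbounded branch and was not applied.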
@@ -101,6 +101,12 @@ public void validate() {

     Boolean autoSharding = getAutoSharding();
     Integer numStreams = getNumStreams();
+    if (numStreams != null) {
Comment thread: stankiewicz marked this conversation as resolved.
+      checkArgument(
+          numStreams >= 0,
+          invalidConfigMessage + "numStreams must be non-negative, but was: %s",
+          numStreams);
+    }
Comment on lines +104 to +109
Contributor

medium

If numStreams is explicitly set to 0 but autoSharding is not enabled (or is false), the configuration is accepted but numStreams is silently ignored in BigQueryStorageWriteApiSchemaTransformProvider (since it only applies numStreams if it is > 0). To prevent confusing behavior where a user configures 0 streams expecting that to be applied, we should validate that numStreams can only be 0 if autoSharding is enabled.

    if (numStreams != null) {
      checkArgument(
          numStreams >= 0,
          invalidConfigMessage + "numStreams must be non-negative, but was: %s",
          numStreams);
      if (numStreams == 0) {
        checkArgument(
            autoSharding != null && autoSharding,
            invalidConfigMessage + "numStreams can only be 0 if autoSharding is enabled.");
      }
    }

Author

Thanks for the suggestion. I looked into the existing BigQueryIO semantics, and I think this validation would be broader than the scope of this PR.

numStorageWriteApiStreams currently uses 0 as the default/unset sentinel in BigQueryIO, and withNumStorageWriteApiStreams(0) is documented as valid behavior: for streaming it lets the runner determine sharding, and for batch it avoids inserting a redistribute and keeps the existing pipeline parallelism.

Because autoSharding is only applicable to unbounded input, requiring numStreams == 0 to also have autoSharding == true would reject or complicate valid/default batch behavior and would be a behavior change beyond this PR’s narrow goal of allowing positive fixed stream counts for bounded Storage Write API writes.

Given that, I'd prefer not to add this validation unless a reviewer wants to intentionally change the accepted schema transform semantics for an explicit numStreams: 0.
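To make the disagreement in this thread concrete, the following is a minimal stand-alone sketch contrasting the check added in the PR with the stricter variant from the review suggestion. `NumStreamsValidationSketch`, `validateAsInPr`, `validateAsSuggested`, and `accepts` are hypothetical names. The sketch throws `IllegalArgumentException` directly instead of using `checkArgument`, and it leaves out the configuration class's other checks.

```java
/** Hypothetical stand-alone model of the two validation variants; not Beam code. */
final class NumStreamsValidationSketch {

  /** The check as added in this PR: only negative values are rejected. */
  static void validateAsInPr(Integer numStreams) {
    if (numStreams != null && numStreams < 0) {
      throw new IllegalArgumentException(
          "numStreams must be non-negative, but was: " + numStreams);
    }
  }

  /** The stricter variant from the review suggestion: an explicit 0 also needs autoSharding. */
  static void validateAsSuggested(Integer numStreams, Boolean autoSharding) {
    validateAsInPr(numStreams);
    boolean autoShardingEnabled = autoSharding != null && autoSharding;
    if (numStreams != null && numStreams == 0 && !autoShardingEnabled) {
      throw new IllegalArgumentException(
          "numStreams can only be 0 if autoSharding is enabled.");
    }
  }

  /** Runs a check and reports whether the configuration was accepted. */
  static boolean accepts(Runnable check) {
    try {
      check.run();
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // Batch-style configuration: explicit numStreams of 0, autoSharding left unset.
    System.out.println(accepts(() -> validateAsInPr(0))); // true
    System.out.println(accepts(() -> validateAsSuggested(0, null))); // false
    // Negative counts are rejected by both variants.
    System.out.println(accepts(() -> validateAsInPr(-1))); // false
    System.out.println(accepts(() -> validateAsSuggested(-1, true))); // false
  }
}
```

The second call illustrates the author's concern: a configuration with an explicit `numStreams` of 0 and no `autoSharding` setting passes the PR's check but would be rejected by the suggested one.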

     if (autoSharding != null && autoSharding && numStreams != null) {
       checkArgument(
           numStreams == 0,
@@ -152,8 +158,7 @@ public static Builder builder() {
   public abstract Boolean getAutoSharding();

   @SchemaFieldDescription(
-      "Specifies the number of write streams that the Storage API sink will use. "
-          + "This parameter is only applicable when writing unbounded data.")
+      "Specifies the number of write streams that the Storage API sink will use.")
   @Nullable
   public abstract Integer getNumStreams();

@@ -122,7 +122,10 @@ public void testInvalidConfig() {
         Arrays.asList(
             BigQueryWriteConfiguration.builder()
                 .setTable("project:dataset.table")
-                .setCreateDisposition("INVALID_DISPOSITION"));
+                .setCreateDisposition("INVALID_DISPOSITION"),
+            BigQueryWriteConfiguration.builder()
+                .setTable("project:dataset.table")
+                .setNumStreams(-1));

     for (BigQueryWriteConfiguration.Builder config : invalidConfigs) {
       assertThrows(
3 changes: 1 addition & 2 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2155,8 +2155,7 @@ def __init__(
         all of FILE_LOADS, STREAMING_INSERTS, and STORAGE_WRITE_API. Only
         applicable to unbounded input.
       num_storage_api_streams: Specifies the number of write streams that the
-        Storage API sink will use. This parameter is only applicable when
-        writing unbounded data.
+        Storage API sink will use.
       ignore_unknown_columns: Accept rows that contain values that do not match
         the schema. The unknown values are ignored. Default is False,
         which treats unknown values as errors. This option is only valid for
@@ -855,7 +855,7 @@ pipeline uses. You can set it explicitly on the transform via
 [`withNumStorageWriteApiStreams`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withNumStorageWriteApiStreams-int-)
 or provide the `numStorageWriteApiStreams` option to the pipeline as defined in
 [`BigQueryOptions`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.html).
-Please note this is only supported for streaming pipelines.
+Fixed stream counts can be used with both batch and streaming pipelines.

 Triggering frequency determines how soon the data is visible for querying in
 BigQuery. You can explicitly set it via