26 changes: 15 additions & 11 deletions docs/BigQueryPushdownEngine-sqlengine.md
@@ -73,17 +73,6 @@ completed. This API can be used if the execution environment for this environmen
Note that this API has an on-demand pricing model. See the [Pricing](https://cloud.google.com/bigquery/pricing#storage-api)
page for details related to pricing.

**Attempt direct copy to BigQuery sinks**: Performance can be greatly improved if the records from stages that are executed using
BigQuery ELT Transformation Pushdown are copied directly into a configured table in a compatible BigQuery Sink. This eliminates
the need to read records into the pipeline as no further processing is needed within the pipeline.
To ensure this BigQuery Sink can take advantage of the performance improvements provided by this feature,
the following requirements must be met:
1. The service account configured for BigQuery ELT Transformation Pushdown has permissions to create and update tables in the dataset used by the BigQuery Sink.
2. The datasets used for BigQuery ELT Transformation Pushdown and BigQuery Sink must be stored in the same **location**.
3. The **operation** is either `insert` (with **Truncate Table** disabled), `update`, or `upsert`.

Note: If the direct copy operation does not succeed, the pipeline will proceed with the standard workflow in order to ensure completion.

**Service Account**: Service account key used for authorization

* **File Path**: Path on the local file system of the service account key used for
@@ -121,6 +110,21 @@ corresponding BigQuery data type for each CDAP type, for updates and upserts.
If any of the stages involved in a Join operation contains an unsupported type,
this Join operation will be executed in Spark.

Writing to BigQuery Sinks
-------------------------

Performance can be greatly improved if the records from stages that are executed using BigQuery ELT Transformation
Pushdown are copied directly into a configured table in a compatible BigQuery Sink.
This eliminates the need to read records back into the pipeline, since they require no further processing there.

To ensure this BigQuery Sink can take advantage of the performance improvements provided by this feature,
the following requirements must be met:
1. The service account configured for BigQuery ELT Transformation Pushdown has permissions to create and update tables in the dataset used by the BigQuery Sink.
2. The datasets used for BigQuery ELT Transformation Pushdown and the BigQuery Sink must be stored in the same **location**.
3. The **operation** is either `insert` (with **Truncate Table** disabled), `update`, or `upsert`.

Note: If the direct copy operation does not succeed, the pipeline proceeds with the standard sink workflow to ensure completion, as in the sketch below.
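
A minimal sketch of this write-with-fallback contract, based on the `write(SQLWriteRequest)` method shown in the `BigQuerySQLEngine.java` diff below. Import paths are assumed from CDAP's SQL engine API; only `SQLWriteResult.unsupported(...)`, `getDatasetName()`, and `getOutput().getSqlEngineClassName()` appear in this PR, while `copyIntoSinkTable` and `SQLWriteResult.success(...)` are hypothetical stand-ins:

```java
// Sketch only: import paths assumed from CDAP's SQL engine request API.
import io.cdap.cdap.etl.api.engine.sql.request.SQLWriteRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLWriteResult;

public SQLWriteResult write(SQLWriteRequest writeRequest) {
  String datasetName = writeRequest.getDatasetName();

  // Skip outputs addressed to a different SQL engine; the standard sink handles them.
  if (!BigQuerySQLEngine.class.getName().equals(writeRequest.getOutput().getSqlEngineClassName())) {
    return SQLWriteResult.unsupported(datasetName);
  }

  try {
    // Hypothetical helper: run a BigQuery job that copies the pushdown
    // dataset's table directly into the sink's configured table.
    long numRecords = copyIntoSinkTable(writeRequest);
    return SQLWriteResult.success(datasetName, numRecords); // hypothetical factory method
  } catch (Exception e) {
    // On any failure, report the write as unsupported so the standard
    // sink workflow still executes and the pipeline completes.
    return SQLWriteResult.unsupported(datasetName);
  }
}
```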

Troubleshooting
---------------

BigQuerySQLEngine.java

@@ -346,11 +346,6 @@ public Set<PullCapability> getPullCapabilities() {
public SQLWriteResult write(SQLWriteRequest writeRequest) {
String datasetName = writeRequest.getDatasetName();

// Check if direct sink write is enabled. If not, skip.
if (!sqlEngineConfig.shouldUseDirectSinkWrite()) {
return SQLWriteResult.unsupported(datasetName);
}

// Check if this output matches the expected engine. If it doesn't, skip execution for this write operation.
if (!BigQuerySQLEngine.class.getName().equals(writeRequest.getOutput().getSqlEngineClassName())) {
LOG.debug("Got output for another SQL engine {}, skipping", writeRequest.getOutput().getSqlEngineClassName());
BigQuerySQLEngineConfig.java

@@ -50,7 +50,6 @@ public class BigQuerySQLEngineConfig extends BigQueryBaseConfig {
public static final String NAME_INCLUDED_STAGES = "includedStages";
public static final String NAME_EXCLUDED_STAGES = "excludedStages";
public static final String NAME_USE_STORAGE_READ_API = "useStorageReadAPI";
public static final String NAME_DIRECT_SINK_WRITE = "useDirectSinkWrite";

// Job priority options
public static final String PRIORITY_BATCH = "batch";
@@ -97,15 +96,6 @@ public class BigQuerySQLEngineConfig extends BigQueryBaseConfig {
"This requires Scala version 2.12 to be installed in the execution environment.")
private Boolean useStorageReadAPI;

@Name(NAME_DIRECT_SINK_WRITE)
@Macro
@Nullable
@Description("If enabled, the SQL engine will try to write output directly to BigQuery sinks using a BigQuery " +
"job. This requires the service account used by the BigQuery ELT Transformation Pushdown to have permissions " +
"in both datasets, and both datasets must be located in the same location. If this operation does not " +
"succeed, the standard sink workflow will continue to execute.")
private Boolean useDirectSinkWrite;

@Name(NAME_INCLUDED_STAGES)
@Macro
@Nullable
@@ -165,10 +155,6 @@ public Boolean shouldUseStorageReadAPI() {
return useStorageReadAPI != null ? useStorageReadAPI : false;
}

public Boolean shouldUseDirectSinkWrite() {
return useDirectSinkWrite != null ? useDirectSinkWrite : false;
}

public QueryJobConfiguration.Priority getJobPriority() {
String priority = jobPriority != null ? jobPriority : "batch";
return QueryJobConfiguration.Priority.valueOf(priority.toUpperCase());
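
For reference, `QueryJobConfiguration.Priority` is the BigQuery Java client enum from `com.google.cloud.bigquery`, with values `BATCH` and `INTERACTIVE`. A minimal sketch of how a parsed priority typically feeds into a query job configuration (the SQL text is a placeholder):

```java
import com.google.cloud.bigquery.QueryJobConfiguration;

// Build a query job at BATCH priority, mirroring getJobPriority()'s default.
QueryJobConfiguration jobConfig = QueryJobConfiguration.newBuilder("SELECT 1")
    .setPriority(QueryJobConfiguration.Priority.valueOf("batch".toUpperCase()))
    .build();
```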
16 changes: 0 additions & 16 deletions widgets/BigQueryPushdownEngine-sqlengine.json
@@ -203,22 +203,6 @@
},
"default": "false"
}
},
{
"widget-type": "toggle",
"label": "Attempt direct copy to BigQuery sinks",
"name": "useDirectSinkWrite",
"widget-attributes": {
"on": {
"value": "true",
"label": "YES"
},
"off": {
"value": "false",
"label": "NO"
},
"default": "false"
}
}
]
}