pom.xml (1 addition, 1 deletion)
@@ -72,7 +72,7 @@
<avro.version>1.8.2</avro.version>
<bigquery.connector.hadoop2.version>hadoop2-1.0.0</bigquery.connector.hadoop2.version>
<commons.codec.version>1.4</commons.codec.version>
-   <cdap.version>6.7.0</cdap.version>
+   <cdap.version>6.8.0-SNAPSHOT</cdap.version>
<cdap.plugin.version>2.10.0-SNAPSHOT</cdap.plugin.version>
<dropwizard.metrics-core.version>3.2.6</dropwizard.metrics-core.version>
<flogger.system.backend.version>0.3.1</flogger.system.backend.version>
BigQuerySource.java
@@ -29,6 +29,8 @@
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Storage;
import com.google.common.base.Strings;
+ import com.google.common.collect.ImmutableMap;
+ import com.google.gson.Gson;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
@@ -46,9 +48,13 @@
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.connector.Connector;
+ import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput;
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
+ import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryReadDataset;
+ import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
+ import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryWrite;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
@@ -61,6 +67,7 @@

import java.time.DateTimeException;
import java.time.LocalDate;
+ import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -76,6 +83,7 @@
@Metadata(properties = {@MetadataProperty(key = Connector.PLUGIN_TYPE, value = BigQueryConnector.NAME)})
public final class BigQuerySource extends BatchSource<LongWritable, GenericData.Record, StructuredRecord> {
private static final Logger LOG = LoggerFactory.getLogger(BigQuerySource.class);
+ private static final Gson GSON = new Gson();
public static final String NAME = "BigQueryTable";
private BigQuerySourceConfig config;
private Schema outputSchema;
@@ -165,7 +173,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
// We call emitLineage before since it creates the dataset with schema.
Type sourceTableType = config.getSourceTableType();
emitLineage(context, configuredSchema, sourceTableType, config.getTable());
-     setInputFormat(context);
+     setInputFormat(context, configuredSchema);
}

@Override
@@ -335,8 +343,31 @@ private void validatePartitionProperties(FailureCollector collector) {
}
}

- private void setInputFormat(BatchSourceContext context) {
+ private void setInputFormat(BatchSourceContext context,
+                             Schema configuredSchema) {
+   // Set input for Spark
context.setInput(Input.of(config.referenceName, new BigQueryInputFormatProvider(configuration)));
+
+   // Add output for SQL Engine Direct read
+   ImmutableMap.Builder<String, String> arguments = new ImmutableMap.Builder<>();
+
+   if (configuredSchema == null) {
+     LOG.debug("BigQuery SQL Engine Input was not initialized. Schema was empty.");
+     return;
+   }
+
+   List<String> fieldNames = configuredSchema.getFields().stream().map(f -> f.getName()).collect(Collectors.toList());
[Inline review comment, Contributor]
We had to make a change on the BQ sink to only configure the Pushdown sink when a schema is configured in #1120

Can you add the same here? Essentially, if configuredSchema is null, don't initialize the pushdown sink. Realistically, I don't see how pushdown execution would even work if the schema is not known at runtime.
(A hedged sink-side sketch of this guard follows this file's diff.)
+   arguments
+     .put(BigQueryReadDataset.SQL_INPUT_CONFIG, GSON.toJson(config))
+     .put(BigQueryReadDataset.SQL_INPUT_SCHEMA, GSON.toJson(configuredSchema))
+     .put(BigQueryReadDataset.SQL_INPUT_FIELDS, GSON.toJson(fieldNames));
+
+   Input sqlEngineInput = new SQLEngineInput(config.referenceName,
+                                             context.getStageName(),
+                                             BigQuerySQLEngine.class.getName(),
+                                             arguments.build());
+   context.setInput(sqlEngineInput);
}

private void emitLineage(BatchSourceContext context, Schema schema, Type sourceTableType,
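The null-schema guard the reviewer asked for is the early return in setInputFormat above. For comparison, here is a minimal sketch of what the sink-side counterpart from #1120 presumably looks like. It is an assumed shape, not the actual #1120 patch: the setOutputFormat and getOutputFormatProvider names, the SQLEngineOutput constructor arguments, and the argument keys are all illustrative.

// Hedged sketch of the sink-side guard (assumed shape, not the real #1120 diff).
// Assumptions: a setOutputFormat helper, a getOutputFormatProvider() factory, and
// a SQLEngineOutput mirroring the SQLEngineInput constructor used above.
private void setOutputFormat(BatchSinkContext context, @Nullable Schema configuredSchema) {
  // The regular (non-pushdown) output is always registered.
  context.addOutput(Output.of(config.referenceName, getOutputFormatProvider()));

  if (configuredSchema == null) {
    // Without a schema at configure time, the SQL engine cannot plan the write,
    // so never register the pushdown output.
    LOG.debug("BigQuery SQL Engine Output was not initialized. Schema was empty.");
    return;
  }

  ImmutableMap.Builder<String, String> arguments = new ImmutableMap.Builder<>();
  arguments.put("sql.output.config", GSON.toJson(config))            // illustrative keys,
           .put("sql.output.schema", GSON.toJson(configuredSchema)); // not real constants
  context.addOutput(new SQLEngineOutput(config.referenceName,
                                        context.getStageName(),
                                        BigQuerySQLEngine.class.getName(),
                                        arguments.build()));
}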
PartitionedBigQueryInputFormat.java
@@ -48,7 +48,6 @@
* in order to create input splits.
*/
public class PartitionedBigQueryInputFormat extends AbstractBigQueryInputFormat<LongWritable, GenericData.Record> {
- private static final String DEFAULT_COLUMN_NAME = "_PARTITIONTIME";

private InputFormat<LongWritable, GenericData.Record> delegateInputFormat =
new AvroBigQueryInputFormat();
@@ -160,8 +159,8 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi
StringBuilder condition = new StringBuilder();

if (timePartitioning != null) {
-       String timePartitionCondition = generateTimePartitionCondition(tableDefinition, timePartitioning,
-                                                                      partitionFromDate, partitionToDate);
+       String timePartitionCondition = BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate,
+                                                                                   partitionToDate);
condition.append(timePartitionCondition);
}

@@ -289,33 +288,4 @@ private static JobReference getJobReference(Configuration conf, BigQueryHelper b
}
return new JobReference().setProjectId(projectId).setJobId(savedJobId).setLocation(location);
}

- private String generateTimePartitionCondition(StandardTableDefinition tableDefinition,
-                                               TimePartitioning timePartitioning, String partitionFromDate,
-                                               String partitionToDate) {
-   StringBuilder timePartitionCondition = new StringBuilder();
-   String columnName = timePartitioning.getField() != null ? timePartitioning.getField() : DEFAULT_COLUMN_NAME;
-
-   LegacySQLTypeName columnType = null;
-   if (!DEFAULT_COLUMN_NAME.equals(columnName)) {
-     columnType = tableDefinition.getSchema().getFields().get(columnName).getType();
-   }
-
-   String columnNameTS = columnName;
-   if (!LegacySQLTypeName.TIMESTAMP.equals(columnType)) {
-     columnNameTS = "TIMESTAMP(`" + columnNameTS + "`)";
-   }
-   if (partitionFromDate != null) {
-     timePartitionCondition.append(columnNameTS).append(" >= ").append("TIMESTAMP(\"")
-       .append(partitionFromDate).append("\")");
-   }
-   if (partitionFromDate != null && partitionToDate != null) {
-     timePartitionCondition.append(" and ");
-   }
-   if (partitionToDate != null) {
-     timePartitionCondition.append(columnNameTS).append(" < ").append("TIMESTAMP(\"")
-       .append(partitionToDate).append("\")");
-   }
-   return timePartitionCondition.toString();
- }
}
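For context, the deleted generateTimePartitionCondition moves into BigQueryUtil and loses its TimePartitioning parameter, per the new call site in generateQuery above. Below is a plausible shape of the relocated helper; the BigQueryUtil side of this change is not shown in the diff, and deriving the TimePartitioning from the table definition is an inference.

// Presumed new home in BigQueryUtil (not part of this diff). The body is the
// deleted method above; the one inferred change is reading TimePartitioning
// from the StandardTableDefinition instead of receiving it as a parameter.
public static String generateTimePartitionCondition(StandardTableDefinition tableDefinition,
                                                    String partitionFromDate, String partitionToDate) {
  TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
  String columnName = timePartitioning.getField() != null
    ? timePartitioning.getField() : "_PARTITIONTIME";

  // Ingestion-time partitioned tables use the pseudo-column _PARTITIONTIME,
  // which is always a TIMESTAMP; named partition columns may need a cast.
  LegacySQLTypeName columnType = null;
  if (!"_PARTITIONTIME".equals(columnName)) {
    columnType = tableDefinition.getSchema().getFields().get(columnName).getType();
  }
  String columnNameTS = columnName;
  if (!LegacySQLTypeName.TIMESTAMP.equals(columnType)) {
    columnNameTS = "TIMESTAMP(`" + columnNameTS + "`)";
  }

  StringBuilder timePartitionCondition = new StringBuilder();
  if (partitionFromDate != null) {
    timePartitionCondition.append(columnNameTS).append(" >= ").append("TIMESTAMP(\"")
      .append(partitionFromDate).append("\")");
  }
  if (partitionFromDate != null && partitionToDate != null) {
    timePartitionCondition.append(" and ");
  }
  if (partitionToDate != null) {
    timePartitionCondition.append(columnNameTS).append(" < ").append("TIMESTAMP(\"")
      .append(partitionToDate).append("\")");
  }
  return timePartitionCondition.toString();
}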