Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ private enum BigQueryJobType { QUERY, COPY, COPY_SNAPSHOT };
public static final String SQL_INPUT_CONFIG = "config";
public static final String SQL_INPUT_FIELDS = "fields";
public static final String SQL_INPUT_SCHEMA = "schema";
public static final String BQ_COPY_SNAPSHOT_OP_TYPE = "SNAPSHOT";
private static final java.lang.reflect.Type LIST_OF_STRINGS_TYPE = new TypeToken<ArrayList<String>>() { }.getType();
private static final String BQ_PUSHDOWN_OPERATION_TAG = "read";

Expand Down Expand Up @@ -214,24 +213,9 @@ private SQLReadResult readInternal(SQLReadRequest readRequest,
tableTTL = Instant.now().toEpochMilli() + ttlMillis;
}

// no Filter + no view + no material + no external
StandardTableDefinition tableDefinition = Objects.requireNonNull(sourceTable).getDefinition();
Type type = tableDefinition.getType();
if (!(type == Type.VIEW || type == Type.MATERIALIZED_VIEW || type == Type.EXTERNAL)
&& sourceConfig.getFilter() == null) {
// TRY SNAPSHOT
JobConfiguration jobConfiguration = getBQSnapshotJobConf(sourceTableId, destinationTableId);
SQLReadResult snapshotResult = executeBigQueryJob(jobConfiguration, sourceTable, sourceTableId,
BigQueryJobType.COPY_SNAPSHOT, jobLocation);
if (snapshotResult.isSuccessful()) {
BigQuerySQLEngineUtils.updateTableExpiration(bigQuery, destinationTableId, tableTTL);
return snapshotResult;
}
LOG.warn("Big Query Snapshot process used for direct BigQuery read failed. Using fallback table copy strategy.");
}

JobConfiguration queryConfig = getBQQueryJobConfiguration(sourceTable, sourceTableId,
fields,
// Create configuration for table copy job
JobConfiguration queryConfig = getBQQueryJobConfiguration(sourceTable,
sourceTableId,
sourceConfig.getFilter(),
sourceConfig.getPartitionFrom(),
sourceConfig.getPartitionTo(),
Expand Down Expand Up @@ -284,19 +268,19 @@ private SQLReadResult executeBigQueryJob(JobConfiguration jobConfiguration,
return SQLReadResult.success(datasetName, this);
}

JobConfiguration getBQQueryJobConfiguration(Table sourceTable, TableId sourceTableId,
List<String> fields,
String filter,
String partitionFromDate,
String partitionToDate,
Long tableTTL) {
private JobConfiguration getBQQueryJobConfiguration(Table sourceTable,
TableId sourceTableId,
String filter,
String partitionFromDate,
String partitionToDate,
Long tableTTL) {

BigQuerySQLEngineUtils.createEmptyTableWithSourceConfig(bigQuery, destinationTableId.getProject(),
destinationTableId.getDataset(), destinationTableId.getTable(),
sourceTable, tableTTL);

String query = String.format("SELECT %s FROM `%s.%s.%s`",
String.join(",", fields),
// Select all fields from source table into destination table
String query = String.format("SELECT * FROM `%s.%s.%s`",
sourceTableId.getProject(),
sourceTableId.getDataset(),
sourceTableId.getTable());
Expand Down Expand Up @@ -389,16 +373,6 @@ QueryJobConfiguration.Builder getQueryBuilder(Table sourceTable, TableId sourceT
.setLabels(BigQuerySQLEngineUtils.getJobTags(BQ_PUSHDOWN_OPERATION_TAG));
}

private JobConfiguration getBQSnapshotJobConf(TableId sourceTable, TableId destinationTable) {
CopyJobConfiguration copyJobConfiguration =
CopyJobConfiguration.newBuilder(destinationTable, sourceTable)
.setOperationType(BQ_COPY_SNAPSHOT_OP_TYPE)
.setLabels(BigQuerySQLEngineUtils.getJobTags(BQ_PUSHDOWN_OPERATION_TAG))
.build();

return copyJobConfiguration;
}

/**
* Try to delete this table while handling exception
*
Expand Down