@@ -51,7 +51,6 @@
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -167,7 +166,10 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,

// Ensure both datasets are in the same location.
if (!Objects.equals(srcDataset.getLocation(), destDataset.getLocation())) {
LOG.warn("Direct table copy is only supported if both datasets are in the same location.");
LOG.warn("Direct table copy is only supported if both datasets are in the same location. "
+ "'{}' is '{}' , '{}' is '{}' .",
sourceDatasetId.getDataset(), srcDataset.getLocation(),
destinationDatasetId.getDataset(), destDataset.getLocation());
return SQLWriteResult.unsupported(datasetName);
}

@@ -177,6 +179,9 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
return SQLWriteResult.unsupported(datasetName);
}

// Get source table instance
Table srcTable = bigQuery.getTable(sourceTableId);

// Get destination table instance
Table destTable = bigQuery.getTable(destinationTableId);

@@ -219,7 +224,7 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,

// Wait for the query to complete.
queryJob = queryJob.waitFor();
JobStatistics.QueryStatistics statistics = queryJob.getStatistics();
JobStatistics.QueryStatistics queryJobStats = queryJob.getStatistics();

// Check for errors
if (queryJob.getStatus().getError() != null) {
@@ -229,8 +234,11 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
return SQLWriteResult.faiure(datasetName);
}

long numRows = statistics.getNumDmlAffectedRows();
LOG.info("Copied {} records from {}.{}.{} to {}.{}.{}", numRows,
// Number of rows is taken from the job statistics if available.
// If not, we use the number of source table records.
long numRows = queryJobStats != null && queryJobStats.getNumDmlAffectedRows() != null ?
queryJobStats.getNumDmlAffectedRows() : srcTable.getNumRows().longValue();
LOG.info("Executed copy operation for {} records from {}.{}.{} to {}.{}.{}", numRows,
sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable());
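
The surrounding job-submission code is collapsed in this view. As a rough, hedged sketch only (not this PR's actual code), running the copy as a BigQuery DML job and applying the row-count fallback described in the comment above typically looks like the following with the BigQuery Java client; the method name, `copySql`, and the passed-in variables are assumed for illustration.

// Illustrative sketch only; names and variables are assumed, not taken from this PR.
// Uses com.google.cloud.bigquery.{BigQuery, Job, JobInfo, JobStatistics, QueryJobConfiguration, Table}.
static long runCopyAndCountRows(BigQuery bigQuery, String copySql, Table srcTable) throws InterruptedException {
  QueryJobConfiguration config = QueryJobConfiguration.newBuilder(copySql)
      .setUseLegacySql(false)
      .build();
  Job queryJob = bigQuery.create(JobInfo.of(config)).waitFor();
  JobStatistics.QueryStatistics stats = queryJob.getStatistics();
  // Prefer the DML-affected row count from the job statistics; otherwise fall back
  // to the source table's row count, mirroring the logic shown above.
  return stats != null && stats.getNumDmlAffectedRows() != null
      ? stats.getNumDmlAffectedRows()
      : srcTable.getNumRows().longValue();
}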

@@ -240,8 +248,9 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,

/**
* Relax table fields based on the supplied schema
*
* @param schema schema to use when relaxing
* @param table the destination table to relax
* @param table the destination table to relax
*/
protected void relaxTableSchema(Schema schema, Table table) {
com.google.cloud.bigquery.Schema bqSchema = BigQuerySinkUtils.convertCdapSchemaToBigQuerySchema(schema);
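
The rest of relaxTableSchema is collapsed in this view. As an illustration only (not the plugin's implementation), relaxing every column of an existing table to NULLABLE with the BigQuery Java client can look roughly like the sketch below; the helper name is assumed, and `java.util.List`/`ArrayList` plus `com.google.cloud.bigquery.{Field, Table, StandardTableDefinition}` are the only types used.

// Sketch only; helper name is assumed. Relaxes every field's mode to NULLABLE and updates the table.
static void relaxAllFieldsToNullable(Table table) {
  StandardTableDefinition definition = table.getDefinition();
  List<Field> relaxed = new ArrayList<>();
  for (Field field : definition.getSchema().getFields()) {
    relaxed.add(field.toBuilder().setMode(Field.Mode.NULLABLE).build());
  }
  com.google.cloud.bigquery.Schema relaxedSchema =
      com.google.cloud.bigquery.Schema.of(relaxed.toArray(new Field[0]));
  // Keep the rest of the table definition and only swap in the relaxed schema.
  table.toBuilder()
      .setDefinition(definition.toBuilder().setSchema(relaxedSchema).build())
      .build()
      .update();
}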
@@ -252,9 +261,10 @@ protected void relaxTableSchema(Schema schema, Table table) {

/**
* Create a new BigQuery table based on the supplied schema and table identifier
* @param schema schema to use for this table
* @param tableId identifier for the new table
* @param sinkConfig Sink configuration used to define this table
*
* @param schema schema to use for this table
* @param tableId identifier for the new table
* @param sinkConfig Sink configuration used to define this table
* @param newDestinationTable Atomic reference to this new table. Used to delete this table if the execution fails.
*/
protected void createTable(Schema schema,
@@ -268,7 +278,7 @@ protected void createTable(Schema schema,
tableDefinitionBuilder.setSchema(bqSchema);

// Configure partitioning options
switch(sinkConfig.getPartitioningType()) {
switch (sinkConfig.getPartitioningType()) {
case TIME:
tableDefinitionBuilder.setTimePartitioning(getTimePartitioning(sinkConfig));
break;
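
The remaining partitioning cases and the table creation itself are collapsed below. As a hedged sketch of the general client-API pattern (not this method's actual body), a time-partitioned, clustered destination table is typically assembled and created as follows; `bqSchema`, `tableId`, and `bigQuery` come from the surrounding context, while the partition and clustering field names are invented.

// Sketch only; "event_ts" and "customer_id" are invented for illustration.
StandardTableDefinition definition = StandardTableDefinition.newBuilder()
    .setSchema(bqSchema)
    .setTimePartitioning(TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
        .setField("event_ts")
        .build())
    .setClustering(Clustering.newBuilder()
        .setFields(Arrays.asList("customer_id"))
        .build())
    .build();
Table created = bigQuery.create(TableInfo.of(tableId, definition));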
@@ -306,6 +316,7 @@ protected void createTable(Schema schema,

/**
* Try to delete this table, handling any exceptions
*
* @param table the table identifier for the table we want to delete.
*/
protected void tryDeleteTable(TableId table) {
@@ -381,6 +392,7 @@ protected QueryJobConfiguration.Builder getUpdateUpsertQueryJobBuilder(TableId s

/**
* Build time partitioning configuration based on the BigQuery Sink configuration.
*
* @param config sink configuration to use
* @return Time Partitioning configuration
*/
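
The body of getTimePartitioning is collapsed in this view. As a hedged example of the underlying client API (the field name and expiration value are invented, not taken from the PR), a daily time-partitioning spec is built like this:

// Sketch only; "event_ts" and the expiration value are assumed for illustration.
TimePartitioning timePartitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
    .setField("event_ts")              // omit setField to partition by ingestion time
    .setExpirationMs(7_776_000_000L)   // optional: expire partitions after 90 days
    .build();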
@@ -398,6 +410,7 @@ protected TimePartitioning getTimePartitioning(BigQuerySinkConfig config) {

/**
* Build range partitioning configuration based on the BigQuery Sink configuration.
*
* @param config sink configuration to use
* @return Range Partitioning configuration
*/
@@ -416,6 +429,7 @@ protected RangePartitioning getRangePartitioning(BigQuerySinkConfig config) {

/**
* Build the range used for the partitioning configuration
*
* @param config sink configuration to use
* @return Range configuration
*/
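
The bodies of getRangePartitioning and getRangePartitioningRange above are collapsed. As a hedged sketch of the client API they wrap (the field name and bounds are invented), an integer-range partitioning spec is built like this:

// Sketch only; the field name and range bounds are assumed for illustration.
RangePartitioning.Range range = RangePartitioning.Range.newBuilder()
    .setStart(0L)
    .setEnd(1_000L)
    .setInterval(100L)
    .build();
RangePartitioning rangePartitioning = RangePartitioning.newBuilder()
    .setField("customer_id")
    .setRange(range)
    .build();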
@@ -433,6 +447,7 @@ protected RangePartitioning.Range getRangePartitioningRange(BigQuerySinkConfig c

/**
* Get the list of fields to use for clustering based on the supplied sink configuration
*
* @param config sink configuration to use
* @return List containing all clustering order fields.
*/
@@ -446,6 +461,7 @@ List<String> getClusteringOrderFields(BigQuerySinkConfig config) {

/**
* Get the clustering information for a list of clustering fields
*
* @param clusteringFields list of clustering fields to use
* @return Clustering configuration
*/
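
The bodies of getClusteringOrderFields and getClustering are collapsed here. As a hedged sketch of the client call they delegate to (field names invented), an ordered field list becomes a clustering spec like this:

// Sketch only; the clustering field names are assumed for illustration.
Clustering clustering = Clustering.newBuilder()
    .setFields(Arrays.asList("country", "city"))
    .build();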
@@ -457,6 +473,7 @@ protected Clustering getClustering(List<String> clusteringFields) {

/**
* Get encryption configuration for the supplied sink configuration
*
* @param config sink configuration to use
* @return Encryption configuration
*/
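
The encryption method's body is collapsed below this hunk. As a hedged sketch of the client API (the KMS key resource name is a placeholder, not from this PR), a CMEK configuration is built like this:

// Sketch only; the key name below is a placeholder.
EncryptionConfiguration encryptionConfiguration = EncryptionConfiguration.newBuilder()
    .setKmsKeyName("projects/my-project/locations/us/keyRings/my-ring/cryptoKeys/my-key")
    .build();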