5151import java .lang .reflect .Type ;
5252import java .util .ArrayList ;
5353import java .util .Arrays ;
54- import java .util .Collections ;
5554import java .util .List ;
5655import java .util .Map ;
5756import java .util .Objects ;
@@ -167,7 +166,10 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
167166
168167 // Ensure both datasets are in the same location.
169168 if (!Objects .equals (srcDataset .getLocation (), destDataset .getLocation ())) {
170- LOG .warn ("Direct table copy is only supported if both datasets are in the same location." );
169+ LOG .warn ("Direct table copy is only supported if both datasets are in the same location. "
170+ + "'{}' is '{}' , '{}' is '{}' ." ,
171+ sourceDatasetId .getDataset (), srcDataset .getLocation (),
172+ destinationDatasetId .getDataset (), destDataset .getLocation ());
171173 return SQLWriteResult .unsupported (datasetName );
172174 }
173175
@@ -177,6 +179,9 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
177179 return SQLWriteResult .unsupported (datasetName );
178180 }
179181
182+ // Get source table instance
183+ Table srcTable = bigQuery .getTable (sourceTableId );
184+
180185 // Get destination table instance
181186 Table destTable = bigQuery .getTable (destinationTableId );
182187
@@ -219,7 +224,7 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
219224
220225 // Wait for the query to complete.
221226 queryJob = queryJob .waitFor ();
222- JobStatistics .QueryStatistics statistics = queryJob .getStatistics ();
227+ JobStatistics .QueryStatistics queryJobStats = queryJob .getStatistics ();
223228
224229 // Check for errors
225230 if (queryJob .getStatus ().getError () != null ) {
@@ -229,8 +234,11 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
229234 return SQLWriteResult .faiure (datasetName );
230235 }
231236
232- long numRows = statistics .getNumDmlAffectedRows ();
233- LOG .info ("Copied {} records from {}.{}.{} to {}.{}.{}" , numRows ,
237+ // Number of rows is taken from the job statistics if available.
238+ // If not, we use the number of source table records.
239+ long numRows = queryJobStats != null && queryJobStats .getNumDmlAffectedRows () != null ?
240+ queryJobStats .getNumDmlAffectedRows () : srcTable .getNumRows ().longValue ();
241+ LOG .info ("Executed copy operation for {} records from {}.{}.{} to {}.{}.{}" , numRows ,
234242 sourceTableId .getProject (), sourceTableId .getDataset (), sourceTableId .getTable (),
235243 destinationTableId .getProject (), destinationTableId .getDataset (), destinationTableId .getTable ());
236244
@@ -240,8 +248,9 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
240248
241249 /**
242250 * Relax table fields based on the supplied schema
251+ *
243252 * @param schema schema to use when relaxing
244- * @param table the destionation table to relax
253+ * @param table the destination table to relax
245254 */
246255 protected void relaxTableSchema (Schema schema , Table table ) {
247256 com .google .cloud .bigquery .Schema bqSchema = BigQuerySinkUtils .convertCdapSchemaToBigQuerySchema (schema );
@@ -252,9 +261,10 @@ protected void relaxTableSchema(Schema schema, Table table) {
252261
253262 /**
254263 * Create a new BigQuery table based on the supplied schema and table identifier
255- * @param schema schema to use for this table
256- * @param tableId itendifier for the new table
257- * @param sinkConfig Sink configuration used to define this table
264+ *
265+ * @param schema schema to use for this table
266+ * @param tableId identifier for the new table
267+ * @param sinkConfig Sink configuration used to define this table
258268 * @param newDestinationTable Atomic reference to this new table. Used to delete this table if the execution fails.
259269 */
260270 protected void createTable (Schema schema ,
@@ -268,7 +278,7 @@ protected void createTable(Schema schema,
268278 tableDefinitionBuilder .setSchema (bqSchema );
269279
270280 // Configure partitioning options
271- switch (sinkConfig .getPartitioningType ()) {
281+ switch (sinkConfig .getPartitioningType ()) {
272282 case TIME :
273283 tableDefinitionBuilder .setTimePartitioning (getTimePartitioning (sinkConfig ));
274284 break ;
@@ -306,6 +316,7 @@ protected void createTable(Schema schema,
306316
307317 /**
308318 * Try to delete this table while handling exception
319+ *
309320 * @param table the table identified for the table we want to delete.
310321 */
311322 protected void tryDeleteTable (TableId table ) {
@@ -381,6 +392,7 @@ protected QueryJobConfiguration.Builder getUpdateUpsertQueryJobBuilder(TableId s
381392
382393 /**
383394 * Build time partitioning configuration based on the BigQuery Sink configuration.
395+ *
384396 * @param config sink configuration to use
385397 * @return Time Partitioning configuration
386398 */
@@ -398,6 +410,7 @@ protected TimePartitioning getTimePartitioning(BigQuerySinkConfig config) {
398410
399411 /**
400412 * Build range partitioning configuration based on the BigQuery Sink configuration.
413+ *
401414 * @param config sink configuration to use
402415 * @return Range Partitioning configuration
403416 */
@@ -416,6 +429,7 @@ protected RangePartitioning getRangePartitioning(BigQuerySinkConfig config) {
416429
417430 /**
418431 * Build range used for partitioning configuration
432+ *
419433 * @param config sink configuration to use
420434 * @return Range configuration
421435 */
@@ -433,6 +447,7 @@ protected RangePartitioning.Range getRangePartitioningRange(BigQuerySinkConfig c
433447
434448 /**
435449 * Get the list of fields to use for clustering based on the supplied sink configuration
450+ *
436451 * @param config sink configuration to use
437452 * @return List containing all clustering order fields.
438453 */
@@ -446,6 +461,7 @@ List<String> getClusteringOrderFields(BigQuerySinkConfig config) {
446461
447462 /**
448463 * Get the clustering information for a list of clustering fields
464+ *
449465 * @param clusteringFields list of clustering fields to use
450466 * @return Clustering configuration
451467 */
@@ -457,6 +473,7 @@ protected Clustering getClustering(List<String> clusteringFields) {
457473
458474 /**
459475 * Get encryption configuration for the supplied sink configuration
476+ *
460477 * @param config sink configuration to use
461478 * @return Encryption configuration
462479 */
0 commit comments