4747import com .google .common .reflect .TypeToken ;
4848import com .google .gson .Gson ;
4949import io .cdap .cdap .api .data .schema .Schema ;
50+ import io .cdap .cdap .api .metrics .Metrics ;
5051import io .cdap .cdap .etl .api .engine .sql .dataset .SQLDataset ;
5152import io .cdap .cdap .etl .api .engine .sql .request .SQLReadRequest ;
5253import io .cdap .cdap .etl .api .engine .sql .request .SQLReadResult ;
@@ -85,26 +86,30 @@ public class BigQueryReadDataset implements SQLDataset, BigQuerySQLDataset {
8586 private final String jobId ;
8687 private Schema schema ;
8788 private Long numRows ;
89+ private Metrics metrics ;
8890
8991 private BigQueryReadDataset (String datasetName ,
9092 BigQuerySQLEngineConfig sqlEngineConfig ,
9193 BigQuery bigQuery ,
9294 SQLReadRequest readRequest ,
9395 TableId destinationTableId ,
94- String jobId ) {
96+ String jobId ,
97+ Metrics metrics ) {
9598 this .datasetName = datasetName ;
9699 this .sqlEngineConfig = sqlEngineConfig ;
97100 this .bigQuery = bigQuery ;
98101 this .readRequest = readRequest ;
99102 this .destinationTableId = destinationTableId ;
100103 this .jobId = jobId ;
104+ this .metrics = metrics ;
101105 }
102106
103107 public static BigQueryReadDataset getInstance (String datasetName ,
104108 BigQuerySQLEngineConfig sqlEngineConfig ,
105109 BigQuery bigQuery ,
106110 SQLReadRequest readRequest ,
107- TableId destinationTableId ) {
111+ TableId destinationTableId ,
112+ Metrics metrics ) {
108113 // Get new Job ID for this push operation
109114 String jobId = BigQuerySQLEngineUtils .newIdentifier ();
110115
@@ -113,7 +118,8 @@ public static BigQueryReadDataset getInstance(String datasetName,
113118 bigQuery ,
114119 readRequest ,
115120 destinationTableId ,
116- jobId );
121+ jobId ,
122+ metrics );
117123 }
118124
119125 public SQLReadResult read () {
@@ -216,7 +222,7 @@ private SQLReadResult readInternal(SQLReadRequest readRequest,
216222
217223 // Check for errors
218224 if (queryJob .getStatus ().getError () != null ) {
219- BigQuerySQLEngineUtils .logJobMetrics (queryJob );
225+ BigQuerySQLEngineUtils .logJobMetrics (queryJob , metrics );
220226 LOG .error ("Error executing BigQuery Job: '{}' in Project '{}', Dataset '{}': {}" ,
221227 jobId , sqlEngineConfig .getProject (), sqlEngineConfig .getDatasetProject (),
222228 queryJob .getStatus ().getError ().toString ());
@@ -230,7 +236,7 @@ private SQLReadResult readInternal(SQLReadRequest readRequest,
230236 LOG .info ("Executed read operation for {} records from {}.{}.{} into {}.{}.{}" , numRows ,
231237 sourceTableId .getProject (), sourceTableId .getDataset (), sourceTableId .getTable (),
232238 sourceTableId .getProject (), sourceTableId .getDataset (), sourceTableId .getTable ());
233- BigQuerySQLEngineUtils .logJobMetrics (queryJob );
239+ BigQuerySQLEngineUtils .logJobMetrics (queryJob , metrics );
234240
235241 return SQLReadResult .success (datasetName , this );
236242 }
0 commit comments