File: BigQuerySQLEngine.java
@@ -27,13 +27,15 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.cdap.cdap.api.RuntimeContext;
import io.cdap.cdap.api.SQLEngineContext;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.metrics.Metrics;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.engine.sql.BatchSQLEngine;
@@ -115,8 +117,8 @@ public class BigQuerySQLEngine
private String dataset;
private String bucket;
private String runId;
private Map<String, String> tableNames;
private Map<String, BigQuerySQLDataset> datasets;
private Metrics metrics;

@SuppressWarnings("unused")
public BigQuerySQLEngine(BigQuerySQLEngineConfig sqlEngineConfig) {
@@ -132,14 +134,13 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
}

@Override
public void prepareRun(RuntimeContext context) throws Exception {
public void prepareRun(SQLEngineContext context) throws Exception {
super.prepareRun(context);

// Validate configuration and throw exception if the supplied configuration is invalid.
sqlEngineConfig.validate();

runId = BigQuerySQLEngineUtils.newIdentifier();
tableNames = new HashMap<>();
datasets = new HashMap<>();

String serviceAccount = sqlEngineConfig.getServiceAccount();
@@ -171,10 +172,13 @@ public void prepareRun(RuntimeContext context) throws Exception {

// Configure credentials for the source
BigQuerySourceUtils.configureServiceAccount(configuration, sqlEngineConfig.connection);

// Get metrics instance
metrics = context.getMetrics();
}

@Override
public void onRunFinish(boolean succeeded, RuntimeContext context) {
public void onRunFinish(boolean succeeded, SQLEngineContext context) {
super.onRunFinish(succeeded, context);

String gcsPath;
@@ -358,7 +362,8 @@ public SQLWriteResult write(SQLWriteRequest writeRequest) {
sqlEngineConfig,
bigQuery,
writeRequest,
sourceTableId);
sourceTableId,
metrics);
return bigQueryWrite.write();
}

@@ -529,7 +534,8 @@ private BigQuerySelectDataset executeSelect(String datasetName,
table,
jobId,
jobType,
query
query,
metrics
).execute();

datasets.put(datasetName, selectDataset);
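
The changes in this file capture a Metrics handle from the SQLEngineContext in prepareRun (metrics = context.getMetrics()) and thread it into the write path and into BigQuerySQLEngineUtils.logJobMetrics. The helper itself is not part of this diff; the sketch below is only an illustration of how such a helper could report BigQuery job statistics through the CDAP Metrics API. The class name, method name, and metric names are invented for illustration and are not the plugin's actual implementation.

import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobStatistics;
import io.cdap.cdap.api.metrics.Metrics;

/**
 * Illustrative sketch only: reports a completed BigQuery job's statistics as CDAP metrics.
 * The metric names used here are hypothetical and not taken from the plugin.
 */
public final class JobMetricsExample {

  private JobMetricsExample() {
  }

  public static void reportJobMetrics(Job queryJob, Metrics metrics) {
    // Count every job that reached completion, whether it succeeded or failed.
    metrics.count("bq.sqlengine.jobs.completed", 1);

    JobStatistics.QueryStatistics stats = queryJob.getStatistics();
    if (stats == null) {
      return;
    }
    // Bytes scanned by the query, when BigQuery reports the figure.
    if (stats.getTotalBytesProcessed() != null) {
      metrics.gauge("bq.sqlengine.bytes.processed", stats.getTotalBytesProcessed());
    }
    // Wall-clock duration of the job in milliseconds.
    if (stats.getStartTime() != null && stats.getEndTime() != null) {
      metrics.gauge("bq.sqlengine.job.duration.ms", stats.getEndTime() - stats.getStartTime());
    }
  }
}
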
File: BigQuerySelectDataset.java
@@ -11,6 +11,7 @@
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.metrics.Metrics;
import io.cdap.cdap.etl.api.engine.sql.SQLEngineException;
import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDataset;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
@@ -19,6 +20,8 @@
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;

/**
@@ -38,6 +41,7 @@ public class BigQuerySelectDataset implements SQLDataset, BigQuerySQLDataset {
private final String jobId;
private final BigQueryJobType operation;
private final String selectQuery;
private final Metrics metrics;
private Long numRows;

public static BigQuerySelectDataset getInstance(String datasetName,
@@ -49,7 +53,8 @@ public static BigQuerySelectDataset getInstance(String datasetName,
String bqTable,
String jobId,
BigQueryJobType jobType,
String selectQuery) {
String selectQuery,
Metrics metrics) {

return new BigQuerySelectDataset(datasetName,
outputSchema,
@@ -60,7 +65,8 @@ public static BigQuerySelectDataset getInstance(String datasetName,
bqTable,
jobId,
jobType,
selectQuery);
selectQuery,
metrics);
}

private BigQuerySelectDataset(String datasetName,
@@ -72,7 +78,8 @@ private BigQuerySelectDataset(String datasetName,
String bqTable,
String jobId,
BigQueryJobType operation,
String selectQuery) {
String selectQuery,
Metrics metrics) {
this.datasetName = datasetName;
this.outputSchema = outputSchema;
this.sqlEngineConfig = sqlEngineConfig;
@@ -83,6 +90,7 @@ private BigQuerySelectDataset(String datasetName,
this.jobId = jobId;
this.operation = operation;
this.selectQuery = selectQuery;
this.metrics = metrics;
}

public BigQuerySelectDataset execute() {
@@ -124,14 +132,14 @@ public BigQuerySelectDataset execute() {
if (queryJob == null) {
throw new SQLEngineException("BigQuery job not found: " + jobId);
} else if (queryJob.getStatus().getError() != null) {
BigQuerySQLEngineUtils.logJobMetrics(queryJob);
BigQuerySQLEngineUtils.logJobMetrics(queryJob, metrics);
throw new SQLEngineException(String.format(
"Error executing BigQuery Job: '%s' in Project '%s', Dataset '%s', Location'%s' : %s",
jobId, project, bqDataset, location, queryJob.getStatus().getError().toString()));
}

LOG.info("Created BigQuery table `{}` using Job: {}", bqTable, jobId);
BigQuerySQLEngineUtils.logJobMetrics(queryJob);
BigQuerySQLEngineUtils.logJobMetrics(queryJob, metrics);
return this;
}

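
Taken together, the two files follow a simple injection pattern: BigQuerySQLEngine obtains the Metrics handle once per run and hands it to the datasets it creates for select and write operations, and the dataset reports on the BigQuery job it runs, on the error path as well as on success. A condensed sketch of that pattern follows, under the assumption that this is the intent of the change; the class name and metric names are invented for illustration and do not appear in the plugin.

import io.cdap.cdap.api.metrics.Metrics;

// Hypothetical helper that receives a Metrics handle from its creator and reports
// the outcome of the work it performs, mirroring how BigQuerySelectDataset.execute()
// logs job metrics both before throwing and after a successful run.
public class MetricsAwareTask {

  private final Metrics metrics;

  public MetricsAwareTask(Metrics metrics) {
    this.metrics = metrics;
  }

  public void execute(Runnable work) {
    try {
      work.run();
      // Report success so downstream dashboards can count completed runs.
      metrics.count("example.task.succeeded", 1);
    } catch (RuntimeException e) {
      // Report the failure before propagating the exception.
      metrics.count("example.task.failed", 1);
      throw e;
    }
  }
}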