@@ -150,12 +150,14 @@ public void executeJoin() {
if (queryJob == null) {
throw new SQLEngineException("BigQuery job not found: " + jobId);
} else if (queryJob.getStatus().getError() != null) {
BigQuerySQLEngineUtils.logJobMetrics(queryJob);
throw new SQLEngineException(String.format(
"Error executing BigQuery Job: '%s' in Project '%s', Dataset '%s', Location'%s' : %s",
jobId, project, bqDataset, location, queryJob.getStatus().getError().toString()));
}

LOG.info("Created BigQuery table `{}` using Job: {}", bqTable, jobId);
BigQuerySQLEngineUtils.logJobMetrics(queryJob);
}

@Override
@@ -124,12 +124,14 @@ public BigQuerySelectDataset execute() {
if (queryJob == null) {
throw new SQLEngineException("BigQuery job not found: " + jobId);
} else if (queryJob.getStatus().getError() != null) {
BigQuerySQLEngineUtils.logJobMetrics(queryJob);
throw new SQLEngineException(String.format(
"Error executing BigQuery Job: '%s' in Project '%s', Dataset '%s', Location'%s' : %s",
jobId, project, bqDataset, location, queryJob.getStatus().getError().toString()));
}

LOG.info("Created BigQuery table `{}` using Job: {}", bqTable, jobId);
BigQuerySQLEngineUtils.logJobMetrics(queryJob);
return this;
}

@@ -228,6 +228,7 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,

// Check for errors
if (queryJob.getStatus().getError() != null) {
BigQuerySQLEngineUtils.logJobMetrics(queryJob);
LOG.error("Error executing BigQuery Job: '{}' in Project '{}', Dataset '{}': {}",
jobId, sqlEngineConfig.getProject(), sqlEngineConfig.getDatasetProject(),
queryJob.getStatus().getError().toString());
@@ -241,6 +242,7 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
LOG.info("Executed copy operation for {} records from {}.{}.{} to {}.{}.{}", numRows,
sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable());
BigQuerySQLEngineUtils.logJobMetrics(queryJob);

return SQLWriteResult.success(datasetName, numRows);
}
@@ -18,11 +18,15 @@

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.QueryStage;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.gson.Gson;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.engine.sql.SQLEngineException;
import io.cdap.cdap.etl.api.join.JoinCondition;
@@ -41,6 +45,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/**
@@ -49,6 +54,7 @@
public class BigQuerySQLEngineUtils {

private static final Logger LOG = LoggerFactory.getLogger(BigQuerySQLEngineUtils.class);
private static final Gson GSON = new Gson();

public static final String GCS_PATH_FORMAT = BigQuerySinkUtils.GS_PATH_FORMAT + "/%s";
public static final String BQ_TABLE_NAME_FORMAT = "%s_%s";
@@ -218,7 +224,7 @@ public static void validateOnExpressionJoinCondition(JoinCondition.OnExpression

/**
* Validates stages for a Join on Key operation
*
* <p>
Reviewer comment (Contributor): nit: is this intentional?

* TODO: Update logic once BQ SQL engine joins support multiple outer join tables
*
* @param joinDefinition Join Definition to validate
@@ -292,4 +298,98 @@ public static Map<String, String> getJobTags(String operation) {
labels.put("pushdown_operation", operation);
return Collections.unmodifiableMap(labels);
}

/**
* Logs execution metrics for a BigQuery Job using the class logger.
*
* @param job BigQuery Job
*/
public static void logJobMetrics(Job job) {
// Ensure the job has statistics information; bail out early to avoid a NullPointerException below.
if (job.getStatistics() == null) {
LOG.warn("No statistics were found for BigQuery job {}", job.getJobId());
return;
}

String startTimeStr = getISODateTimeString(job.getStatistics().getStartTime());
String endTimeStr = getISODateTimeString(job.getStatistics().getEndTime());
String executionTimeStr = getExecutionTimeString(job.getStatistics().getStartTime(),
job.getStatistics().getEndTime());

// Print detailed query statistics if available
if (job.getStatistics() instanceof JobStatistics.QueryStatistics) {
JobStatistics.QueryStatistics queryStatistics = (JobStatistics.QueryStatistics) job.getStatistics();
LOG.info("Metrics for job {}:\n" +
" Start: {} ,\n" +
" End: {} ,\n" +
" Execution time: {} ,\n" +
" Processed Bytes: {} ,\n" +
Reviewer comment (Contributor): Could you also log numDmlAffectedRows? I think it should more or less correspond to output rows, and we mostly talk about rows in CDAP. (See the sketch after this method.)

" Billed Bytes: {} ,\n" +
" Total Slot ms: {} ,\n" +
" Records per stage (read/write): {}",
job.getJobId().getJob(),
startTimeStr,
endTimeStr,
executionTimeStr,
queryStatistics.getTotalBytesProcessed(),
queryStatistics.getTotalBytesBilled(),
queryStatistics.getTotalSlotMs(),
getQueryStageRecordCounts(queryStatistics.getQueryPlan()));

if (LOG.isTraceEnabled()) {
LOG.trace("Additional Metrics for job {}:\n" +
" Query Plan: {} ,\n" +
" Query Timeline: {} \n",
job.getJobId().getJob(),
GSON.toJson(queryStatistics.getQueryPlan()),
GSON.toJson(queryStatistics.getTimeline()));
}

return;
}

// Print basic metrics for jobs without query statistics
LOG.info("Metrics for job {}:\n" +
" Start: {} ,\n" +
" End: {} ,\n" +
" Execution time: {}",
job.getJobId().getJob(),
startTimeStr,
endTimeStr,
executionTimeStr);
}
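
A minimal sketch, not part of this PR, of how the reviewer's numDmlAffectedRows suggestion could be folded into the QueryStatistics branch of logJobMetrics above. It assumes QueryStatistics#getNumDmlAffectedRows(), which returns null for non-DML queries, so the existing formatRecordCount helper supplies the "N/A" fallback:

// Sketch only: surface DML-affected rows alongside the other query metrics.
// getNumDmlAffectedRows() is null for non-DML jobs, so format it like the stage record counts.
Long dmlAffectedRows = queryStatistics.getNumDmlAffectedRows();
LOG.info("DML affected rows for job {}: {}",
job.getJobId().getJob(),
formatRecordCount(dmlAffectedRows));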

private static String getISODateTimeString(Long epoch) {
if (epoch == null) {
return "N/A";
}

return Instant.ofEpochMilli(epoch).toString();
}

private static String getExecutionTimeString(Long startEpoch, Long endEpoch) {
if (startEpoch == null || endEpoch == null) {
return "N/A";
}

return (endEpoch - startEpoch) + " ms";
}

private static String getQueryStageRecordCounts(List<QueryStage> queryPlan) {
if (queryPlan == null || queryPlan.isEmpty()) {
return "N/A";
}

return queryPlan.stream()
.map(qs -> formatRecordCount(qs.getRecordsRead()) + "/" + formatRecordCount(qs.getRecordsWritten()))
.collect(Collectors.joining(" , ", "[ ", " ]"));
}

private static String formatRecordCount(Long val) {
if (val == null) {
return "N/A";
}

return val.toString();
}
}