Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.annotations.VisibleForTesting;
import io.cdap.cdap.etl.api.aggregation.DeduplicateAggregationDefinition;
import io.cdap.cdap.etl.api.aggregation.GroupByAggregationDefinition;
import io.cdap.cdap.etl.api.engine.sql.SQLEngineException;
import io.cdap.cdap.etl.api.relational.Expression;
import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.Relation;
Expand All @@ -11,13 +12,16 @@
import io.cdap.plugin.gcp.bigquery.sqlengine.builder.BigQueryGroupBySQLBuilder;
import io.cdap.plugin.gcp.bigquery.sqlengine.builder.BigQueryNestedSelectSQLBuilder;
import io.cdap.plugin.gcp.bigquery.sqlengine.builder.BigQuerySelectSQLBuilder;
import org.apache.parquet.Strings;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;

/**
Expand All @@ -29,8 +33,9 @@ public class BigQueryRelation implements Relation {
private final String datasetName;
private final Set<String> columns;
private final BigQueryRelation parent;
private final Supplier<String> sqlStatementSupplier;

private Map<String, BigQuerySQLDataset> sourceDatasets;
private final Supplier<String> expressionSupplier;

/**
* Gets a new BigQueryRelation instance
Expand All @@ -50,13 +55,22 @@ protected BigQueryRelation(String datasetName,
this.datasetName = datasetName;
this.columns = columns;
this.parent = null;
this.expressionSupplier = () -> {
this.sqlStatementSupplier = () -> {

// Check if Dataset exists
BigQuerySQLDataset sourceDataset = sourceDatasets.get(datasetName);
if (sourceDataset == null) {
throw new SQLEngineException("Unable to find dataset with name " + datasetName);
}

// Build selected columns for this dataset based on initial columns present in relation.
Map<String, Expression> selectedColumns = getSelectedColumns(columns);
// Build source table identifier using the Project, Dataset and Table
String sourceTable = String.format("%s.%s.%s",
sourceDataset.getBigQueryProject(),
sourceDataset.getBigQueryDataset(),
sourceDataset.getBigQueryTable());
// Build initial select from the source table.
return buildBaseSelect(selectedColumns, sourceTable, datasetName);
};
}
Expand All @@ -65,11 +79,11 @@ protected BigQueryRelation(String datasetName,
protected BigQueryRelation(String datasetName,
Set<String> columns,
BigQueryRelation parent,
Supplier<String> expressionSupplier) {
Supplier<String> sqlStatementSupplier) {
this.datasetName = datasetName;
this.columns = columns;
this.parent = parent;
this.expressionSupplier = expressionSupplier;
this.sqlStatementSupplier = sqlStatementSupplier;
}

private Relation getInvalidRelation(String validationError) {
Expand All @@ -88,6 +102,7 @@ public String getValidationError() {

/**
* Get parent relation
*
* @return parent relation instance. This can be null for a base relation.
*/
@Nullable
Expand All @@ -97,14 +112,16 @@ public Relation getParent() {

/**
* Method use to materialize the transform expression from this dataset.
*
* @return transform expression used when executing SQL statements.
*/
public String getTransformExpression() {
return expressionSupplier.get();
public String getSQLStatement() {
return sqlStatementSupplier.get();
}

/**
* Get columns defined in this relation
*
* @return Columns defined in this relation.
*/
public Set<String> getColumns() {
Expand All @@ -125,6 +142,7 @@ public void setInputDatasets(Map<String, BigQuerySQLDataset> datasets) {

/**
* Get the dataset name for this relation.
*
* @return dataset name
*/
public String getDatasetName() {
Expand All @@ -133,30 +151,32 @@ public String getDatasetName() {

/**
* Get a new relation with a redefined dataset name.
*
* @param newDatasetName new dataset name for this relation.
* @return new Relation with the new dataset name.
*/
public Relation setDatasetName(String newDatasetName) {
Map<String, Expression> selectedColumns = getSelectedColumns(columns);
// Build new transform expression and return new instance.
Supplier<String> supplier =
() -> buildNestedSelect(selectedColumns, getTransformExpression(), newDatasetName, null);
() -> buildNestedSelect(selectedColumns, getSQLStatement(), newDatasetName, null);
return new BigQueryRelation(newDatasetName, columns, this, supplier);
}

@Override
public Relation setColumn(String column, Expression value) {
// check if expression is supported and valid
if (!supportsExpression(value)) {
return getInvalidRelation("Unsupported or invalid expression type.");
return getInvalidRelation("Unsupported or invalid expression type : "
+ getInvalidExpressionCause(value));
}

Map<String, Expression> selectedColumns = getSelectedColumns(columns);
selectedColumns.put(column, value);

// Build new transform expression and return new instance.
Supplier<String> supplier =
() -> buildNestedSelect(selectedColumns, getTransformExpression(), datasetName, null);
() -> buildNestedSelect(selectedColumns, getSQLStatement(), datasetName, null);
return new BigQueryRelation(datasetName, selectedColumns.keySet(), this, supplier);
}

Expand All @@ -173,64 +193,68 @@ public Relation dropColumn(String column) {

// Build new transform expression and return new instance.
Supplier<String> supplier =
() -> buildNestedSelect(selectedColumns, getTransformExpression(), datasetName, null);
() -> buildNestedSelect(selectedColumns, getSQLStatement(), datasetName, null);
return new BigQueryRelation(datasetName, selectedColumns.keySet(), this, supplier);
}

@Override
public Relation select(Map<String, Expression> columns) {
// check if all expressions are supported and valid
if (!supportsExpressions(columns.values())) {
return getInvalidRelation("Unsupported or invalid expression type.");
return getInvalidRelation("Unsupported or invalid expression type: "
+ getInvalidExpressionCauses(columns.values()));
}

// Build new transform expression and return new instance.
Supplier<String> supplier =
() -> buildNestedSelect(columns, getTransformExpression(), datasetName, null);
() -> buildNestedSelect(columns, getSQLStatement(), datasetName, null);
return new BigQueryRelation(datasetName, columns.keySet(), this, supplier);
}

@Override
public Relation filter(Expression filter) {
// check if expression is supported and valid
if (!supportsExpression(filter)) {
return getInvalidRelation("Unsupported or invalid expression type.");
return getInvalidRelation("Unsupported or invalid expression type: "
+ getInvalidExpressionCause(filter));
}

Map<String, Expression> selectedColumns = getSelectedColumns(columns);
// Build new transform expression and return new instance.
Supplier<String> supplier =
() -> buildNestedSelect(selectedColumns, getTransformExpression(), datasetName, filter);
() -> buildNestedSelect(selectedColumns, getSQLStatement(), datasetName, filter);
return new BigQueryRelation(datasetName, columns, this, supplier);
}

@Override
public Relation groupBy(GroupByAggregationDefinition definition) {
// Ensure all expressions supplied in this definition are supported and valid
if (!supportsGroupByAggregationDefinition(definition)) {
return getInvalidRelation("DeduplicateAggregationDefinition contains " +
"unsupported or invalid expressions");
return getInvalidRelation("DeduplicateAggregationDefinition contains "
+ "unsupported or invalid expressions: "
+ collectGroupByAggregationDefinitionErrors(definition));
}

Set<String> columns = definition.getSelectExpressions().keySet();

// Build new transform expression and return new instance.
Supplier<String> supplier =
() -> buildGroupBy(definition, getTransformExpression(), datasetName);
() -> buildGroupBy(definition, getSQLStatement(), datasetName);
return new BigQueryRelation(datasetName, columns, this, supplier);
}

@Override
public Relation deduplicate(DeduplicateAggregationDefinition definition) {
// Ensure all expressions supplied in this definition are supported and valid
if (!supportsDeduplicateAggregationDefinition(definition)) {
return getInvalidRelation("DeduplicateAggregationDefinition contains " +
"unsupported or invalid expressions");
return getInvalidRelation("DeduplicateAggregationDefinition contains "
+ "unsupported or invalid expressions: "
+ collectDeduplicateAggregationDefinitionErrors(definition));
}

Set<String> columns = definition.getSelectExpressions().keySet();
Supplier<String> supplier =
() -> buildDeduplicate(definition, getTransformExpression(), datasetName);
() -> buildDeduplicate(definition, getSQLStatement(), datasetName);
return new BigQueryRelation(datasetName, columns, this, supplier);
}

Expand Down Expand Up @@ -335,6 +359,41 @@ protected static boolean supportsGroupByAggregationDefinition(GroupByAggregation
&& supportsExpressions(groupByExpressions);
}

/**
* Check if all expressions contained in a {@link GroupByAggregationDefinition} are supported.
*
* @param def {@link GroupByAggregationDefinition} to verify.
* @return boolean specifying if all expressions are supported or not.
*/
@VisibleForTesting
@Nullable
protected static String collectGroupByAggregationDefinitionErrors(GroupByAggregationDefinition def) {
// If the expression is valid, this must be null.
if (supportsGroupByAggregationDefinition(def)) {
return null;
}

// Gets all expressions defined in this definition
Collection<Expression> selectExpressions = def.getSelectExpressions().values();
Collection<Expression> groupByExpressions = def.getGroupByExpressions();

// Get all invalid expression causes, and prepend field origin to error reasons
String selectErrors = getInvalidExpressionCauses(selectExpressions);
if (!Strings.isNullOrEmpty(selectErrors)) {
selectErrors = "Select fields: " + selectErrors;
}

String groupByErrors = getInvalidExpressionCauses(groupByExpressions);
if (!Strings.isNullOrEmpty(groupByErrors)) {
groupByErrors = "Grouping fields: " + groupByErrors;
}

// Build string which concatenates all non-empty error groups, separated by a hyphen.
return Stream.of(selectErrors, groupByErrors)
.filter(Objects::nonNull)
.collect(Collectors.joining(" - "));
}

/**
* Builds a new {@link GroupByAggregationDefinition} with qualified aliases for the Select Expression.
*
Expand Down Expand Up @@ -370,6 +429,50 @@ && supportsExpressions(dedupExpressions)
&& supportsExpressions(orderExpressions);
}

/**
* Check if all expressions contained in a {@link DeduplicateAggregationDefinition} are supported.
*
* @param def {@link DeduplicateAggregationDefinition} to verify.
* @return boolean specifying if all expressions are supported or not.
*/
@VisibleForTesting
@Nullable
protected static String collectDeduplicateAggregationDefinitionErrors(DeduplicateAggregationDefinition def) {
// If the expression is valid, this must be null.
if (supportsDeduplicateAggregationDefinition(def)) {
return null;
}

// Gets all expressions defined in this definition
Collection<Expression> selectExpressions = def.getSelectExpressions().values();
Collection<Expression> dedupExpressions = def.getGroupByExpressions();
Collection<Expression> orderExpressions = def.getFilterExpressions()
.stream()
.map(DeduplicateAggregationDefinition.FilterExpression::getExpression)
.collect(Collectors.toSet());

// Get all invalid expression causes, and prepend origin to error reasons
String selectErrors = getInvalidExpressionCauses(selectExpressions);
if (!Strings.isNullOrEmpty(selectErrors)) {
selectErrors = "Select fields: " + selectErrors;
}

String dedupErrors = getInvalidExpressionCauses(dedupExpressions);
if (!Strings.isNullOrEmpty(dedupErrors)) {
dedupErrors = "Deduplication fields: " + dedupErrors;
}

String orderErrors = getInvalidExpressionCauses(orderExpressions);
if (!Strings.isNullOrEmpty(orderErrors)) {
orderErrors = "Order fields: " + orderErrors;
}

// Build string which concatenates all non-empty error groups, separated by a hyphen.
return Stream.of(selectErrors, dedupErrors, orderErrors)
.filter(Objects::nonNull)
.collect(Collectors.joining(" - "));
}

/**
* Builds a new {@link DeduplicateAggregationDefinition} with qualified aliases for the Select Expression.
*
Expand Down Expand Up @@ -406,4 +509,53 @@ protected static boolean supportsExpression(Expression expression) {
return expression instanceof SQLExpression && expression.isValid();
}

/**
* Collects validation errors for a colleciton of expressions
*
* @param expressions collection containing expressions to verify
* @return boolean specifying if all expressions are supported or not.
*/
@VisibleForTesting
@Nullable
protected static String getInvalidExpressionCauses(Collection<Expression> expressions) {
// If the expressions are valid, this is null.
if (supportsExpressions(expressions)) {
return null;
}

// Collect failure reasons
return expressions.stream()
.map(BigQueryRelation::getInvalidExpressionCause)
.filter(Objects::nonNull)
.collect(Collectors.joining(" ; "));
}

/**
* Gets the validation error for an expression
*
* @param expression expression to verity
* @return Validation error for this expression, or null if the expression is valid.
*/
@VisibleForTesting
@Nullable
protected static String getInvalidExpressionCause(Expression expression) {
// If the expressions are valid, this is null.
if (supportsExpression(expression)) {
return null;
}

// Null expressions are not supported
if (expression == null) {
return "Expression is null";
}

// We only support SQLExpression
if (!(expression instanceof SQLExpression)) {
return "Unsupported Expression type \"" + expression.getClass().getCanonicalName() + "\"";
}

// Return validation error for this expression.
return expression.getValidationError() != null ? expression.getValidationError() : "Unknown";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ public SQLDataset transform(SQLTransformRequest context) throws SQLEngineExcepti
return executeSelect(context.getOutputDatasetName(),
context.getOutputSchema(),
BigQueryJobType.TRANSFORM,
relation.getTransformExpression());
relation.getSQLStatement());
}

private BigQuerySelectDataset executeSelect(String datasetName,
Expand Down
Loading