Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ protected static boolean isValidJoinDefinition(SQLJoinDefinition sqlJoinDefiniti
validationProblems);
}

// Validate join stages for join on keys
if (joinDefinition.getCondition().getOp() == JoinCondition.Op.KEY_EQUALITY) {
BigQuerySQLEngineUtils.validateJoinOnKeyStages(joinDefinition, validationProblems);
}

if (!validationProblems.isEmpty()) {
LOG.warn("Join operation for stage '{}' could not be executed in BigQuery. Issues found: {}.",
sqlJoinDefinition.getDatasetName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.engine.sql.SQLEngineException;
import io.cdap.cdap.etl.api.join.JoinCondition;
import io.cdap.cdap.etl.api.join.JoinDefinition;
import io.cdap.cdap.etl.api.join.JoinStage;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngineConfig;
Expand Down Expand Up @@ -214,6 +215,35 @@ public static void validateOnExpressionJoinCondition(JoinCondition.OnExpression
}
}

/**
* Validates stages for a Join on Key operation
*
* TODO: Update logic once BQ SQL engine joins support multiple outer join tables
*
* @param joinDefinition Join Definition to validate
* @param validationProblems List of validation problems to use to append messages
*/
public static void validateJoinOnKeyStages(JoinDefinition joinDefinition, List<String> validationProblems) {
// 2 stages are not an issue
if (joinDefinition.getStages().size() < 3) {
return;
}

// For 3 or more stages, we only support inner joins.
boolean isInnerJoin = true;

// If any of the stages is not required, this is an outer join
for (JoinStage stage : joinDefinition.getStages()) {
isInnerJoin &= stage.isRequired();
}

if (!isInnerJoin) {
validationProblems.add(
String.format("Only 2 input stages are supported for outer joins, %d stages supplied.",
joinDefinition.getStages().size()));
}
}

/**
* Ensure the Stage name is valid for execution in BQ pushdown.
* <p>
Expand All @@ -230,6 +260,7 @@ public static boolean isValidIdentifier(String identifier) {

/**
* Get tags for BQ Pushdown tags
*
* @param operation the current operation that is being executed
* @return Map containing tags for a job.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,49 @@ public void testIsValidJoinDefinitionOnKey() {
Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)),
Schema.Field.of("zip", Schema.nullableOf(Schema.of(Schema.Type.INT))));

Schema outputSchema =
Schema.recordOf("Join",
Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)),
Schema.Field.of("from_zip", Schema.nullableOf(Schema.of(Schema.Type.INT))));

// First join is a right join, second join is a left join
JoinStage shipments = JoinStage.builder("Shipments", shipmentSchema).setRequired(true).build();
JoinStage fromAddresses = JoinStage.builder("FromAddress", fromAddressSchema).setRequired(true).build();

// null safe
JoinCondition condition = JoinCondition.onKeys()
.addKey(new JoinKey("Shipments", Arrays.asList("id")))
.addKey(new JoinKey("FromAddress", Arrays.asList("shipment_id")))
.setNullSafe(false)
.build();

JoinDefinition joinDefinition = JoinDefinition.builder()
.select(new JoinField("Shipments", "id", "shipment_id"),
new JoinField("FromAddress", "zip", "from_zip"))
.from(shipments, fromAddresses)
.on(condition)
.setOutputSchemaName("Join")
.setOutputSchema(outputSchema)
.build();

SQLJoinDefinition sqlJoinDefinition = new SQLJoinDefinition("Join", joinDefinition);

Assert.assertTrue(BigQuerySQLEngine.isValidJoinDefinition(sqlJoinDefinition));
verify(logger, times(0)).warn(anyString(), anyString(), anyString());
}

@Test
public void testInnerJoinFor3StagesIsSupported() {
Schema shipmentSchema =
Schema.recordOf("Shipments",
Schema.Field.of("id", Schema.of(Schema.Type.INT)));

Schema fromAddressSchema =
Schema.recordOf("FromAddress",
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)),
Schema.Field.of("zip", Schema.nullableOf(Schema.of(Schema.Type.INT))));

Schema toAddressSchema =
Schema.recordOf("ToAddress",
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Expand Down Expand Up @@ -116,6 +159,74 @@ public void testIsValidJoinDefinitionOnKey() {
verify(logger, times(0)).warn(anyString(), anyString(), anyString());
}

@Test
public void testOuterJoinFor3StagesIsNotSupported() {
ArgumentCaptor<String> messageTemplateCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> stageNameCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> issuesCaptor = ArgumentCaptor.forClass(String.class);

Schema shipmentSchema =
Schema.recordOf("Shipments",
Schema.Field.of("id", Schema.of(Schema.Type.INT)));

Schema fromAddressSchema =
Schema.recordOf("FromAddress",
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)),
Schema.Field.of("zip", Schema.nullableOf(Schema.of(Schema.Type.INT))));

Schema toAddressSchema =
Schema.recordOf("ToAddress",
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)),
Schema.Field.of("zip", Schema.nullableOf(Schema.of(Schema.Type.INT))));

Schema outputSchema =
Schema.recordOf("Join",
Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)),
Schema.Field.of("from_zip", Schema.nullableOf(Schema.of(Schema.Type.INT))),
Schema.Field.of("to_zip", Schema.nullableOf(Schema.of(Schema.Type.INT))));

// First join is a right join, second join is a left join
JoinStage shipments = JoinStage.builder("Shipments", shipmentSchema).setRequired(true).build();
JoinStage fromAddresses = JoinStage.builder("FromAddress", fromAddressSchema).setRequired(true).build();
JoinStage toAddresses = JoinStage.builder("ToAddress", toAddressSchema).setRequired(false).build();

// null safe
JoinCondition condition = JoinCondition.onKeys()
.addKey(new JoinKey("Shipments", Arrays.asList("id")))
.addKey(new JoinKey("FromAddress", Arrays.asList("shipment_id")))
.addKey(new JoinKey("ToAddress", Arrays.asList("shipment_id")))
.setNullSafe(false)
.build();

JoinDefinition joinDefinition = JoinDefinition.builder()
.select(new JoinField("Shipments", "id", "shipment_id"),
new JoinField("FromAddress", "zip", "from_zip"),
new JoinField("ToAddress", "zip", "to_zip"))
.from(shipments, fromAddresses, toAddresses)
.on(condition)
.setOutputSchemaName("Join")
.setOutputSchema(outputSchema)
.build();

SQLJoinDefinition sqlJoinDefinition = new SQLJoinDefinition("Join", joinDefinition);

Assert.assertFalse(BigQuerySQLEngine.isValidJoinDefinition(sqlJoinDefinition));
verify(logger).warn(messageTemplateCaptor.capture(), stageNameCaptor.capture(), issuesCaptor.capture());

String messageTemplate = messageTemplateCaptor.getValue();
Assert.assertTrue(messageTemplate.contains(
"Join operation for stage '{}' could not be executed in BigQuery. Issues found:"));

String stageName = stageNameCaptor.getValue();
Assert.assertEquals("Join", stageName);

String issues = issuesCaptor.getValue();
Assert.assertTrue(issues.contains(
"Only 2 input stages are supported for outer joins, 3 stages supplied."));
}

@Test
public void testIsValidJoinDefinitionOnKeyWithErrors() {
ArgumentCaptor<String> messageTemplateCaptor = ArgumentCaptor.forClass(String.class);
Expand Down