Merged
pom.xml (2 changes: 1 addition & 1 deletion)
@@ -72,7 +72,7 @@
    <avro.version>1.8.2</avro.version>
    <bigquery.connector.hadoop2.version>hadoop2-1.0.0</bigquery.connector.hadoop2.version>
    <commons.codec.version>1.4</commons.codec.version>
-   <cdap.version>6.7.0</cdap.version>
+   <cdap.version>6.8.0-SNAPSHOT</cdap.version>
    <cdap.plugin.version>2.10.0-SNAPSHOT</cdap.plugin.version>
    <dropwizard.metrics-core.version>3.2.6</dropwizard.metrics-core.version>
    <flogger.system.backend.version>0.3.1</flogger.system.backend.version>
@@ -161,6 +161,12 @@ void initSQLEngineOutput(BatchSinkContext context,
                          String tableName,
                          @Nullable Schema tableSchema,
                          FailureCollector collector) {
+    // Sink Pushdown is not supported if the sink schema is not defined.
+    if (tableSchema == null) {
+      LOG.debug("BigQuery SQL Engine Output was not initialized. Schema was empty.");
+      return;
Contributor:
So, this just means that BQ Execute will not be pushed down for schemaless pipelines and will work in the old way, correct?

@fernst (Contributor, Author), Sep 1, 2022:
BQ Execute is not a stage that can be "pushed down" in the traditional pipeline sense. BQ Execute operates on tables in BigQuery after the sink operation is completed.

Schemaless pipelines still work in Spark as they did before version 0.20 of the plugin / CDAP 6.7.

+    }
+
     List<BigQueryTableFieldSchema> fields = BigQuerySinkUtils.getBigQueryTableFields(bigQuery, tableName, tableSchema,
       getConfig().isAllowSchemaRelaxation(),
       config.getDatasetProject(), config.getDataset(), config.isTruncateTableSet(), collector);
@@ -175,7 +181,7 @@ void initSQLEngineOutput(BatchSinkContext context,
     arguments
       .put(BigQueryWrite.SQL_OUTPUT_JOB_ID, jobId + "_write")
       .put(BigQueryWrite.SQL_OUTPUT_CONFIG, GSON.toJson(config))
-      .put(BigQueryWrite.SQL_OUTPUT_SCHEMA, tableSchema != null ? GSON.toJson(tableSchema) : null)
+      .put(BigQueryWrite.SQL_OUTPUT_SCHEMA, GSON.toJson(tableSchema))
      .put(BigQueryWrite.SQL_OUTPUT_FIELDS, GSON.toJson(fieldNames));

     context.addOutput(new SQLEngineOutput(outputName,
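For readers following the review thread above, here is a minimal, self-contained sketch of the behavior under discussion. The class and method names are hypothetical and are not plugin code; the sketch only restates that registering the SQL engine output (and therefore sink pushdown) hinges solely on whether a sink schema is defined, while a downstream BQ Execute action runs against the BigQuery table after the sink completes in either case.

public class PushdownDecisionSketch {

  // Hypothetical helper mirroring the early-return guard added in initSQLEngineOutput:
  // a null sink schema means no SQLEngineOutput is registered, so records are written
  // through the regular Spark-based sink, as they were before plugin 0.20 / CDAP 6.7.
  static boolean registersSqlEngineOutput(String sinkSchemaJson) {
    return sinkSchemaJson != null;
  }

  public static void main(String[] args) {
    // Schemaless sink: no pushdown, regular Spark write path.
    System.out.println(registersSqlEngineOutput(null));                    // false
    // Schema present: a SQL engine output can be registered for pushdown.
    System.out.println(registersSqlEngineOutput("{\"type\":\"record\"}")); // true
  }
}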
@@ -43,10 +43,13 @@

import java.util.List;

+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
@@ -87,19 +90,21 @@ public void testBigQuerySinkInvalidConfig() {

@Test
public void testBigQueryTimePartitionConfig() {
-    Schema schema = Schema.recordOf("record",
-      Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
-      Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
-      Schema.Field.of("price", Schema.of(Schema.Type.DOUBLE)),
-      Schema.Field.of("dt", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))),
-      Schema.Field.of("bytedata", Schema.of(Schema.Type.BYTES)),
-      Schema.Field.of("timestamp",
-        Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))));
+    Schema schema =
+      Schema.recordOf("record",
+        Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
+        Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
+        Schema.Field.of("price", Schema.of(Schema.Type.DOUBLE)),
+        Schema.Field.of("dt", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))),
+        Schema.Field.of("bytedata", Schema.of(Schema.Type.BYTES)),
+        Schema.Field.of("timestamp",
+          Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))));

-    BigQuerySinkConfig config = new BigQuerySinkConfig("44", "ds", "tb", "bucket", schema.toString(),
-      "TIME", 0L, 100L, 10L, null);
+    BigQuerySinkConfig config =
+      new BigQuerySinkConfig("44", "ds", "tb", "bucket", schema.toString(),
+        "TIME", 0L, 100L, 10L, null);
config.partitionByField = "dt";

MockFailureCollector collector = new MockFailureCollector("bqsink");
config.validate(collector);
Assert.assertEquals(0, collector.getValidationFailures().size());
@@ -382,4 +387,19 @@ public void testDatasetWithSpecialCharacters() {
Assert.assertEquals("new_table_2020", multiSink.sanitizeOutputName("new!table?2020"));
Assert.assertEquals("new_table_2020", multiSink.sanitizeOutputName("new^table|2020"));
}

+  @Test
+  public void testInitSQLEngineOutputDoesNotInitOutputWithNullSchema() throws Exception {
+    BatchSinkContext sinkContext = mock(BatchSinkContext.class);
+    MockFailureCollector collector = new MockFailureCollector("bqsink");
+
+    BigQuerySinkConfig config =
+      new BigQuerySinkConfig("testmetric", "ds", "tb", "bkt", null,
+        null, null, null, null, null);
+    BigQuery bigQueryMock = mock(BigQuery.class);
+
+    BigQuerySink sink = new BigQuerySink(config);
+    sink.initSQLEngineOutput(sinkContext, bigQueryMock, "sink", "sink", "table", null, collector);
+    verify(sinkContext, never()).addOutput(any());
+  }
}