Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion cli/api/BUILD
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package(default_visibility = ["//visibility:public"])

load("//tools:ts_library.bzl", "ts_library")
load("//testing:index.bzl", "ts_test_suite")

ts_library(
name = "api",
srcs = glob(
["**/*.ts"],
exclude = ["utils/**/*.*"],
exclude = ["utils/**/*.*", "**/*_test.ts"],
),
deps = [
"//cli/api/utils",
Expand Down Expand Up @@ -42,3 +43,16 @@ ts_library(
"@npm//tmp",
],
)

# Test target for dbadapters/execution_sql_test.ts. The ":api" library excludes
# "**/*_test.ts" from its sources, so the test file is listed explicitly here
# and depends on ":api" for the code under test.
ts_test_suite(
name = "tests",
srcs = ["dbadapters/execution_sql_test.ts"],
deps = [
":api",
"//core",
"//protos:ts",
"//testing",
"@npm//@types/chai",
"@npm//chai",
],
)
325 changes: 308 additions & 17 deletions cli/api/dbadapters/execution_sql.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as crypto from "crypto";
import * as semver from "semver";

import { concatenateQueries, Task, Tasks } from "df/cli/api/dbadapters/tasks";
Expand Down Expand Up @@ -141,23 +142,34 @@ from (${query}) as insertions`;
if (!this.shouldWriteIncrementally(table, runConfig, tableMetadata)) {
tasks.add(Task.statement(this.createOrReplace(table)));
} else {
tasks.add(
Task.statement(
table.uniqueKey && table.uniqueKey.length > 0
? this.mergeInto(
table.target,
tableMetadata?.fields.map(f => f.name),
this.where(table.incrementalQuery || table.query, table.where),
table.uniqueKey,
table.bigquery && table.bigquery.updatePartitionFilter
)
: this.insertInto(
table.target,
tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
this.where(table.incrementalQuery || table.query, table.where)
)
)
);
const onSchemaChange = table.onSchemaChange ?? dataform.OnSchemaChange.IGNORE;
switch (onSchemaChange) {
case dataform.OnSchemaChange.FAIL:
case dataform.OnSchemaChange.EXTEND:
case dataform.OnSchemaChange.SYNCHRONIZE:
this.buildIncrementalSchemaChangeTasks(tasks, table);
break;
case dataform.OnSchemaChange.IGNORE:
default:
tasks.add(
Task.statement(
table.uniqueKey && table.uniqueKey.length > 0
? this.mergeInto(
table.target,
tableMetadata?.fields.map(f => f.name),
this.where(table.incrementalQuery || table.query, table.where),
table.uniqueKey,
table.bigquery && table.bigquery.updatePartitionFilter
)
: this.insertInto(
table.target,
tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
this.where(table.incrementalQuery || table.query, table.where)
)
)
);
break;
}
}
} else {
tasks.add(Task.statement(this.createOrReplace(table)));
Expand Down Expand Up @@ -186,6 +198,285 @@ from (${query}) as insertions`;
return `drop ${this.tableTypeAsSql(type)} if exists ${this.resolveTarget(target)}`;
}

private buildIncrementalSchemaChangeTasks(tasks: Tasks, table: dataform.ITable) {
const uniqueId = crypto.randomUUID().replace(/-/g, "_");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this UUID to be cryptographically secure


const shortEmptyTableName = `${table.target.name}_df_temp_${uniqueId}_empty`;
const emptyTempTableName = this.resolveTarget({
...table.target,
name: shortEmptyTableName
});

const shortDataTableName = shortEmptyTableName.replace("_empty", "_data");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's try not to use a temporary table for the results of the incremental query, following this comment.

const dataTempTableName = this.resolveTarget({
...table.target,
name: shortDataTableName
});

const procedureName = this.createProcedureName(table.target, uniqueId);
const procedureBody = this.incrementalSchemaChangeBody(
table,
this.resolveTarget(table.target),
emptyTempTableName,
dataTempTableName,
shortEmptyTableName
Comment on lines +220 to +222
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to pass both shortEmptyTableName and emptyTempTableName here?

);

const createProcedureSql = `CREATE OR REPLACE PROCEDURE ${procedureName}()
OPTIONS(strict_mode=false)
BEGIN
${procedureBody}
END;`;

const callProcedureSql = this.safeCallAndDropProcedure(
procedureName,
emptyTempTableName,
dataTempTableName
);
tasks.add(Task.statement(createProcedureSql));
tasks.add(Task.statement(callProcedureSql));
tasks.add(Task.statement(`DROP PROCEDURE IF EXISTS ${procedureName};`));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to be called in safeCallAndDropProcedure?

}

private createProcedureName(target: dataform.ITarget, uniqueId: string): string {
// Procedure names cannot contain hyphens.
const sanitizedUniqueId = uniqueId.replace(/-/g, "_");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uniqueId already seems to be sanitized in the code above?

return this.resolveTarget({
...target,
name: `df_osc_${sanitizedUniqueId}`
});
}

/**
 * Builds the script that invokes the given procedure and removes it afterwards.
 *
 * The CALL sits inside a BEGIN ... EXCEPTION block: when it raises, both temporary tables and
 * the procedure are dropped before the error is re-raised. The trailing statement drops the
 * procedure after the block completes without error.
 *
 * @param procedureName procedure name, interpolated verbatim into the SQL
 * @param emptyTempTableName first table dropped by the exception handler
 * @param dataTempTableName second table dropped by the exception handler
 * @returns the script text, which begins with a newline
 */
private safeCallAndDropProcedure(
  procedureName: string,
  emptyTempTableName: string,
  dataTempTableName: string
): string {
  // The same DROP PROCEDURE statement appears in the handler and after the block.
  const dropProcedure = `DROP PROCEDURE IF EXISTS ${procedureName};`;
  const scriptLines = [
    "",
    "BEGIN",
    `CALL ${procedureName}();`,
    "EXCEPTION WHEN ERROR THEN",
    `DROP TABLE IF EXISTS ${emptyTempTableName};`,
    `DROP TABLE IF EXISTS ${dataTempTableName};`,
    dropProcedure,
    "RAISE;",
    "END;",
    dropProcedure
  ];
  return scriptLines.join("\n");
}

private inferSchemaSql(emptyTempTableName: string, query: string): string {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code doesn't actually infer the schema, so I'd rename this function.

return `
-- Infer schema of new query.
CREATE OR REPLACE TABLE ${emptyTempTableName} AS (
SELECT * FROM (${query}) AS insertions LIMIT 0
);`;
}

/**
 * Builds the script section that diffs the columns of two tables in the same dataset.
 *
 * The generated SQL declares four script variables and fills them from
 * `INFORMATION_SCHEMA.COLUMNS`:
 *  - `dataform_columns`: distinct column names of `targetName`;
 *  - `temp_table_columns`: (column_name, data_type) pairs of `shortEmptyTableName`;
 *  - `columns_added`: entries of `temp_table_columns` whose name is not in `dataform_columns`;
 *  - `columns_removed`: names in `dataform_columns` that are absent from `temp_table_columns`.
 *
 * @param database first component of the INFORMATION_SCHEMA path
 * @param schema second component of the INFORMATION_SCHEMA path
 * @param targetName unqualified table name matched against `table_name`
 * @param shortEmptyTableName unqualified name of the table supplying the new column set
 * @returns the script text, which begins with a newline
 */
private compareSchemasSql(
  database: string,
  schema: string,
  targetName: string,
  shortEmptyTableName: string
): string {
  // Both lookups read the same view, so the quoted path is built once.
  const columnsView = `\`${database}.${schema}.INFORMATION_SCHEMA.COLUMNS\``;

  const declarations = [
    "-- Compare schemas",
    "DECLARE dataform_columns ARRAY<STRING>;",
    "DECLARE temp_table_columns ARRAY<STRUCT<column_name STRING, data_type STRING>>;",
    "DECLARE columns_added ARRAY<STRUCT<column_name STRING, data_type STRING>>;",
    "DECLARE columns_removed ARRAY<STRING>;"
  ];

  const currentColumns = [
    "SET dataform_columns = (",
    "SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), [])",
    `FROM ${columnsView}`,
    `WHERE table_name = '${targetName}'`,
    ");"
  ];

  const incomingColumns = [
    "SET temp_table_columns = (",
    "SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), [])",
    `FROM ${columnsView}`,
    `WHERE table_name = '${shortEmptyTableName}'`,
    ");"
  ];

  const columnDiff = [
    "SET columns_added = (",
    "SELECT IFNULL(ARRAY_AGG(column_info), [])",
    "FROM UNNEST(temp_table_columns) AS column_info",
    "WHERE column_info.column_name NOT IN UNNEST(dataform_columns)",
    ");",
    "SET columns_removed = (",
    "SELECT IFNULL(ARRAY_AGG(column_name), [])",
    "FROM UNNEST(dataform_columns) AS column_name",
    "WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col)",
    ");"
  ];

  // Empty entries reproduce the leading newline and the blank lines between sections.
  return [
    "",
    ...declarations,
    "",
    ...currentColumns,
    "",
    ...incomingColumns,
    "",
    ...columnDiff
  ].join("\n");
}

private applySchemaChangeStrategySql(
table: dataform.ITable,
qualifiedTargetTableName: string
): string {
const onSchemaChange = table.onSchemaChange || dataform.OnSchemaChange.IGNORE;
let sql = `
-- Apply schema change strategy (${dataform.OnSchemaChange[onSchemaChange]}).`;

switch (onSchemaChange) {
case dataform.OnSchemaChange.FAIL:
sql += `
IF ARRAY_LENGTH(columns_added) > 0 OR ARRAY_LENGTH(columns_removed) > 0 THEN
RAISE USING MESSAGE = FORMAT(
"Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %t, removed columns: %t",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is %t correct formatting here?

columns_added,
columns_removed
);
END IF;
`;
break;
case dataform.OnSchemaChange.EXTEND:
sql += `
IF ARRAY_LENGTH(columns_removed) > 0 THEN
RAISE USING MESSAGE = FORMAT(
"Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %t",
columns_removed
);
END IF;

FOR column_info IN (SELECT * FROM UNNEST(columns_added)) DO
EXECUTE IMMEDIATE FORMAT(
"ALTER TABLE ${qualifiedTargetTableName} ADD COLUMN IF NOT EXISTS %s %s",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's optimize this, following one of the reported issues (multiple ALTER TABLE calls can eat quota): it should be possible to add all columns in one ALTER TABLE statement.

column_info.column_name,
column_info.data_type
);
END FOR;
`;
break;
case dataform.OnSchemaChange.SYNCHRONIZE:
const uniqueKeys = table.uniqueKey || [];
sql += `
FOR removed_column_name IN (SELECT * FROM UNNEST(columns_removed)) DO
IF removed_column_name IN UNNEST(${JSON.stringify(uniqueKeys)}) THEN
RAISE USING MESSAGE = FORMAT(
"Cannot drop column %s as it is part of the unique key for table ${qualifiedTargetTableName}",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not mix different string formatting ways like %s and ${qualifiedTargetTableName}

removed_column_name
);
ELSE
EXECUTE IMMEDIATE FORMAT(
"ALTER TABLE ${qualifiedTargetTableName} DROP COLUMN IF EXISTS %s",
removed_column_name
);
END IF;
END FOR;

FOR column_info IN (SELECT * FROM UNNEST(columns_added)) DO
EXECUTE IMMEDIATE FORMAT(
"ALTER TABLE ${qualifiedTargetTableName} ADD COLUMN IF NOT EXISTS %s %s",
column_info.column_name,
column_info.data_type
);
END FOR;
Comment on lines +367 to +373
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not duplicate the ADD COLUMN statements; let's put them into a separate function.

`;
break;
}
return sql;
}

/**
 * Assembles the final data-loading part of the script from three fragments, in order:
 * creation of the temporary data table, declaration of the column-list variable, and the
 * MERGE-or-INSERT statement. Fragments are separated by a single newline.
 *
 * @param table the table definition passed through to the fragment builders
 * @param qualifiedTargetTableName target table name passed to the MERGE/INSERT builder
 * @param dataTempTableName name of the temporary data table
 * @returns the concatenated SQL fragments
 */
private runFinalDmlSql(
  table: dataform.ITable,
  qualifiedTargetTableName: string,
  dataTempTableName: string
): string {
  const stageDataSql = this.createIncrementalDataTempTableSql(table, dataTempTableName);
  const declareColumnListSql = this.declareDataformColumnsListSql();
  const mergeOrInsertSql = this.executeMergeOrInsertSql(
    table,
    qualifiedTargetTableName,
    dataTempTableName
  );
  return [stageDataSql, declareColumnListSql, mergeOrInsertSql].join("\n");
}

private createIncrementalDataTempTableSql(table: dataform.ITable, dataTempTableName: string): string {
return `
CREATE OR REPLACE TEMP TABLE ${dataTempTableName} AS (
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a maintainer of this repo but:

I've noticed this in the GCP Dataform compiled version of an EXTEND schema change incremental table, but for the life of me I cannot discern why this intermediate table is needed. It doesn't seem to be used for anything other than the MERGE statement, and the INSERT on line 420. Could the query be used directly in the MERGE, and in the INSERT? Materializing into an intermediate table increases the cost of the query, sometimes by quite a lot.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dataTempTableName is used to ensure the columns selected for the MERGE or INSERT operation exactly match the schema determined after potential ALTER TABLE operations, as reflected in the temp_table_columns variable. The final DML statement is constructed dynamically within EXECUTE IMMEDIATE using column lists derived from temp_table_columns (e.g., dataform_columns_list). To use these dynamic column lists in the SELECT part of the MERGE's USING clause or the INSERT's SELECT statement, the source data must be in a table format. Using the raw table.incrementalQuery directly would not allow dynamic column selection based on dataform_columns_list within the EXECUTE IMMEDIATE context.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, we actually don't have a specific reason for doing it. It is a cleanup leftover from when we were using a stored procedure inside the EXECUTE IMMEDIATE. @SuchodolskiEdvin it might be worth running some tests on the engine directly, using the generated queries with table.incrementalQuery instead of the temp table.

@spatel11 Thanks for pointing this out. Please feel free to open a feature request in our public tracker and we will take a look and plan this change.
https://issuetracker.google.com/issues?q=status:open%20componentid:1193995&s=created_time:desc

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

${this.where(table.incrementalQuery || table.query, table.where)}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to have a separate subfunction to generate SELECT part of an incremental query so it won't be repeated in multiple places (or just initialize it in one place)

);`;
}

/**
 * Builds the script section that declares `dataform_columns_list` and sets it to the
 * backtick-quoted column names from `temp_table_columns`, joined with ", ". The generated SQL
 * falls back to an empty string when the aggregate is NULL.
 *
 * @returns the script text, which begins with a newline
 */
private declareDataformColumnsListSql(): string {
  const scriptLines = [
    "",
    "DECLARE dataform_columns_list STRING;",
    "SET dataform_columns_list = (",
    "SELECT IFNULL(STRING_AGG(CONCAT('`', column_name, '`'), ', '), '')",
    "FROM UNNEST(temp_table_columns)",
    ");"
  ];
  return scriptLines.join("\n");
}

private executeMergeOrInsertSql(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it seems that you added new logic for generating MERGE or INSERT statements, separate from what we already had in the mergeInto / insertInto functions.

I think we should avoid duplicating this logic and having different behaviour depending on whether incremental schema change is enabled or disabled. So let's consolidate them.

table: dataform.ITable,
qualifiedTargetTableName: string,
dataTempTableName: string
): string {
if (table.uniqueKey && table.uniqueKey.length > 0) {
const mergeOnClause = table.uniqueKey.map(k => `T.\`${k}\` = S.\`${k}\``).join(" and ");
const updatePartitionFilter = table.bigquery && table.bigquery.updatePartitionFilter;
const mergeOnClauseWithFilter = updatePartitionFilter
? `${mergeOnClause} and T.${updatePartitionFilter}`
: mergeOnClause;

return `
DECLARE dataform_columns_merge STRING;
SET dataform_columns_merge = (
SELECT IFNULL(STRING_AGG(CONCAT('\`', column_name, '\` = S.\`', column_name, '\`'), ', '), '')
FROM UNNEST(temp_table_columns)
);

IF ARRAY_LENGTH(temp_table_columns) > 0 THEN
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need these checks?

EXECUTE IMMEDIATE (
"MERGE \`${qualifiedTargetTableName}\` T " ||
"USING \`${dataTempTableName}\` S " ||
"ON ${mergeOnClauseWithFilter} " ||
"WHEN MATCHED THEN " ||
" UPDATE SET " || dataform_columns_merge || " " ||
"WHEN NOT MATCHED THEN " ||
" INSERT (" || dataform_columns_list || ") VALUES (" || dataform_columns_list || ")"
);
END IF;`;
} else {
return `
IF ARRAY_LENGTH(temp_table_columns) > 0 THEN
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's split the generation of the merge and insert statements into separate functions.

EXECUTE IMMEDIATE (
"INSERT INTO \`${qualifiedTargetTableName}\` (" || dataform_columns_list || ") " ||
"SELECT " || dataform_columns_list || " FROM \`${dataTempTableName}\`"
);
END IF;`;
}
}

/**
 * Builds the script section that drops both temporary tables if they exist.
 *
 * @param emptyTempTableName name of the first table to drop
 * @param dataTempTableName name of the second table to drop
 * @returns the script text, which begins and ends with a newline
 */
private cleanupSql(emptyTempTableName: string, dataTempTableName: string): string {
  const dropStatements = [emptyTempTableName, dataTempTableName].map(
    tableName => `DROP TABLE IF EXISTS ${tableName};`
  );
  // The empty first and last entries produce the leading and trailing newlines.
  return ["", "-- Cleanup temporary tables.", ...dropStatements, ""].join("\n");
}

/**
 * Produces the full script body for an incremental run with schema-change handling.
 *
 * Five sections are generated in this order and joined with a blank line between them:
 *  1. creation of the empty temp table from the incremental query (or the main query when no
 *     incremental query is set);
 *  2. comparison of the target table's columns with the empty temp table's columns;
 *  3. the statements for the table's schema-change strategy;
 *  4. the final data-loading statements;
 *  5. cleanup of both temp tables.
 *
 * @param table the table definition being built
 * @param qualifiedTargetTableName target table name passed to the strategy and DML builders
 * @param emptyTempTableName name of the empty temp table as used in DDL
 * @param dataTempTableName name of the temp data table
 * @param shortEmptyTableName unqualified empty temp table name used for the column lookup
 * @returns the joined script sections
 */
private incrementalSchemaChangeBody(
  table: dataform.ITable,
  qualifiedTargetTableName: string,
  emptyTempTableName: string,
  dataTempTableName: string,
  shortEmptyTableName: string
): string {
  const sourceQuery = table.incrementalQuery || table.query;
  const createEmptyTableSql = this.inferSchemaSql(emptyTempTableName, sourceQuery);

  const compareSql = this.compareSchemasSql(
    table.target.database,
    table.target.schema,
    table.target.name,
    shortEmptyTableName
  );
  const strategySql = this.applySchemaChangeStrategySql(table, qualifiedTargetTableName);
  const dmlSql = this.runFinalDmlSql(table, qualifiedTargetTableName, dataTempTableName);
  const dropTempTablesSql = this.cleanupSql(emptyTempTableName, dataTempTableName);

  const sections: string[] = [
    createEmptyTableSql,
    compareSql,
    strategySql,
    dmlSql,
    dropTempTablesSql
  ];
  return sections.join("\n\n");
}

private createOrReplace(table: dataform.ITable) {
const options = [];
if (table.bigquery && table.bigquery.partitionBy && table.bigquery.partitionExpirationDays) {
Expand Down
Loading
Loading