-
Notifications
You must be signed in to change notification settings - Fork 198
Support onSchemaChange for incremental tables #2101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| import * as crypto from "crypto"; | ||
| import * as semver from "semver"; | ||
|
|
||
| import { concatenateQueries, Task, Tasks } from "df/cli/api/dbadapters/tasks"; | ||
|
|
@@ -141,23 +142,34 @@ from (${query}) as insertions`; | |
| if (!this.shouldWriteIncrementally(table, runConfig, tableMetadata)) { | ||
| tasks.add(Task.statement(this.createOrReplace(table))); | ||
| } else { | ||
| tasks.add( | ||
| Task.statement( | ||
| table.uniqueKey && table.uniqueKey.length > 0 | ||
| ? this.mergeInto( | ||
| table.target, | ||
| tableMetadata?.fields.map(f => f.name), | ||
| this.where(table.incrementalQuery || table.query, table.where), | ||
| table.uniqueKey, | ||
| table.bigquery && table.bigquery.updatePartitionFilter | ||
| ) | ||
| : this.insertInto( | ||
| table.target, | ||
| tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``), | ||
| this.where(table.incrementalQuery || table.query, table.where) | ||
| ) | ||
| ) | ||
| ); | ||
| const onSchemaChange = table.onSchemaChange ?? dataform.OnSchemaChange.IGNORE; | ||
| switch (onSchemaChange) { | ||
| case dataform.OnSchemaChange.FAIL: | ||
| case dataform.OnSchemaChange.EXTEND: | ||
| case dataform.OnSchemaChange.SYNCHRONIZE: | ||
| this.buildIncrementalSchemaChangeTasks(tasks, table); | ||
| break; | ||
| case dataform.OnSchemaChange.IGNORE: | ||
| default: | ||
| tasks.add( | ||
| Task.statement( | ||
| table.uniqueKey && table.uniqueKey.length > 0 | ||
| ? this.mergeInto( | ||
| table.target, | ||
| tableMetadata?.fields.map(f => f.name), | ||
| this.where(table.incrementalQuery || table.query, table.where), | ||
| table.uniqueKey, | ||
| table.bigquery && table.bigquery.updatePartitionFilter | ||
| ) | ||
| : this.insertInto( | ||
| table.target, | ||
| tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``), | ||
| this.where(table.incrementalQuery || table.query, table.where) | ||
| ) | ||
| ) | ||
| ); | ||
| break; | ||
| } | ||
| } | ||
| } else { | ||
| tasks.add(Task.statement(this.createOrReplace(table))); | ||
|
|
@@ -186,6 +198,285 @@ from (${query}) as insertions`; | |
| return `drop ${this.tableTypeAsSql(type)} if exists ${this.resolveTarget(target)}`; | ||
| } | ||
|
|
||
| private buildIncrementalSchemaChangeTasks(tasks: Tasks, table: dataform.ITable) { | ||
| const uniqueId = crypto.randomUUID().replace(/-/g, "_"); | ||
|
|
||
| const shortEmptyTableName = `${table.target.name}_df_temp_${uniqueId}_empty`; | ||
| const emptyTempTableName = this.resolveTarget({ | ||
| ...table.target, | ||
| name: shortEmptyTableName | ||
| }); | ||
|
|
||
| const shortDataTableName = shortEmptyTableName.replace("_empty", "_data"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's try to not use a temporary table for results of incremental query following this comment. |
||
| const dataTempTableName = this.resolveTarget({ | ||
| ...table.target, | ||
| name: shortDataTableName | ||
| }); | ||
|
|
||
| const procedureName = this.createProcedureName(table.target, uniqueId); | ||
| const procedureBody = this.incrementalSchemaChangeBody( | ||
| table, | ||
| this.resolveTarget(table.target), | ||
| emptyTempTableName, | ||
| dataTempTableName, | ||
| shortEmptyTableName | ||
|
Comment on lines
+220
to
+222
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to pass both |
||
| ); | ||
|
|
||
| const createProcedureSql = `CREATE OR REPLACE PROCEDURE ${procedureName}() | ||
| OPTIONS(strict_mode=false) | ||
| BEGIN | ||
| ${procedureBody} | ||
| END;`; | ||
|
|
||
| const callProcedureSql = this.safeCallAndDropProcedure( | ||
| procedureName, | ||
| emptyTempTableName, | ||
| dataTempTableName | ||
| ); | ||
| tasks.add(Task.statement(createProcedureSql)); | ||
| tasks.add(Task.statement(callProcedureSql)); | ||
| tasks.add(Task.statement(`DROP PROCEDURE IF EXISTS ${procedureName};`)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems to be called in |
||
| } | ||
|
|
||
| private createProcedureName(target: dataform.ITarget, uniqueId: string): string { | ||
| // Procedure names cannot contain hyphens. | ||
| const sanitizedUniqueId = uniqueId.replace(/-/g, "_"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| return this.resolveTarget({ | ||
| ...target, | ||
| name: `df_osc_${sanitizedUniqueId}` | ||
| }); | ||
| } | ||
|
|
||
  /**
   * Wraps a CALL of the generated procedure so that, on any error, the temp
   * tables and the procedure itself are dropped before the error is
   * re-raised. On the success path the procedure is dropped after the call
   * completes.
   */
  private safeCallAndDropProcedure(
    procedureName: string,
    emptyTempTableName: string,
    dataTempTableName: string
  ): string {
    return `
BEGIN
CALL ${procedureName}();
EXCEPTION WHEN ERROR THEN
DROP TABLE IF EXISTS ${emptyTempTableName};
DROP TABLE IF EXISTS ${dataTempTableName};
DROP PROCEDURE IF EXISTS ${procedureName};
RAISE;
END;
DROP PROCEDURE IF EXISTS ${procedureName};`;
  }
|
|
||
  /**
   * Materializes an empty table whose schema matches the incremental query,
   * by selecting zero rows from it. The table is only used for its schema
   * (via INFORMATION_SCHEMA) and is dropped by cleanupSql().
   *
   * NOTE(review): the name is misleading — nothing is "inferred" in this
   * code; BigQuery derives the schema from the CTAS. Consider renaming
   * (e.g. createEmptySchemaTableSql) in a follow-up, updating callers too.
   */
  private inferSchemaSql(emptyTempTableName: string, query: string): string {
    return `
-- Infer schema of new query.
CREATE OR REPLACE TABLE ${emptyTempTableName} AS (
  SELECT * FROM (${query}) AS insertions LIMIT 0
);`;
  }
|
|
||
  /**
   * Emits the scripting SQL that declares and populates the variables used by
   * the schema-change strategies:
   *  - dataform_columns: column names currently on the target table;
   *  - temp_table_columns: name/type pairs of the incoming query's schema;
   *  - columns_added / columns_removed: the diff between the two.
   *
   * Both lookups read `<database>.<schema>.INFORMATION_SCHEMA.COLUMNS`, so the
   * target table and the empty schema table must live in the same dataset.
   * The IFNULL(..., []) wrappers keep the arrays non-null when a lookup
   * matches no rows (e.g. the target table does not exist yet).
   */
  private compareSchemasSql(
    database: string,
    schema: string,
    targetName: string,
    shortEmptyTableName: string
  ): string {
    return `
-- Compare schemas
DECLARE dataform_columns ARRAY<STRING>;
DECLARE temp_table_columns ARRAY<STRUCT<column_name STRING, data_type STRING>>;
DECLARE columns_added ARRAY<STRUCT<column_name STRING, data_type STRING>>;
DECLARE columns_removed ARRAY<STRING>;

SET dataform_columns = (
  SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), [])
  FROM \`${database}.${schema}.INFORMATION_SCHEMA.COLUMNS\`
  WHERE table_name = '${targetName}'
);

SET temp_table_columns = (
  SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), [])
  FROM \`${database}.${schema}.INFORMATION_SCHEMA.COLUMNS\`
  WHERE table_name = '${shortEmptyTableName}'
);

SET columns_added = (
  SELECT IFNULL(ARRAY_AGG(column_info), [])
  FROM UNNEST(temp_table_columns) AS column_info
  WHERE column_info.column_name NOT IN UNNEST(dataform_columns)
);
SET columns_removed = (
  SELECT IFNULL(ARRAY_AGG(column_name), [])
  FROM UNNEST(dataform_columns) AS column_name
  WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col)
);`;
  }
|
|
||
| private applySchemaChangeStrategySql( | ||
| table: dataform.ITable, | ||
| qualifiedTargetTableName: string | ||
| ): string { | ||
| const onSchemaChange = table.onSchemaChange || dataform.OnSchemaChange.IGNORE; | ||
| let sql = ` | ||
| -- Apply schema change strategy (${dataform.OnSchemaChange[onSchemaChange]}).`; | ||
|
|
||
| switch (onSchemaChange) { | ||
| case dataform.OnSchemaChange.FAIL: | ||
| sql += ` | ||
| IF ARRAY_LENGTH(columns_added) > 0 OR ARRAY_LENGTH(columns_removed) > 0 THEN | ||
| RAISE USING MESSAGE = FORMAT( | ||
| "Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %t, removed columns: %t", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is |
||
| columns_added, | ||
| columns_removed | ||
| ); | ||
| END IF; | ||
| `; | ||
| break; | ||
| case dataform.OnSchemaChange.EXTEND: | ||
| sql += ` | ||
| IF ARRAY_LENGTH(columns_removed) > 0 THEN | ||
| RAISE USING MESSAGE = FORMAT( | ||
| "Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %t", | ||
| columns_removed | ||
| ); | ||
| END IF; | ||
|
|
||
| FOR column_info IN (SELECT * FROM UNNEST(columns_added)) DO | ||
| EXECUTE IMMEDIATE FORMAT( | ||
| "ALTER TABLE ${qualifiedTargetTableName} ADD COLUMN IF NOT EXISTS %s %s", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's optimize following one of reported issues (multiple |
||
| column_info.column_name, | ||
| column_info.data_type | ||
| ); | ||
| END FOR; | ||
| `; | ||
| break; | ||
| case dataform.OnSchemaChange.SYNCHRONIZE: | ||
| const uniqueKeys = table.uniqueKey || []; | ||
| sql += ` | ||
| FOR removed_column_name IN (SELECT * FROM UNNEST(columns_removed)) DO | ||
| IF removed_column_name IN UNNEST(${JSON.stringify(uniqueKeys)}) THEN | ||
| RAISE USING MESSAGE = FORMAT( | ||
| "Cannot drop column %s as it is part of the unique key for table ${qualifiedTargetTableName}", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's not mix different string formatting ways like |
||
| removed_column_name | ||
| ); | ||
| ELSE | ||
| EXECUTE IMMEDIATE FORMAT( | ||
| "ALTER TABLE ${qualifiedTargetTableName} DROP COLUMN IF EXISTS %s", | ||
| removed_column_name | ||
| ); | ||
| END IF; | ||
| END FOR; | ||
|
|
||
| FOR column_info IN (SELECT * FROM UNNEST(columns_added)) DO | ||
| EXECUTE IMMEDIATE FORMAT( | ||
| "ALTER TABLE ${qualifiedTargetTableName} ADD COLUMN IF NOT EXISTS %s %s", | ||
| column_info.column_name, | ||
| column_info.data_type | ||
| ); | ||
| END FOR; | ||
|
Comment on lines
+367
to
+373
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's not duplicate add statements and put it into separate function |
||
| `; | ||
| break; | ||
| } | ||
| return sql; | ||
| } | ||
|
|
||
| private runFinalDmlSql( | ||
| table: dataform.ITable, | ||
| qualifiedTargetTableName: string, | ||
| dataTempTableName: string | ||
| ): string { | ||
| return [ | ||
| this.createIncrementalDataTempTableSql(table, dataTempTableName), | ||
| this.declareDataformColumnsListSql(), | ||
| this.executeMergeOrInsertSql(table, qualifiedTargetTableName, dataTempTableName) | ||
| ].join("\n"); | ||
| } | ||
|
|
||
  /**
   * Stages the incremental query's rows in a temp table so the final MERGE /
   * INSERT (assembled dynamically inside EXECUTE IMMEDIATE) can select
   * columns by name from a concrete relation.
   *
   * NOTE(review): materializing the query here can be costly; using
   * table.incrementalQuery directly in the MERGE/INSERT is being considered
   * upstream — confirm before changing.
   * NOTE(review): dataTempTableName comes from resolveTarget and so appears
   * fully qualified, while BigQuery TEMP tables are session-scoped and
   * typically unqualified — verify this against the adapter's resolveTarget
   * output.
   */
  private createIncrementalDataTempTableSql(table: dataform.ITable, dataTempTableName: string): string {
    return `
CREATE OR REPLACE TEMP TABLE ${dataTempTableName} AS (
  ${this.where(table.incrementalQuery || table.query, table.where)}
);`;
  }
|
|
||
  /**
   * Declares `dataform_columns_list`: a comma-separated, backtick-quoted list
   * of the incoming schema's column names (taken from `temp_table_columns`),
   * used to build the dynamic INSERT / MERGE column lists. Empty string when
   * no columns were found.
   */
  private declareDataformColumnsListSql(): string {
    return `
DECLARE dataform_columns_list STRING;
SET dataform_columns_list = (
  SELECT IFNULL(STRING_AGG(CONCAT('\`', column_name, '\`'), ', '), '')
  FROM UNNEST(temp_table_columns)
);`;
  }
|
|
||
| private executeMergeOrInsertSql( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So it seems that you added a new logic of generating I think we should avoid duplicating this logic and different behaviour when incremental schema change is enabled / disabled. So let's consolidate them |
||
| table: dataform.ITable, | ||
| qualifiedTargetTableName: string, | ||
| dataTempTableName: string | ||
| ): string { | ||
| if (table.uniqueKey && table.uniqueKey.length > 0) { | ||
| const mergeOnClause = table.uniqueKey.map(k => `T.\`${k}\` = S.\`${k}\``).join(" and "); | ||
| const updatePartitionFilter = table.bigquery && table.bigquery.updatePartitionFilter; | ||
| const mergeOnClauseWithFilter = updatePartitionFilter | ||
| ? `${mergeOnClause} and T.${updatePartitionFilter}` | ||
| : mergeOnClause; | ||
|
|
||
| return ` | ||
| DECLARE dataform_columns_merge STRING; | ||
| SET dataform_columns_merge = ( | ||
| SELECT IFNULL(STRING_AGG(CONCAT('\`', column_name, '\` = S.\`', column_name, '\`'), ', '), '') | ||
| FROM UNNEST(temp_table_columns) | ||
| ); | ||
|
|
||
| IF ARRAY_LENGTH(temp_table_columns) > 0 THEN | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need these checks? |
||
| EXECUTE IMMEDIATE ( | ||
| "MERGE \`${qualifiedTargetTableName}\` T " || | ||
| "USING \`${dataTempTableName}\` S " || | ||
| "ON ${mergeOnClauseWithFilter} " || | ||
| "WHEN MATCHED THEN " || | ||
| " UPDATE SET " || dataform_columns_merge || " " || | ||
| "WHEN NOT MATCHED THEN " || | ||
| " INSERT (" || dataform_columns_list || ") VALUES (" || dataform_columns_list || ")" | ||
| ); | ||
| END IF;`; | ||
| } else { | ||
| return ` | ||
| IF ARRAY_LENGTH(temp_table_columns) > 0 THEN | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's separate generations of merge / insert statements into separate functions |
||
| EXECUTE IMMEDIATE ( | ||
| "INSERT INTO \`${qualifiedTargetTableName}\` (" || dataform_columns_list || ") " || | ||
| "SELECT " || dataform_columns_list || " FROM \`${dataTempTableName}\`" | ||
| ); | ||
| END IF;`; | ||
| } | ||
| } | ||
|
|
||
  /**
   * Drops both helper tables created by the schema-change procedure (the
   * empty schema-capture table and the staged-data table).
   */
  private cleanupSql(emptyTempTableName: string, dataTempTableName: string): string {
    return `
-- Cleanup temporary tables.
DROP TABLE IF EXISTS ${emptyTempTableName};
DROP TABLE IF EXISTS ${dataTempTableName};
`;
  }
|
|
||
| private incrementalSchemaChangeBody( | ||
| table: dataform.ITable, | ||
| qualifiedTargetTableName: string, | ||
| emptyTempTableName: string, | ||
| dataTempTableName: string, | ||
| shortEmptyTableName: string | ||
| ): string { | ||
| const statements: string[] = [ | ||
| this.inferSchemaSql(emptyTempTableName, table.incrementalQuery || table.query), | ||
| this.compareSchemasSql( | ||
| table.target.database, | ||
| table.target.schema, | ||
| table.target.name, | ||
| shortEmptyTableName | ||
| ), | ||
| this.applySchemaChangeStrategySql(table, qualifiedTargetTableName), | ||
| this.runFinalDmlSql(table, qualifiedTargetTableName, dataTempTableName), | ||
| this.cleanupSql(emptyTempTableName, dataTempTableName) | ||
| ]; | ||
|
|
||
| return statements.join("\n\n"); | ||
| } | ||
|
|
||
| private createOrReplace(table: dataform.ITable) { | ||
| const options = []; | ||
| if (table.bigquery && table.bigquery.partitionBy && table.bigquery.partitionExpirationDays) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this UUID to be cryptographically secure