Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 44 additions & 5 deletions internal/queries/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,33 @@ SELECT
pg_catalog.pg_get_function_identity_arguments(
pg_proc.oid
) AS func_identity_arguments,
pg_catalog.pg_get_functiondef(pg_proc.oid) AS func_def
pg_catalog.pg_get_functiondef(pg_proc.oid) AS func_def,
(
-- Find composite types of a table or a view used by this function
SELECT
ARRAY_AGG(DISTINCT JSONB_BUILD_OBJECT(
'schema', depend_namespace.nspname::TEXT,
'name', depend_class.relname::TEXT,
'columns', ARRAY[]::TEXT[]
))
FROM pg_catalog.pg_depend AS depend
INNER JOIN
pg_catalog.pg_type AS depend_type
ON depend.refobjid = depend_type.oid
INNER JOIN
pg_catalog.pg_class AS depend_class
ON depend_type.typrelid = depend_class.oid
INNER JOIN
pg_catalog.pg_namespace AS depend_namespace
ON depend_class.relnamespace = depend_namespace.oid
AND depend_namespace.nspname NOT IN ('pg_catalog', 'information_schema')
AND depend_namespace.nspname !~ '^pg_toast'
AND depend_namespace.nspname !~ '^pg_temp'
WHERE
depend.classid = 'pg_proc'::REGCLASS
AND depend.objid = pg_proc.oid
AND depend.deptype = 'n'
)::TEXT [] AS table_dependencies
FROM pg_catalog.pg_proc
INNER JOIN
pg_catalog.pg_namespace AS proc_namespace
Expand Down Expand Up @@ -523,11 +549,11 @@ SELECT
)
))
FROM pg_catalog.pg_depend AS d
INNER JOIN pg_catalog.pg_rewrite AS r ON d.objid = r.oid
INNER JOIN pg_catalog.pg_rewrite AS r ON d.objid = r.oid AND r.ev_class = c.oid
INNER JOIN pg_catalog.pg_depend AS d2 ON r.oid = d2.objid
INNER JOIN
pg_catalog.pg_class AS dep_c
ON d2.refobjid = dep_c.oid AND dep_c.relkind IN ('r', 'p')
ON d2.refobjid = dep_c.oid AND dep_c.relkind IN ('r', 'p', 'v') AND dep_c.oid != c.oid
INNER JOIN
pg_catalog.pg_namespace AS dep_ns
ON dep_c.relnamespace = dep_ns.oid
Expand All @@ -536,6 +562,19 @@ SELECT
-- Instead, they must be unmarshalled as string arrays.
-- https://github.com/lib/pq/pull/466
WHERE d.refobjid = c.oid)::TEXT [] AS table_dependencies,
(SELECT ARRAY_AGG(DISTINCT
proc_ns.nspname || '.' || pg_proc.proname || '(' ||
pg_catalog.pg_get_function_identity_arguments(pg_proc.oid) || ')'
)
FROM pg_catalog.pg_depend AS fd
INNER JOIN pg_catalog.pg_rewrite AS fr ON fd.objid = fr.oid AND fr.ev_class = c.oid
INNER JOIN pg_catalog.pg_depend AS fd2 ON fr.oid = fd2.objid
INNER JOIN pg_catalog.pg_proc AS pg_proc ON fd2.refobjid = pg_proc.oid AND fd2.refclassid = 'pg_proc'::REGCLASS
INNER JOIN pg_catalog.pg_namespace AS proc_ns ON pg_proc.pronamespace = proc_ns.oid
WHERE fd.refobjid = c.oid
AND fd2.deptype = 'n'
AND proc_ns.nspname NOT IN ('pg_catalog', 'information_schema')
)::TEXT [] AS function_dependencies,
PG_GET_VIEWDEF(c.oid, true) AS view_definition
FROM pg_catalog.pg_class AS c
INNER JOIN pg_catalog.pg_namespace AS n ON c.relnamespace = n.oid
Expand Down Expand Up @@ -594,11 +633,11 @@ SELECT
)
))
FROM pg_catalog.pg_depend AS d
INNER JOIN pg_catalog.pg_rewrite AS r ON d.objid = r.oid
INNER JOIN pg_catalog.pg_rewrite AS r ON d.objid = r.oid AND r.ev_class = c.oid
INNER JOIN pg_catalog.pg_depend AS d2 ON r.oid = d2.objid
INNER JOIN
pg_catalog.pg_class AS dep_c
ON d2.refobjid = dep_c.oid AND dep_c.relkind IN ('r', 'p')
ON d2.refobjid = dep_c.oid AND dep_c.relkind IN ('r', 'p', 'v') AND dep_c.oid != c.oid
INNER JOIN
pg_catalog.pg_namespace AS dep_ns
ON dep_c.relnamespace = dep_ns.oid
Expand Down
50 changes: 45 additions & 5 deletions internal/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 92 additions & 1 deletion internal/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,9 @@ type Function struct {
// can track the dependencies of the function (or not)
Language string
DependsOnFunctions []SchemaQualifiedName

// TableDependencies is a list of tables the function depends on.
TableDependencies []TableDependency
}

type Procedure struct {
Expand Down Expand Up @@ -531,6 +534,8 @@ type View struct {

// TableDependencies is a list of tables the view depends on.
TableDependencies []TableDependency
// DependsOnFunctions is a list of functions the view depends on.
DependsOnFunctions []SchemaQualifiedName
}

type MaterializedView struct {
Expand Down Expand Up @@ -1320,11 +1325,21 @@ func (s *schemaFetcher) buildFunction(ctx context.Context, rawFunction queries.G
return Function{}, fmt.Errorf("fetchDependsOnFunctions(%s): %w", rawFunction.Oid, err)
}

// Supplement pg_depend with text-based detection of composite type refs
// (%ROWTYPE, type[]) and table refs (FROM/JOIN) in function bodies.
tableDependencies, err := parseJSONTableDependencies(
append(rawFunction.TableDependencies, parseBodyTableDeps(rawFunction.FuncDef, rawFunction.FuncSchemaName)...),
)
if err != nil {
return Function{}, fmt.Errorf("parsing table dependencies JSON: %w", err)
}

return Function{
SchemaQualifiedName: buildProcName(rawFunction.FuncName, rawFunction.FuncIdentityArguments, rawFunction.FuncSchemaName),
FunctionDef: rawFunction.FuncDef,
Language: rawFunction.FuncLang,
DependsOnFunctions: dependsOnFunctions,
TableDependencies: tableDependencies,
}, nil
}

Expand Down Expand Up @@ -1508,7 +1523,8 @@ func (s *schemaFetcher) fetchViews(ctx context.Context) ([]View, error) {
ViewDefinition: v.ViewDefinition,
Options: options,

TableDependencies: tableDependencies,
TableDependencies: tableDependencies,
DependsOnFunctions: parseViewFunctionDeps(v.FunctionDependencies),
})
}

Expand Down Expand Up @@ -1583,6 +1599,81 @@ func parseJSONTableDependencies(vals []string) ([]TableDependency, error) {
return out, nil
}


var (
// Composite type patterns: schema.name%ROWTYPE and schema.name[]
bodyQualifiedCompositeRe = regexp.MustCompile(`(?i)\b(\w+)\.(\w+)(?:%ROWTYPE|\[\])`)
bodyUnqualifiedCompositeRe = regexp.MustCompile(`(?i)(?:^|[^\.\w])(\w+)(?:%ROWTYPE|\[\])`)
// Table reference patterns: FROM/JOIN schema.name
bodyQualifiedTableRefRe = regexp.MustCompile(`(?i)(?:FROM|JOIN)\s+(\w+)\.(\w+)\b`)
)

// parseBodyTableDeps scans a function definition for table/view references
// that pg_depend does not track. Detects:
// - schema.name%ROWTYPE and schema.name[] (plpgsql DECLARE)
// - unqualified name%ROWTYPE and name[] (assumes function's schema)
// - FROM/JOIN schema.name (SQL table references in function body)

// parseViewFunctionDeps parses function dependency strings from the view query
// (format: "schema.name(args)") into SchemaQualifiedNames matching function vertex IDs.
func parseViewFunctionDeps(deps []string) []SchemaQualifiedName {
var out []SchemaQualifiedName
for _, d := range deps {
// Format: "schema.name(args)"
dotIdx := strings.IndexByte(d, '.')
parenIdx := strings.IndexByte(d, '(')
if dotIdx < 0 || parenIdx < 0 || dotIdx >= parenIdx {
continue
}
schema := d[:dotIdx]
name := d[dotIdx+1 : parenIdx]
args := d[parenIdx+1 : len(d)-1] // strip parens
out = append(out, buildProcName(name, args, schema))
}
return out
}

func parseBodyTableDeps(funcDef, funcSchemaName string) []string {
seen := make(map[string]bool)
var out []string

add := func(schema, name string) {
key := schema + "." + name
if !seen[key] {
seen[key] = true
out = append(out, fmt.Sprintf(`{"schema": "%s", "name": "%s", "columns": []}`, schema, name))
}
}

// Schema-qualified composite types
for _, m := range bodyQualifiedCompositeRe.FindAllStringSubmatch(funcDef, -1) {
if m[1] != "pg_catalog" && m[1] != "information_schema" {
add(m[1], m[2])
}
}

// Unqualified composite types
for _, m := range bodyUnqualifiedCompositeRe.FindAllStringSubmatch(funcDef, -1) {
switch strings.ToUpper(m[1]) {
case "RECORD", "BOOLEAN", "INTEGER", "BIGINT", "TEXT", "NUMERIC",
"VOID", "INT4", "INT8", "FLOAT8", "TIMESTAMPTZ", "TIMESTAMP",
"DATE", "JSONB", "JSON", "BYTEA", "UUID", "SMALLINT", "REAL",
"DOUBLE", "CHAR", "VARCHAR", "INTERVAL", "OID", "REGCLASS":
continue
}
add(funcSchemaName, m[1])
}

// Schema-qualified FROM/JOIN references
for _, m := range bodyQualifiedTableRefRe.FindAllStringSubmatch(funcDef, -1) {
if m[1] != "pg_catalog" && m[1] != "information_schema" {
add(m[1], m[2])
}
}

return out
}

// buildProcName is used to build the schema qualified name for a proc (function, procedure), i.e., anything
// identified by a name AND its arguments.
func buildProcName(name, identityArguments, schemaName string) SchemaQualifiedName {
Expand Down
15 changes: 15 additions & 0 deletions pkg/diff/function_sql_vertex_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,24 @@ func (f *functionSQLVertexGenerator) GetAddAlterDependencies(newFunction, oldFun
deps = append(deps, mustRun(f.GetSQLVertexId(newFunction, diffTypeAddAlter)).after(buildFunctionVertexId(depFunction, diffTypeAddAlter)))
}

// Add/alter the function after the table it depends on has been added/altered.
for _, t := range newFunction.TableDependencies {
deps = append(deps, mustRun(f.GetSQLVertexId(newFunction, diffTypeAddAlter)).after(buildTableVertexId(t.SchemaQualifiedName, diffTypeAddAlter)))
}

if !cmp.Equal(oldFunction, schema.Function{}) {
// If the function is being altered:
// If the old version of the function calls other functions that are being deleted come, those deletions
// must come after the function is altered, so it is no longer dependent on those dropped functions
for _, depFunction := range oldFunction.DependsOnFunctions {
deps = append(deps, mustRun(f.GetSQLVertexId(newFunction, diffTypeAddAlter)).before(buildFunctionVertexId(depFunction, diffTypeDelete)))
}

// Alter the function before the table it used to depend on has been altered or deleted.
for _, t := range oldFunction.TableDependencies {
deps = append(deps, mustRun(f.GetSQLVertexId(newFunction, diffTypeAddAlter)).before(buildTableVertexId(t.SchemaQualifiedName, diffTypeAddAlter)))
deps = append(deps, mustRun(f.GetSQLVertexId(newFunction, diffTypeAddAlter)).before(buildTableVertexId(t.SchemaQualifiedName, diffTypeDelete)))
}
}

return deps, nil
Expand All @@ -104,5 +115,9 @@ func (f *functionSQLVertexGenerator) GetDeleteDependencies(function schema.Funct
for _, depFunction := range function.DependsOnFunctions {
deps = append(deps, mustRun(f.GetSQLVertexId(function, diffTypeDelete)).before(buildFunctionVertexId(depFunction, diffTypeDelete)))
}
for _, t := range function.TableDependencies {
deps = append(deps, mustRun(f.GetSQLVertexId(function, diffTypeDelete)).before(buildTableVertexId(t.SchemaQualifiedName, diffTypeAddAlter)))
deps = append(deps, mustRun(f.GetSQLVertexId(function, diffTypeDelete)).before(buildTableVertexId(t.SchemaQualifiedName, diffTypeDelete)))
}
return deps, nil
}
5 changes: 5 additions & 0 deletions pkg/diff/view_sql_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ func (vsg *viewSQLGenerator) Add(v schema.View) (partialSQLGraph, error) {
deps = append(deps, mustRun(addVertexId).after(buildTableVertexId(t.SchemaQualifiedName, diffTypeAddAlter)))
}

// Run after any functions the view calls are added/altered.
for _, f := range v.DependsOnFunctions {
deps = append(deps, mustRun(addVertexId).after(buildFunctionVertexId(f, diffTypeAddAlter)))
}

return partialSQLGraph{
vertices: []sqlVertex{{
id: addVertexId,
Expand Down