Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 150 additions & 126 deletions ingestion/src/metadata/profiler/source/fetcher/fetcher_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

import traceback
from abc import ABC, abstractmethod
from typing import Iterable, Iterator, Optional, cast
from typing import Dict, Iterable, Iterator, List, Optional, cast

from pydantic import BaseModel

from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import TableType
Expand All @@ -25,6 +27,7 @@
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.settings.settings import Settings
from metadata.generated.schema.type.filterPattern import FilterPattern
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.status import Status
from metadata.ingestion.models.entity_interface import EntityInterfaceWithTags
Expand All @@ -37,14 +40,48 @@
from metadata.utils.db_utils import Table
from metadata.utils.filters import (
filter_by_classification,
filter_by_database,
filter_by_schema,
filter_by_table,
validate_regex,
)

# Table fields requested alongside each entity when listing tables
# (passed as ``fields=FIELDS`` to ``list_all_entities``).
FIELDS = ["tableProfilerConfig", "columns", "customMetrics", "tags"]


class RegexFilter(BaseModel):
    """A single regex together with the mode it must be applied with.

    Instances are produced by ``_build_regex_from_filter`` and their values
    are forwarded as request parameters when listing entities.
    """

    # Regex text; several patterns are pre-combined into one alternation.
    regex: str
    # "include" or "exclude" -- the only two values assigned in this file.
    mode: str


def _combine_patterns(patterns: List[str]) -> str:
if len(patterns) == 1:
return patterns[0]
return "|".join(f"({p})" for p in patterns)


def _build_regex_from_filter(
    filter_pattern: Optional[FilterPattern],
) -> Optional[RegexFilter]:
    """Turn a FilterPattern into one RegexFilter for server-side filtering.

    Both the ``includes`` and ``excludes`` lists are handed to
    ``validate_regex`` before anything is built.

    Only one list ends up in the result: a non-empty ``includes`` wins and
    ``excludes`` is then ignored; ``excludes`` is used only when there are
    no includes.

    Args:
        filter_pattern: the configured pattern, or ``None``.

    Returns:
        A ``RegexFilter`` in ``"include"`` or ``"exclude"`` mode, or ``None``
        when no pattern is configured or both lists are empty.
    """
    if not filter_pattern:
        return None

    includes = filter_pattern.includes
    excludes = filter_pattern.excludes
    validate_regex(includes)
    validate_regex(excludes)

    # Ordered by precedence: the first non-empty list decides the mode.
    for mode, patterns in (("include", includes), ("exclude", excludes)):
        if patterns:
            return RegexFilter(regex=_combine_patterns(patterns), mode=mode)
    return None
Comment thread
TeddyCr marked this conversation as resolved.
Comment on lines +72 to +82
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_build_regex_from_filter drops excludes when includes is set (it returns an include-mode RegexFilter and ignores excludes). Previously, profiler filtering used the filter_by_* helpers, which apply excludes even when includes are present (i.e., they filter out items matching excludes). This changes filtering semantics and can lead to entities being included that used to be excluded. Consider pushing the include regex to the API but still applying the excludes client-side (similar to how conflicting schema/table modes are deferred), or extending the request to support both include and exclude semantics.

Copilot uses AI. Check for mistakes.


class FetcherStrategy(ABC):
"""Fetcher strategy interface"""

Expand Down Expand Up @@ -114,84 +151,28 @@ def __init__(
status: Status,
) -> None:
super().__init__(config, metadata, global_profiler_config, status)
self.database_filter_pattern = _build_regex_from_filter(
self.source_config.databaseFilterPattern
)
self.schema_filter_pattern = _build_regex_from_filter(
self.source_config.schemaFilterPattern
)
self.table_filter_pattern = _build_regex_from_filter(
self.source_config.tableFilterPattern
)
self.source_config = cast(
EntityFilterConfigInterface, self.source_config
) # Satisfy typechecker

def _filter_databases(self, databases: Iterable[Database]) -> Iterable[Database]:
    """Keep only the databases that pass ``databaseFilterPattern``.

    Each database is matched by its name, or by its fully qualified name
    when one is present and ``useFqnForFiltering`` is enabled. Databases for
    which ``filter_by_database`` returns a truthy value are reported through
    ``self.status.filter`` and left out of the result.

    Args:
        databases (Iterable[Database]): databases to check.

    Returns:
        Iterable[Database]: a list with the databases that were not filtered.
    """
    kept = []

    for db_entity in databases:
        name = db_entity.name.root
        if db_entity.fullyQualifiedName and self.source_config.useFqnForFiltering:
            name = db_entity.fullyQualifiedName.root

        if not filter_by_database(self.source_config.databaseFilterPattern, name):
            kept.append(db_entity)
            continue

        self.status.filter(
            name,
            f"Database pattern not allowed for database {name}",
        )

    return kept

def _filter_schemas(self, table: Table) -> bool:
    """Tell whether a table must be skipped because of its schema.

    The schema is matched by its fully qualified name when
    ``useFqnForFiltering`` is enabled, by its plain name otherwise.

    Args:
        table (Table): table whose ``databaseSchema`` is checked.

    Returns:
        bool: ``True`` when ``filter_by_schema`` rejects the schema (the
        rejection is also recorded through ``self.status.filter``);
        ``False`` when the table has no schema, the schema has no usable
        name, or the schema is allowed.
    """
    schema_ref = table.databaseSchema
    if not schema_ref:
        return False

    if self.source_config.useFqnForFiltering:
        schema_name = schema_ref.fullyQualifiedName
    else:
        schema_name = schema_ref.name

    if not schema_name or not filter_by_schema(
        self.source_config.schemaFilterPattern, schema_name
    ):
        return False

    self.status.filter(
        schema_name, f"Schema pattern not allowed for schema {schema_name}"
    )
    return True

def _filter_tables(self, table: Table) -> bool:
    """Tell whether a table must be skipped because of ``tableFilterPattern``.

    The table is matched by its name, or by its fully qualified name when
    one is present and ``useFqnForFiltering`` is enabled.

    Args:
        table (Table): table to check.

    Returns:
        bool: ``True`` when ``filter_by_table`` rejects the table (the
        rejection is also recorded through ``self.status.filter``),
        ``False`` otherwise.
    """
    name = table.name.root
    if table.fullyQualifiedName and self.source_config.useFqnForFiltering:
        name = table.fullyQualifiedName.root

    if not filter_by_table(self.source_config.tableFilterPattern, name):
        return False

    self.status.filter(name, f"Table pattern not allowed for table {name}")
    return True
def _build_database_params(self) -> Dict[str, str]:
params: Dict[str, str] = {"service": self.config.source.serviceName} # type: ignore
db_filter = self.database_filter_pattern
if db_filter:
params["databaseRegex"] = db_filter.regex
params["regexMode"] = db_filter.mode
if self.source_config.useFqnForFiltering:
params["regexFilterByFqn"] = "true"
return params

def _filter_views(self, table: Table) -> bool:
"""Filter the tables based on include views configuration"""
Expand All @@ -208,86 +189,129 @@ def _filter_views(self, table: Table) -> bool:

return False

def _filter_column_metrics_computation(self):
    """Filter column metrics computation (placeholder).

    The body holds nothing but this docstring, so calling the method has no
    effect and returns ``None``.
    """

def _get_database_entities(self) -> Iterable[Database]:
"""Get database entities"""
if not self.config.source.serviceName:
raise ValueError("serviceName must be provided in the source configuration")

params = self._build_database_params()
databases = self.metadata.list_all_entities(
entity=Database,
params={"service": self.config.source.serviceName},
params=params,
)
if not databases:
raise ValueError(
f"No databases found for service {self.config.source.serviceName}"
)
databases = cast(Iterable[Database], databases)

if self.source_config.databaseFilterPattern:
databases = self._filter_databases(databases)
count = 0
for database in databases:
count += 1
yield database

if not databases:
if count == 0:
raise ValueError(
"databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern."
f"\n\t- includes: {self.source_config.databaseFilterPattern.includes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long
f"\n\t- excludes: {self.source_config.databaseFilterPattern.excludes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long
Comment on lines 209 to 212
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When no databases are returned, this always raises an error saying "databaseFilterPattern returned 0 result" even when databaseFilterPattern is not configured (i.e., the service legitimately has no databases or the service filter is wrong). Consider emitting a different message when databaseFilterPattern is null vs when a configured pattern filtered everything out, so the error is actionable.

Suggested change
raise ValueError(
"databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern."
f"\n\t- includes: {self.source_config.databaseFilterPattern.includes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long
f"\n\t- excludes: {self.source_config.databaseFilterPattern.excludes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long
db_filter_pattern = self.source_config.databaseFilterPattern
if db_filter_pattern:
raise ValueError(
"databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern."
f"\n\t- includes: {db_filter_pattern.includes}" # pylint: disable=line-too-long
f"\n\t- excludes: {db_filter_pattern.excludes}" # pylint: disable=line-too-long
)
raise ValueError(
"No databases were returned for the configured service. "
"Either the service has no databases, or the service configuration is incorrect."

Copilot uses AI. Check for mistakes.
Comment on lines 209 to 212
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When no databases are returned, this now always raises the "databaseFilterPattern returned 0 result" error, even if no databaseFilterPattern was configured (e.g., the service is empty). That message becomes misleading and also suggests a filter pattern problem when the real issue is simply no databases for the service. Consider restoring the prior behavior: if a database filter pattern is configured and yields 0 results, raise this message; otherwise raise an error indicating no databases were found for the service.

Suggested change
raise ValueError(
"databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern."
f"\n\t- includes: {self.source_config.databaseFilterPattern.includes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long
f"\n\t- excludes: {self.source_config.databaseFilterPattern.excludes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long
db_filter_pattern = self.source_config.databaseFilterPattern
if db_filter_pattern:
raise ValueError(
"databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern."
f"\n\t- includes: {db_filter_pattern.includes}"
f"\n\t- excludes: {db_filter_pattern.excludes}"
)
raise ValueError(
f"No databases were found for the service {self.config.source.serviceName}. "
"At least 1 database must be available for the profiler to run."

Copilot uses AI. Check for mistakes.
)

return cast(Iterable[Database], databases)
def _build_table_params(self, database: Database) -> Dict[str, str]:
params: Dict[str, str] = {
"service": self.config.source.serviceName, # type: ignore
"database": database.fullyQualifiedName.root, # type: ignore
}

def _filter_entities(self, tables: Iterable[Table]) -> Iterable[Table]:
"""Filter entities based on the filter pattern
schema_filter = self.schema_filter_pattern
table_filter = self.table_filter_pattern

Args:
entities (Iterable[EntityInterfaceWithTags]): Entities to filter
conflicting_modes = (
schema_filter is not None
and table_filter is not None
and schema_filter.mode != table_filter.mode
)

Returns:
Iterable[EntityInterfaceWithTags]
"""
tables = [
table
for table in tables
if (
not self.source_config.schemaFilterPattern
or not self._filter_schemas(table)
regex_mode: Optional[str] = None
if schema_filter and (not conflicting_modes or schema_filter.mode == "include"):
params["databaseSchemaRegex"] = schema_filter.regex
regex_mode = schema_filter.mode

if table_filter and (not conflicting_modes or table_filter.mode == "include"):
params["tableRegex"] = table_filter.regex
regex_mode = table_filter.mode

if regex_mode:
params["regexMode"] = regex_mode
if self.source_config.useFqnForFiltering:
params["regexFilterByFqn"] = "true"

return params

def _has_conflicting_filter_modes(self) -> bool:
schema_filter = self.schema_filter_pattern
table_filter = self.table_filter_pattern
return (
schema_filter is not None
and table_filter is not None
and schema_filter.mode != table_filter.mode
)

def _filter_deferred_excludes(self, table: Table) -> bool:
"""Apply exclude filters that were deferred to client-side
because schema and table filters use conflicting modes."""
schema_filter = self.schema_filter_pattern
table_filter = self.table_filter_pattern

if schema_filter and schema_filter.mode == "exclude" and table.databaseSchema:
exclude_only = FilterPattern(
excludes=self.source_config.schemaFilterPattern.excludes
)
and (
not self.source_config.tableFilterPattern
or not self._filter_tables(table)
schema_name = (
table.databaseSchema.fullyQualifiedName
if self.source_config.useFqnForFiltering
else table.databaseSchema.name
)
and (
not self.source_config.classificationFilterPattern
or not self.filter_classifications(table)
if schema_name and filter_by_schema(exclude_only, schema_name):
self.status.filter(
schema_name,
f"Schema pattern not allowed for schema {schema_name}",
)
return True

if table_filter and table_filter.mode == "exclude":
exclude_only = FilterPattern(
excludes=self.source_config.tableFilterPattern.excludes
)
and not self._filter_views(table)
]
table_name = table.name.root
if table.fullyQualifiedName and self.source_config.useFqnForFiltering:
table_name = table.fullyQualifiedName.root
if filter_by_table(exclude_only, table_name):
self.status.filter(
table_name,
f"Table pattern not allowed for table {table_name}",
)
return True

return tables
return False

def _get_table_entities(self, database: Database) -> Iterable[Table]:
"""Given a database, get all table entities

Args:
database (Database): Database to get tables from

Returns:
Iterable[Table]
"""
"""Given a database, get all table entities"""
params = self._build_table_params(database)
tables = self.metadata.list_all_entities(
entity=Table,
fields=FIELDS,
params={
"service": self.config.source.serviceName,
"database": database.fullyQualifiedName.root, # type: ignore
}, # type: ignore
params=params,
)
tables = cast(Iterable[Table], tables)
tables = self._filter_entities(tables)

return cast(Iterable[Table], tables)
has_deferred = self._has_conflicting_filter_modes()

for table in tables:
if has_deferred and self._filter_deferred_excludes(table):
continue
if (
self.source_config.classificationFilterPattern
and self.filter_classifications(table)
):
continue
if self._filter_views(table):
continue
yield table

def fetch(self) -> Iterator[Either[ProfilerSourceAndEntity]]:
"""Fetch database entity"""
Expand Down
Loading
Loading