-
Notifications
You must be signed in to change notification settings - Fork 2.1k
MINOR: Implement stats metrics for all SQA sources #25725
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6c7f4ac
c5c8246
876a5e6
458805c
d5b0ffe
5dbded1
8fc3af6
8444487
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -482,6 +482,225 @@ def compute(self): | |
| return res | ||
|
|
||
|
|
||
| class MSSQLTableMetricComputer(BaseTableMetricComputer): | ||
| """MSSQL Table Metric Computer""" | ||
|
|
||
| def compute(self): | ||
| """compute table metrics for MSSQL using sys DMVs""" | ||
| table_meta = cte( | ||
| self._build_query( | ||
| [ | ||
| Column("object_id"), | ||
| Column("name").label("table_name"), | ||
| func.schema_name(Column("schema_id")).label("schema_name"), | ||
| Column("create_date"), | ||
| ], | ||
| self._build_table("tables", "sys"), | ||
| ) | ||
| ) | ||
|
|
||
| row_count_cte = cte( | ||
| self._build_query( | ||
| [ | ||
| Column("object_id"), | ||
| func.sum(Column("row_count")).cast(BigInteger).label("row_count"), | ||
| ], | ||
| self._build_table("dm_db_partition_stats", "sys"), | ||
| [Column("index_id").in_([0, 1])], | ||
| ).group_by(Column("object_id")) | ||
| ) | ||
|
|
||
| size_cte = cte( | ||
| self._build_query( | ||
| [ | ||
| Column("object_id"), | ||
| (func.sum(Column("reserved_page_count")) * 8192).label( | ||
| "size_bytes" | ||
| ), | ||
| ], | ||
| self._build_table("dm_db_partition_stats", "sys"), | ||
| ).group_by(Column("object_id")) | ||
| ) | ||
|
|
||
| columns = [ | ||
| row_count_cte.c.row_count.label(ROW_COUNT), | ||
| size_cte.c.size_bytes.label(SIZE_IN_BYTES), | ||
| table_meta.c.create_date.label(CREATE_DATETIME), | ||
| *self._get_col_names_and_count(), | ||
| ] | ||
|
|
||
| query = ( | ||
| select(*columns) | ||
| .select_from(table_meta) | ||
| .join( | ||
| row_count_cte, | ||
| table_meta.c.object_id == row_count_cte.c.object_id, | ||
| ) | ||
| .outerjoin( | ||
| size_cte, | ||
| table_meta.c.object_id == size_cte.c.object_id, | ||
| ) | ||
| .where( | ||
| table_meta.c.schema_name == self.schema_name, | ||
| table_meta.c.table_name == self.table_name, | ||
| ) | ||
| ) | ||
|
|
||
| res = self.runner._session.execute(query).first() | ||
| if not res: | ||
| return None | ||
| if res.rowCount is None or ( | ||
| res.rowCount == 0 and self._entity.tableType == TableType.View | ||
| ): | ||
| return super().compute() | ||
| return res | ||
|
|
||
|
|
||
| class CockroachTableMetricComputer(BaseTableMetricComputer): | ||
| """CockroachDB Table Metric Computer""" | ||
|
|
||
| def compute(self): | ||
| """compute table metrics for CockroachDB using crdb_internal.table_row_statistics""" | ||
| nsp_subquery = ( | ||
| select(Column("table_id")) | ||
| .select_from(Table("tables", MetaData(), schema="crdb_internal")) | ||
| .where( | ||
| Column("name") == self.table_name, | ||
| Column("schema_name") == self.schema_name, | ||
| Column("database_name") == self.database, | ||
| ) | ||
| .correlate(None) | ||
| .scalar_subquery() | ||
| ) | ||
|
|
||
| columns = [ | ||
| func.max(Column("rowCount")).cast(BigInteger).label(ROW_COUNT), | ||
| func.sum(Column("avgSize")).label(SIZE_IN_BYTES), | ||
| *self._get_col_names_and_count(), | ||
|
Comment on lines
+576
to
+579
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| ] | ||
|
|
||
| where_clause = [ | ||
| Column("tableID") == nsp_subquery, | ||
| ] | ||
|
|
||
| stats = self._build_table("table_statistics", "system") | ||
|
|
||
| query = self._build_query( | ||
| columns, | ||
| stats, | ||
| where_clause, | ||
| ) | ||
|
|
||
| res = self.runner._session.execute(query).first() | ||
| if not res: | ||
| return None | ||
| if res.rowCount is None or ( | ||
| res.rowCount == 0 and self._entity.tableType == TableType.View | ||
| ): | ||
| return super().compute() | ||
| return res | ||
|
|
||
|
|
||
| class DB2TableMetricComputer(BaseTableMetricComputer): | ||
| """DB2 Table Metric Computer""" | ||
|
|
||
| def compute(self): | ||
| """compute table metrics for DB2 using SYSCAT.TABLES""" | ||
| columns = [ | ||
| Column("CARD").cast(BigInteger).label(ROW_COUNT), | ||
| (Column("FPAGES") * 4096).label(SIZE_IN_BYTES), | ||
| Column("CREATE_TIME").label(CREATE_DATETIME), | ||
| *self._get_col_names_and_count(), | ||
| ] | ||
|
|
||
| where_clause = [ | ||
| func.upper(Column("TABSCHEMA")) == self.schema_name.upper(), | ||
| func.upper(Column("TABNAME")) == self.table_name.upper(), | ||
| ] | ||
|
|
||
| query = self._build_query( | ||
| columns, | ||
| self._build_table("TABLES", "SYSCAT"), | ||
| where_clause, | ||
| ) | ||
|
|
||
| res = self.runner._session.execute(query).first() | ||
| if not res: | ||
| return None | ||
| if ( | ||
| res.rowCount is None | ||
| or res.rowCount < 0 | ||
| or (res.rowCount == 0 and self._entity.tableType == TableType.View) | ||
| ): | ||
| return super().compute() | ||
| return res | ||
|
|
||
|
|
||
| class VerticaTableMetricComputer(BaseTableMetricComputer): | ||
| """Vertica Table Metric Computer""" | ||
|
|
||
| def compute(self): | ||
| """compute table metrics for Vertica using v_monitor.projection_storage""" | ||
| columns = [ | ||
| func.sum(Column("row_count")).label(ROW_COUNT), | ||
| func.sum(Column("used_bytes")).label(SIZE_IN_BYTES), | ||
| *self._get_col_names_and_count(), | ||
| ] | ||
|
|
||
| where_clause = [ | ||
| Column("anchor_table_schema") == self.schema_name, | ||
| Column("anchor_table_name") == self.table_name, | ||
| ] | ||
|
|
||
| query = self._build_query( | ||
| columns, | ||
| self._build_table("projection_storage", "v_monitor"), | ||
| where_clause, | ||
| ) | ||
|
|
||
| res = self.runner._session.execute(query).first() | ||
| if not res: | ||
| return None | ||
| if res.rowCount is None or ( | ||
| res.rowCount == 0 and self._entity.tableType == TableType.View | ||
| ): | ||
| return super().compute() | ||
| return res | ||
|
|
||
|
|
||
| class SAPHanaTableMetricComputer(BaseTableMetricComputer): | ||
| """SAP HANA Table Metric Computer""" | ||
|
|
||
| def compute(self): | ||
| """compute table metrics for SAP HANA using SYS.M_TABLES""" | ||
| columns = [ | ||
| Column("RECORD_COUNT").label(ROW_COUNT), | ||
| Column("TABLE_SIZE").label(SIZE_IN_BYTES), | ||
| Column("CREATE_TIME").label(CREATE_DATETIME), | ||
| *self._get_col_names_and_count(), | ||
| ] | ||
|
|
||
| where_clause = [ | ||
| Column("SCHEMA_NAME") == self.schema_name, | ||
| Column("TABLE_NAME") == self.table_name, | ||
| ] | ||
|
|
||
| query = self._build_query( | ||
| columns, | ||
| self._build_table("M_TABLES", "SYS"), | ||
| where_clause, | ||
| ) | ||
|
|
||
| res = self.runner._session.execute(query).first() | ||
| if not res: | ||
| return None | ||
| if res.rowCount is None or ( | ||
| res.rowCount == 0 and self._entity.tableType == TableType.View | ||
| ): | ||
| return super().compute() | ||
| return res | ||
|
|
||
|
|
||
| class TableMetricComputer: | ||
| """Table Metric Construct""" | ||
|
|
||
|
|
@@ -554,3 +773,13 @@ def construct(self, dialect, **kwargs): | |
| table_metric_computer_factory.register(Dialects.Oracle, OracleTableMetricComputer) | ||
| table_metric_computer_factory.register(Dialects.Snowflake, SnowflakeTableMetricComputer) | ||
| table_metric_computer_factory.register(Dialects.Postgres, PostgresTableMetricComputer) | ||
| table_metric_computer_factory.register(Dialects.MariaDB, MySQLTableMetricComputer) | ||
| table_metric_computer_factory.register(Dialects.SingleStore, MySQLTableMetricComputer) | ||
| table_metric_computer_factory.register(Dialects.StarRocks, MySQLTableMetricComputer) | ||
| table_metric_computer_factory.register(Dialects.Doris, MySQLTableMetricComputer) | ||
| table_metric_computer_factory.register(Dialects.MSSQL, MSSQLTableMetricComputer) | ||
| table_metric_computer_factory.register(Dialects.AzureSQL, MSSQLTableMetricComputer) | ||
| table_metric_computer_factory.register(Dialects.Cockroach, CockroachTableMetricComputer) | ||
| table_metric_computer_factory.register(Dialects.Db2, DB2TableMetricComputer) | ||
| table_metric_computer_factory.register(Dialects.Vertica, VerticaTableMetricComputer) | ||
| table_metric_computer_factory.register(Dialects.Hana, SAPHanaTableMetricComputer) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| # Copyright 2025 Collate | ||
| # Licensed under the Collate Community License, Version 1.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """ | ||
| Integration tests for CockroachTableMetricComputer against a real CockroachDB database. | ||
| """ | ||
|
|
||
| import sys | ||
| from unittest.mock import Mock | ||
|
|
||
| import pytest | ||
| from sqlalchemy import Column, Integer, String, create_engine | ||
| from sqlalchemy.orm import declarative_base | ||
| from testcontainers.cockroachdb import CockroachDBContainer | ||
|
|
||
| from metadata.generated.schema.entity.data.table import TableType | ||
| from metadata.ingestion.connections.session import create_and_bind_session | ||
| from metadata.profiler.orm.functions.table_metric_computer import ( | ||
| CockroachTableMetricComputer, | ||
| ) | ||
| from metadata.profiler.processor.runner import QueryRunner | ||
|
|
||
| if not sys.version_info >= (3, 9): | ||
| pytest.skip("requires python 3.9+", allow_module_level=True) | ||
|
|
||
|
|
||
| Base = declarative_base() | ||
|
|
||
|
|
||
| class MetricComputerTestTable(Base): | ||
| __tablename__ = "metric_computer_test" | ||
| __table_args__ = {"schema": "public"} | ||
| id = Column(Integer, primary_key=True) | ||
| name = Column(String(256)) | ||
|
|
||
|
|
||
| class NonExistentModel(Base): | ||
| __tablename__ = "nonexistent_table_xyz" | ||
| __table_args__ = {"schema": "public"} | ||
| id = Column(Integer, primary_key=True) | ||
|
|
||
|
|
||
| @pytest.fixture(scope="module") | ||
| def crdb_engine(): | ||
| container = CockroachDBContainer(image="cockroachdb/cockroach:v23.1.0") | ||
| with container as container: | ||
| engine = create_engine(container.get_connection_url()) | ||
| engine.execute( | ||
| "CREATE TABLE IF NOT EXISTS public.metric_computer_test " | ||
| "(id INTEGER PRIMARY KEY, name VARCHAR(256))" | ||
| ) | ||
| engine.execute( | ||
| "INSERT INTO public.metric_computer_test (id, name) " | ||
| "SELECT g, 'name_' || g::text FROM generate_series(1, 100) AS g" | ||
| ) | ||
| engine.execute("ANALYZE metric_computer_test") | ||
| yield engine | ||
| engine.execute("DROP TABLE IF EXISTS public.metric_computer_test") | ||
| engine.dispose() | ||
|
|
||
|
|
||
| @pytest.fixture(scope="module") | ||
| def session(crdb_engine): | ||
| session = create_and_bind_session(crdb_engine) | ||
| yield session | ||
| session.close() | ||
|
|
||
|
|
||
| def _build_computer(session, model, table_type): | ||
| runner = QueryRunner( | ||
| session=session, | ||
| dataset=model, | ||
| raw_dataset=model, | ||
| ) | ||
| entity = Mock() | ||
| entity.tableType = table_type | ||
| computer = CockroachTableMetricComputer( | ||
| runner=runner, | ||
| metrics=[], | ||
| conn_config=None, | ||
| entity=entity, | ||
| ) | ||
| computer._set_table_and_schema_name() | ||
| return computer | ||
|
|
||
|
|
||
| class TestCockroachTableMetricComputer: | ||
| def test_compute_returns_row_count(self, session): | ||
| computer = _build_computer(session, MetricComputerTestTable, TableType.Regular) | ||
| result = computer.compute() | ||
| assert result is not None | ||
| assert result.rowCount == 100 | ||
|
|
||
| def test_compute_returns_column_metadata(self, session): | ||
| computer = _build_computer(session, MetricComputerTestTable, TableType.Regular) | ||
| result = computer.compute() | ||
| assert result is not None | ||
| assert result.columnCount == 2 | ||
| assert "id" in result.columnNames | ||
| assert "name" in result.columnNames |
Uh oh!
There was an error while loading. Please reload this page.