Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,225 @@ def compute(self):
return res


class MSSQLTableMetricComputer(BaseTableMetricComputer):
"""MSSQL Table Metric Computer"""

def compute(self):
"""compute table metrics for MSSQL using sys DMVs"""
table_meta = cte(
self._build_query(
[
Column("object_id"),
Column("name").label("table_name"),
func.schema_name(Column("schema_id")).label("schema_name"),
Column("create_date"),
],
self._build_table("tables", "sys"),
Comment thread
TeddyCr marked this conversation as resolved.
)
)

row_count_cte = cte(
self._build_query(
[
Column("object_id"),
func.sum(Column("row_count")).cast(BigInteger).label("row_count"),
],
self._build_table("dm_db_partition_stats", "sys"),
[Column("index_id").in_([0, 1])],
).group_by(Column("object_id"))
)

size_cte = cte(
self._build_query(
[
Column("object_id"),
(func.sum(Column("reserved_page_count")) * 8192).label(
"size_bytes"
),
],
self._build_table("dm_db_partition_stats", "sys"),
).group_by(Column("object_id"))
)

columns = [
row_count_cte.c.row_count.label(ROW_COUNT),
size_cte.c.size_bytes.label(SIZE_IN_BYTES),
table_meta.c.create_date.label(CREATE_DATETIME),
*self._get_col_names_and_count(),
]

query = (
select(*columns)
.select_from(table_meta)
.join(
row_count_cte,
table_meta.c.object_id == row_count_cte.c.object_id,
)
.outerjoin(
size_cte,
table_meta.c.object_id == size_cte.c.object_id,
)
.where(
table_meta.c.schema_name == self.schema_name,
table_meta.c.table_name == self.table_name,
)
)

res = self.runner._session.execute(query).first()
if not res:
return None
if res.rowCount is None or (
res.rowCount == 0 and self._entity.tableType == TableType.View
):
return super().compute()
return res


class CockroachTableMetricComputer(BaseTableMetricComputer):
"""CockroachDB Table Metric Computer"""

def compute(self):
"""compute table metrics for CockroachDB using crdb_internal.table_row_statistics"""
nsp_subquery = (
select(Column("table_id"))
.select_from(Table("tables", MetaData(), schema="crdb_internal"))
.where(
Column("name") == self.table_name,
Column("schema_name") == self.schema_name,
Column("database_name") == self.database,
)
.correlate(None)
.scalar_subquery()
)

columns = [
func.max(Column("rowCount")).cast(BigInteger).label(ROW_COUNT),
func.sum(Column("avgSize")).label(SIZE_IN_BYTES),
*self._get_col_names_and_count(),
Comment on lines +576 to +579
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Bug: CockroachDB SIZE_IN_BYTES returns avg row size, not table size

The CockroachDB implementation computes SIZE_IN_BYTES as func.sum(Column("avgSize")) from system.table_statistics. However, avgSize in CockroachDB's system.table_statistics is the average size of values per column. Each row in this table represents statistics for a single column.

Summing avgSize across all columns gives you the average row width (sum of average column widths), not the total table size in bytes. To compute total table size, you need to multiply by the row count:

columns = [
    func.max(Column("rowCount")).cast(BigInteger).label(ROW_COUNT),
    (func.sum(Column("avgSize")) * func.max(Column("rowCount"))).label(SIZE_IN_BYTES),
    *self._get_col_names_and_count(),
]

Alternatively, you could query crdb_internal.table_span_stats which has approximate_disk_bytes for direct disk usage.

Impact: For a table with 1M rows and 10 columns averaging 50 bytes each, this would report 500 bytes instead of ~500MB — off by orders of magnitude.

Was this helpful? React with 👍 / 👎

]

where_clause = [
Column("tableID") == nsp_subquery,
]

stats = self._build_table("table_statistics", "system")

query = self._build_query(
columns,
stats,
where_clause,
)

res = self.runner._session.execute(query).first()
if not res:
return None
if res.rowCount is None or (
res.rowCount == 0 and self._entity.tableType == TableType.View
):
return super().compute()
return res


class DB2TableMetricComputer(BaseTableMetricComputer):
"""DB2 Table Metric Computer"""

def compute(self):
"""compute table metrics for DB2 using SYSCAT.TABLES"""
columns = [
Column("CARD").cast(BigInteger).label(ROW_COUNT),
(Column("FPAGES") * 4096).label(SIZE_IN_BYTES),
Column("CREATE_TIME").label(CREATE_DATETIME),
*self._get_col_names_and_count(),
]

where_clause = [
func.upper(Column("TABSCHEMA")) == self.schema_name.upper(),
func.upper(Column("TABNAME")) == self.table_name.upper(),
]

query = self._build_query(
columns,
self._build_table("TABLES", "SYSCAT"),
where_clause,
)

res = self.runner._session.execute(query).first()
if not res:
return None
if (
res.rowCount is None
or res.rowCount < 0
or (res.rowCount == 0 and self._entity.tableType == TableType.View)
):
return super().compute()
return res


class VerticaTableMetricComputer(BaseTableMetricComputer):
"""Vertica Table Metric Computer"""

def compute(self):
"""compute table metrics for Vertica using v_monitor.projection_storage"""
columns = [
func.sum(Column("row_count")).label(ROW_COUNT),
func.sum(Column("used_bytes")).label(SIZE_IN_BYTES),
*self._get_col_names_and_count(),
]

where_clause = [
Column("anchor_table_schema") == self.schema_name,
Column("anchor_table_name") == self.table_name,
]

query = self._build_query(
columns,
self._build_table("projection_storage", "v_monitor"),
where_clause,
)

res = self.runner._session.execute(query).first()
if not res:
return None
if res.rowCount is None or (
res.rowCount == 0 and self._entity.tableType == TableType.View
):
return super().compute()
return res


class SAPHanaTableMetricComputer(BaseTableMetricComputer):
"""SAP HANA Table Metric Computer"""

def compute(self):
"""compute table metrics for SAP HANA using SYS.M_TABLES"""
columns = [
Column("RECORD_COUNT").label(ROW_COUNT),
Column("TABLE_SIZE").label(SIZE_IN_BYTES),
Column("CREATE_TIME").label(CREATE_DATETIME),
*self._get_col_names_and_count(),
]

where_clause = [
Column("SCHEMA_NAME") == self.schema_name,
Column("TABLE_NAME") == self.table_name,
]

query = self._build_query(
columns,
self._build_table("M_TABLES", "SYS"),
where_clause,
)

res = self.runner._session.execute(query).first()
if not res:
return None
if res.rowCount is None or (
res.rowCount == 0 and self._entity.tableType == TableType.View
):
return super().compute()
return res


class TableMetricComputer:
"""Table Metric Construct"""

Expand Down Expand Up @@ -554,3 +773,13 @@ def construct(self, dialect, **kwargs):
table_metric_computer_factory.register(Dialects.Oracle, OracleTableMetricComputer)
table_metric_computer_factory.register(Dialects.Snowflake, SnowflakeTableMetricComputer)
table_metric_computer_factory.register(Dialects.Postgres, PostgresTableMetricComputer)
table_metric_computer_factory.register(Dialects.MariaDB, MySQLTableMetricComputer)
table_metric_computer_factory.register(Dialects.SingleStore, MySQLTableMetricComputer)
table_metric_computer_factory.register(Dialects.StarRocks, MySQLTableMetricComputer)
table_metric_computer_factory.register(Dialects.Doris, MySQLTableMetricComputer)
table_metric_computer_factory.register(Dialects.MSSQL, MSSQLTableMetricComputer)
table_metric_computer_factory.register(Dialects.AzureSQL, MSSQLTableMetricComputer)
table_metric_computer_factory.register(Dialects.Cockroach, CockroachTableMetricComputer)
table_metric_computer_factory.register(Dialects.Db2, DB2TableMetricComputer)
table_metric_computer_factory.register(Dialects.Vertica, VerticaTableMetricComputer)
table_metric_computer_factory.register(Dialects.Hana, SAPHanaTableMetricComputer)
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Integration tests for CockroachTableMetricComputer against a real CockroachDB database.
"""

import sys
from unittest.mock import Mock

import pytest
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base
from testcontainers.cockroachdb import CockroachDBContainer

from metadata.generated.schema.entity.data.table import TableType
from metadata.ingestion.connections.session import create_and_bind_session
from metadata.profiler.orm.functions.table_metric_computer import (
CockroachTableMetricComputer,
)
from metadata.profiler.processor.runner import QueryRunner

if not sys.version_info >= (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)


Base = declarative_base()


class MetricComputerTestTable(Base):
__tablename__ = "metric_computer_test"
__table_args__ = {"schema": "public"}
id = Column(Integer, primary_key=True)
name = Column(String(256))


class NonExistentModel(Base):
__tablename__ = "nonexistent_table_xyz"
__table_args__ = {"schema": "public"}
id = Column(Integer, primary_key=True)


@pytest.fixture(scope="module")
def crdb_engine():
container = CockroachDBContainer(image="cockroachdb/cockroach:v23.1.0")
with container as container:
engine = create_engine(container.get_connection_url())
engine.execute(
"CREATE TABLE IF NOT EXISTS public.metric_computer_test "
"(id INTEGER PRIMARY KEY, name VARCHAR(256))"
)
engine.execute(
"INSERT INTO public.metric_computer_test (id, name) "
"SELECT g, 'name_' || g::text FROM generate_series(1, 100) AS g"
)
engine.execute("ANALYZE metric_computer_test")
yield engine
engine.execute("DROP TABLE IF EXISTS public.metric_computer_test")
engine.dispose()


@pytest.fixture(scope="module")
def session(crdb_engine):
session = create_and_bind_session(crdb_engine)
yield session
session.close()


def _build_computer(session, model, table_type):
runner = QueryRunner(
session=session,
dataset=model,
raw_dataset=model,
)
entity = Mock()
entity.tableType = table_type
computer = CockroachTableMetricComputer(
runner=runner,
metrics=[],
conn_config=None,
entity=entity,
)
computer._set_table_and_schema_name()
return computer


class TestCockroachTableMetricComputer:
def test_compute_returns_row_count(self, session):
computer = _build_computer(session, MetricComputerTestTable, TableType.Regular)
result = computer.compute()
assert result is not None
assert result.rowCount == 100

def test_compute_returns_column_metadata(self, session):
computer = _build_computer(session, MetricComputerTestTable, TableType.Regular)
result = computer.compute()
assert result is not None
assert result.columnCount == 2
assert "id" in result.columnNames
assert "name" in result.columnNames
Loading
Loading