Skip to content

Commit 5704f33

Browse files
committed
feat(gooddata-pandas): add max_bytes to arrow path and improve logging
risk: low
1 parent cda2982 commit 5704f33

6 files changed

Lines changed: 214 additions & 25 deletions

File tree

packages/gooddata-pandas/src/gooddata_pandas/arrow_convertor.py

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# (C) 2026 GoodData Corporation
22
from __future__ import annotations
33

4+
import logging
45
from typing import Callable
56

67
import orjson
@@ -43,6 +44,8 @@
4344

4445
_REQUIRED_SCHEMA_KEYS = (_META_XTAB, _META_MODEL, _META_VIEW)
4546

47+
logger = logging.getLogger(__name__)
48+
4649

4750
def read_model_labels(table: pa.Table) -> dict:
4851
"""Return the ``labels`` dict from the Arrow table's ``x-gdc-model-v1`` schema metadata.
@@ -133,9 +136,14 @@ def _parse_schema_metadata(table: pa.Table) -> dict:
133136
raise ValueError(
134137
"Arrow table has no schema metadata. Expected GoodData metadata keys: " + ", ".join(_REQUIRED_SCHEMA_KEYS)
135138
)
136-
schema_meta = {
137-
k.decode(): orjson.loads(v) for k, v in table.schema.metadata.items() if k.decode() in _REQUIRED_SCHEMA_KEYS
138-
}
139+
schema_meta = {}
140+
for _k, _v in table.schema.metadata.items():
141+
try:
142+
_k_str = _k.decode()
143+
except UnicodeDecodeError:
144+
continue
145+
if _k_str in _REQUIRED_SCHEMA_KEYS:
146+
schema_meta[_k_str] = orjson.loads(_v)
139147
missing = [k for k in _REQUIRED_SCHEMA_KEYS if k not in schema_meta]
140148
if missing:
141149
raise ValueError(
@@ -242,10 +250,15 @@ def _build_inline_index(
242250
totals_meta = xtab_meta.get("totalsMetadata", {})
243251
total_ref_vals: list = [None] * table.num_rows
244252
if totals_meta:
245-
for field in table.schema:
246-
if field.name.startswith(_COL_TOTAL_REF_PREFIX):
247-
total_ref_vals = table.column(field.name).to_pylist()
248-
break
253+
total_ref_cols = [f.name for f in table.schema if f.name.startswith(_COL_TOTAL_REF_PREFIX)]
254+
if total_ref_cols:
255+
if len(total_ref_cols) > 1:
256+
logger.warning(
257+
"Arrow table has %d __total_ref* columns; only %r is used for aggregation names.",
258+
len(total_ref_cols),
259+
total_ref_cols[0],
260+
)
261+
total_ref_vals = table.column(total_ref_cols[0]).to_pylist()
249262

250263
# Precompute per-row aggregation name and kept-label set for total rows.
251264
agg_for_row: list[str | None] = [None] * table.num_rows
@@ -268,16 +281,17 @@ def _build_inline_index(
268281
values = table.column(lid).to_pylist()
269282
processed = []
270283
for i, v in enumerate(values):
271-
if row_types[i] != 0 and isinstance(v, str):
272-
if ref in kept_labels_for_row[i]:
273-
# Outer label kept as real attribute value in a subtotal row.
274-
processed.append(v)
275-
elif v == "":
276-
# Aggregated level left empty by the server — fill with agg name.
277-
processed.append(agg_for_row[i] if agg_for_row[i] else v)
284+
if row_types[i] != 0:
285+
if isinstance(v, str):
286+
if ref in kept_labels_for_row[i]:
287+
processed.append(v)
288+
elif v == "":
289+
processed.append(agg_for_row[i] if agg_for_row[i] else v)
290+
else:
291+
processed.append(v.upper())
278292
else:
279-
# Aggregation function marker (e.g. 'sum') — uppercase it.
280-
processed.append(v.upper())
293+
# Non-string value in a total row — replace with the aggregation name when available.
294+
processed.append(agg_for_row[i] if agg_for_row[i] is not None else v)
281295
else:
282296
processed.append(v)
283297
arrays.append(processed)
@@ -466,6 +480,11 @@ def _label_ids_in_dim(dim: dict) -> set:
466480
(dim for dim in execution_dims if col_ref_label_ids <= _label_ids_in_dim(dim)),
467481
{},
468482
)
483+
if not col_dim and execution_dims:
484+
logger.warning(
485+
"No execution dimension contains column label IDs %s; column_totals_indexes will be empty.",
486+
col_ref_label_ids,
487+
)
469488
else:
470489
col_dim = next(
471490
(dim for dim in execution_dims if any("measureGroupHeaders" in h for h in dim.get("headers", []))),
@@ -542,6 +561,11 @@ def _label_ids_in_dim(dim: dict) -> set:
542561
(dim for dim in execution_dims if ref_label_ids <= _label_ids_in_dim(dim)),
543562
{},
544563
)
564+
if not row_dim and execution_dims:
565+
logger.warning(
566+
"No execution dimension contains row label IDs %s; row_totals_indexes will be empty.",
567+
ref_label_ids,
568+
)
545569
else:
546570
# Metrics-only: the dimension containing measureGroupHeaders is the output-row dim.
547571
row_dim = next(

packages/gooddata-pandas/src/gooddata_pandas/arrow_types.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,12 @@ class ArrowConfig:
4747
custom_mapping is not provided.
4848
custom_mapping: Arrow type → pandas dtype mapping dict. Only used when
4949
types_mapper=TypesMapper.CUSTOM, ignored otherwise.
50+
max_bytes: Optional byte-size limit for the Arrow response body. When set,
51+
``read_result_arrow`` raises ``ResultSizeBytesLimitExceeded`` if the
52+
raw IPC payload exceeds this value before parsing begins.
5053
"""
5154

5255
self_destruct: bool = False
5356
types_mapper: TypesMapper = TypesMapper.DEFAULT
5457
custom_mapping: dict | None = field(default=None)
58+
max_bytes: int | None = field(default=None)

packages/gooddata-pandas/src/gooddata_pandas/data_access.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,6 @@
1818
)
1919
from gooddata_sdk.utils import IdObjType
2020

21-
try:
22-
from gooddata_pandas.arrow_convertor import build_metric_field_index, convert_label_values, read_model_labels
23-
except ImportError:
24-
pass # Only needed when use_arrow=True; callers guard with _ARROW_AVAILABLE checks
25-
2621
from gooddata_pandas.utils import (
2722
ColumnsDef,
2823
IndexDef,
@@ -34,6 +29,12 @@
3429
get_catalog_attributes_for_extract,
3530
)
3631

32+
_ARROW_IMPORT_ERROR: ImportError | None = None
33+
try:
34+
from gooddata_pandas.arrow_convertor import build_metric_field_index, convert_label_values, read_model_labels
35+
except ImportError as _e:
36+
_ARROW_IMPORT_ERROR = _e
37+
3738

3839
class ExecutionDefinitionBuilder:
3940
_DEFAULT_INDEX_NAME: str = "0"
@@ -429,6 +430,7 @@ def _extract_from_arrow(
429430
col_to_attr_idx: dict[str, int],
430431
col_to_metric_idx: dict[str, int],
431432
index_to_attr_idx: dict[str, int],
433+
max_bytes: int | None = None,
432434
) -> tuple[dict, dict]:
433435
"""
434436
Arrow-path extraction for indexed() / not_indexed().
@@ -440,7 +442,11 @@ def _extract_from_arrow(
440442
``pandas.Timestamp`` to match the behaviour of the non-Arrow path.
441443
Week and quarter values remain as strings (same as non-Arrow).
442444
"""
443-
table = execution.bare_exec_response.read_result_arrow()
445+
if _ARROW_IMPORT_ERROR is not None:
446+
raise ImportError(
447+
"pyarrow is required for Arrow support. Install it with: pip install gooddata-pandas[arrow]"
448+
) from _ARROW_IMPORT_ERROR
449+
table = execution.bare_exec_response.read_result_arrow(max_bytes=max_bytes)
444450
exec_def = execution.exec_def
445451

446452
if table.num_rows == 0:
@@ -478,6 +484,7 @@ def compute_and_extract(
478484
is_cancellable: bool = False,
479485
result_page_len: int | None = None,
480486
use_arrow: bool = False,
487+
max_bytes: int | None = None,
481488
) -> tuple[dict, dict]:
482489
"""
483490
Convenience function that computes and extracts data from the execution response.
@@ -496,6 +503,8 @@ def compute_and_extract(
496503
Defaults to 1000. Larger values can improve performance for large result sets.
497504
use_arrow (bool, optional): When True, fetches the result via the Arrow IPC binary
498505
endpoint in one shot instead of paginating through JSON. Requires pyarrow.
506+
max_bytes (Optional[int]): Maximum response body size in bytes for the Arrow path.
507+
Raises ResultSizeBytesLimitExceeded when exceeded. Ignored when use_arrow=False.
499508
500509
Returns:
501510
tuple: A tuple containing the following dictionaries:
@@ -529,6 +538,7 @@ def compute_and_extract(
529538
col_to_attr_idx,
530539
col_to_metric_idx,
531540
index_to_attr_idx,
541+
max_bytes=max_bytes,
532542
)
533543
elif not exec_def.has_attributes():
534544
return _extract_for_metrics_only(execution, cols, col_to_metric_idx), dict()

packages/gooddata-pandas/src/gooddata_pandas/dataframe.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ def indexed(
159159
is_cancellable=is_cancellable,
160160
result_page_len=result_page_len,
161161
use_arrow=use_arrow,
162+
max_bytes=self._arrow_config.max_bytes if use_arrow else None,
162163
)
163164

164165
_idx = make_pandas_index(index)
@@ -210,6 +211,7 @@ def not_indexed(
210211
is_cancellable=is_cancellable,
211212
result_page_len=result_page_len,
212213
use_arrow=use_arrow,
214+
max_bytes=self._arrow_config.max_bytes if use_arrow else None,
213215
)
214216

215217
return pandas.DataFrame(data=data)
@@ -539,7 +541,7 @@ def for_exec_def_arrow(
539541
on_execution_submitted(execution)
540542

541543
exec_response = execution.bare_exec_response
542-
table = exec_response.read_result_arrow()
544+
table = exec_response.read_result_arrow(max_bytes=self._arrow_config.max_bytes)
543545
return self._table_to_df_and_metadata(table, exec_response, label_overrides, grand_totals_position)
544546

545547
def for_arrow_table(
@@ -684,7 +686,7 @@ def for_exec_result_id(
684686
result_cache_metadata.execution_response, _check_type=False
685687
),
686688
)
687-
table = exec_response.read_result_arrow()
689+
table = exec_response.read_result_arrow(max_bytes=self._arrow_config.max_bytes)
688690
return self._table_to_df_and_metadata(table, exec_response, label_overrides, grand_totals_position)
689691

690692
return convert_execution_response_to_dataframe(

packages/gooddata-pandas/tests/dataframe/test_dataframe_for_exec_def_arrow.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1684,6 +1684,146 @@ def test_indexed_use_arrow_text_attr_unchanged() -> None:
16841684
assert all(isinstance(v, str) for v in df.index.tolist())
16851685

16861686

1687+
def test_indexed_use_arrow_empty_result_preserves_structure() -> None:
1688+
"""indexed(use_arrow=True) on a zero-row Arrow table returns empty DataFrame with correct names."""
1689+
import json as _json
1690+
1691+
model_meta = {
1692+
"labels": {"region": {"granularity": None, "labelTitle": "Region", "primaryLabelId": "region"}},
1693+
"requestedShape": {"metrics": ["revenue"]},
1694+
"metrics": {"revenue": {"title": "Revenue"}},
1695+
}
1696+
xtab_meta = {
1697+
"labelMetadata": {"l0": {"labelId": "region", "primaryLabelId": "region"}},
1698+
"computedShape": {"rows": [], "cols": [], "metrics": ["m0"]},
1699+
"totalsMetadata": {},
1700+
}
1701+
schema_meta = {
1702+
b"x-gdc-model-v1": _json.dumps(model_meta).encode(),
1703+
b"x-gdc-xtab-v1": _json.dumps(xtab_meta).encode(),
1704+
b"x-gdc-view-v1": _json.dumps({"isTransposed": False}).encode(),
1705+
}
1706+
gdc_metric = {b"gdc": _json.dumps({"type": "metric", "index": 0}).encode()}
1707+
schema = pa.schema(
1708+
[
1709+
pa.field("__row_type", pa.int8()),
1710+
pa.field("region", pa.string()),
1711+
pa.field("metric_group_0", pa.float64(), metadata=gdc_metric),
1712+
],
1713+
metadata=schema_meta,
1714+
)
1715+
empty_table = pa.table(
1716+
{
1717+
"__row_type": pa.array([], type=pa.int8()),
1718+
"region": pa.array([], type=pa.string()),
1719+
"metric_group_0": pa.array([], type=pa.float64()),
1720+
},
1721+
schema=schema,
1722+
)
1723+
1724+
columns = {"revenue": "metric/revenue"}
1725+
index_by = {"reg": "label/region"}
1726+
mock_sdk, _, _ = _mock_execution(empty_table, columns, index_by)
1727+
1728+
gdf = DataFrameFactory(mock_sdk, "workspace", use_arrow=True)
1729+
df = gdf.indexed(index_by=index_by, columns=columns)
1730+
1731+
assert len(df) == 0
1732+
assert list(df.columns) == ["revenue"]
1733+
assert df.index.name == "reg"
1734+
1735+
1736+
def test_extract_from_arrow_without_pyarrow_raises_import_error() -> None:
1737+
"""_extract_from_arrow raises ImportError (not NameError) when pyarrow is unavailable."""
1738+
import gooddata_pandas.data_access as _da
1739+
1740+
original = _da._ARROW_IMPORT_ERROR
1741+
try:
1742+
_da._ARROW_IMPORT_ERROR = ImportError("pyarrow not installed")
1743+
with pytest.raises(ImportError, match="pyarrow"):
1744+
_da._extract_from_arrow(MagicMock(), [], {}, {}, {})
1745+
finally:
1746+
_da._ARROW_IMPORT_ERROR = original
1747+
1748+
1749+
def test_parse_schema_metadata_non_utf8_key_is_skipped() -> None:
1750+
"""_parse_schema_metadata skips non-UTF-8 byte keys without raising UnicodeDecodeError."""
1751+
if "dim_r_m" not in _cases():
1752+
pytest.skip("fixture dim_r_m not available")
1753+
table, _, _ = _load_case("dim_r_m")
1754+
non_utf8_meta = {b"\xff\xfe": b"some value", **table.schema.metadata}
1755+
table_with_bad_key = table.replace_schema_metadata(non_utf8_meta)
1756+
# Should not raise despite the non-UTF-8 key.
1757+
df = convert_arrow_table_to_dataframe(table_with_bad_key)
1758+
assert df is not None
1759+
1760+
1761+
def test_build_inline_index_total_row_numeric_label_uses_agg_name() -> None:
1762+
"""Total rows with non-string (numeric/null) label values are replaced with the aggregation name."""
1763+
table = pa.table(
1764+
{
1765+
"__row_type": pa.array([0, 2], type=pa.int8()),
1766+
"year": pa.array([2023.0, None], type=pa.float64()),
1767+
# Data row has no total ref; total row refers to "t0" in totalsMetadata.
1768+
"__total_ref": pa.array([None, [0]], type=pa.list_(pa.int32())),
1769+
}
1770+
)
1771+
xtab_meta = {
1772+
"labelMetadata": {"l0": {"labelId": "year", "primaryLabelId": "year"}},
1773+
"totalsMetadata": {"t0": {"aggregation": "sum", "rowLabels": []}},
1774+
}
1775+
model_meta = {
1776+
"labels": {"year": {"labelTitle": "Year"}},
1777+
"requestedShape": {"metrics": []},
1778+
}
1779+
idx = _build_inline_index(
1780+
table,
1781+
row_label_refs=["l0"],
1782+
label_ref_to_id={"l0": "year"},
1783+
model_meta=model_meta,
1784+
xtab_meta=xtab_meta,
1785+
)
1786+
assert idx is not None
1787+
assert idx[0] == 2023.0
1788+
assert idx[1] == "SUM"
1789+
1790+
1791+
def test_arrow_config_max_bytes_forwarded_to_read_result_arrow() -> None:
1792+
"""ArrowConfig.max_bytes is passed through to read_result_arrow() by for_exec_def_arrow()."""
1793+
if "dim_r_m" not in _cases():
1794+
pytest.skip("fixture dim_r_m not available")
1795+
table, _, meta = _load_case("dim_r_m")
1796+
1797+
mock_exec = MagicMock()
1798+
mock_exec.bare_exec_response.read_result_arrow.return_value = table
1799+
mock_exec.bare_exec_response.dimensions = meta["dimensions"]
1800+
mock_sdk = MagicMock()
1801+
mock_sdk.compute.for_exec_def.return_value = mock_exec
1802+
1803+
from gooddata_sdk import ExecutionDefinition
1804+
1805+
gdf = DataFrameFactory(mock_sdk, "workspace", arrow_config=ArrowConfig(max_bytes=10_000_000))
1806+
gdf.for_exec_def_arrow(MagicMock(spec=ExecutionDefinition))
1807+
1808+
mock_exec.bare_exec_response.read_result_arrow.assert_called_once_with(max_bytes=10_000_000)
1809+
1810+
1811+
def test_arrow_config_max_bytes_raises_when_exceeded() -> None:
1812+
"""ResultSizeBytesLimitExceeded from read_result_arrow propagates out of for_exec_def_arrow()."""
1813+
from gooddata_sdk.compute.model.execution import ResultSizeBytesLimitExceeded
1814+
1815+
mock_exec = MagicMock()
1816+
mock_exec.bare_exec_response.read_result_arrow.side_effect = ResultSizeBytesLimitExceeded(100, 200)
1817+
mock_sdk = MagicMock()
1818+
mock_sdk.compute.for_exec_def.return_value = mock_exec
1819+
1820+
from gooddata_sdk import ExecutionDefinition
1821+
1822+
gdf = DataFrameFactory(mock_sdk, "workspace", arrow_config=ArrowConfig(max_bytes=100))
1823+
with pytest.raises(ResultSizeBytesLimitExceeded):
1824+
gdf.for_exec_def_arrow(MagicMock(spec=ExecutionDefinition))
1825+
1826+
16871827
def test_indexed_use_arrow_mixed_date_and_text_index() -> None:
16881828
"""indexed() with use_arrow=True: date attr → Timestamp, text attr → str in MultiIndex."""
16891829
n = 3

0 commit comments

Comments
 (0)