Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e1c732a
added cached_dataset decorator to get_dataset
alexfurmenkov Apr 8, 2026
037e800
Merge branch 'main' into 1681-cache-dataset-build
alexfurmenkov Apr 9, 2026
b9a9b14
added per-builder dataset caching on rule validation.
alexfurmenkov Apr 9, 2026
c2b36b6
cache tests
alexfurmenkov Apr 10, 2026
68643b3
fix for tests -- RulesEngine cache is cleared before each test
alexfurmenkov Apr 12, 2026
96f8637
added fields for caching to BaseDatasetBuilder
alexfurmenkov Apr 13, 2026
d008108
remove comment
alexfurmenkov Apr 14, 2026
e439a89
removed unnecessary kwargs pop call
alexfurmenkov Apr 14, 2026
62f649c
test for ignored kwargs in split dataset build
alexfurmenkov Apr 15, 2026
252cb8b
added cached decorator to JSONataDatasetBuilder
alexfurmenkov Apr 16, 2026
c7c61fd
Merge branch 'main' into 1681-cache-dataset-build
alexfurmenkov Apr 20, 2026
00e8cf9
Merge branch 'main' into 1681-cache-dataset-build
SFJohnson24 Apr 21, 2026
81cd458
removed kwargs from get_dataset method
alexfurmenkov Apr 22, 2026
d49a046
Merge branch 'main' into 1681-cache-dataset-build
alexfurmenkov Apr 27, 2026
ce74db1
Merge branch 'main' into 1681-cache-dataset-build
alexfurmenkov Apr 28, 2026
9eb32be
Merge branch 'main' into 1681-cache-dataset-build
alexfurmenkov Apr 29, 2026
08f56fa
fix test
alexfurmenkov Apr 29, 2026
b810e82
Merge branch 'main' into 1681-cache-dataset-build
SFJohnson24 Apr 30, 2026
0b4251b
Merge branch '1681-cache-dataset-build' of https://github.com/cdisc-o…
SFJohnson24 Apr 30, 2026
35e75fb
Merge branch 'main' into 1681-cache-dataset-build
SFJohnson24 Apr 30, 2026
b03de55
removed params and unused drop_duplicates argument from get metadata
SFJohnson24 Apr 30, 2026
ef1f4da
Merge branch '1681-cache-dataset-build' of https://github.com/cdisc-o…
SFJohnson24 Apr 30, 2026
b505404
remove kwargs
SFJohnson24 Apr 30, 2026
7f2bab2
test
SFJohnson24 Apr 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions cdisc_rules_engine/dataset_builders/base_dataset_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from cdisc_rules_engine.services.define_xml.define_xml_reader_factory import (
DefineXMLReaderFactory,
)
from cdisc_rules_engine.utilities.decorators import cached
from cdisc_rules_engine.utilities.sdtm_utilities import get_corresponding_datasets
from cdisc_rules_engine.utilities.sdtm_utilities import (
tag_source,
Expand Down Expand Up @@ -34,6 +35,7 @@ def __init__(
):
self.data_service = data_service
self.cache = cache_service
self.cache_service = cache_service
self.data_processor = data_processor
self.rule_processor = rule_processor
self.dataset_metadata = dataset_metadata
Expand All @@ -44,6 +46,14 @@ def __init__(
self.standard_substandard = standard_substandard
self.library_metadata = library_metadata
self.dataset_implementation = self.data_service.dataset_implementation
if isinstance(dataset_metadata, SDTMDatasetMetadata):
self.domain = (
f"SUPP{dataset_metadata.rdomain}"
if dataset_metadata.rdomain
else dataset_metadata.domain
)
self.dataset_name = dataset_metadata.name
self.name = self.__class__.__name__

@abstractmethod
def build(self) -> DatasetInterface:
Expand All @@ -67,7 +77,8 @@ def build_split_datasets(self, dataset_name: str, **kwargs) -> DatasetInterface:
finally:
self.dataset_metadata = original_dataset_metadata

def get_dataset(self, **kwargs):
@cached("get_dataset")
def get_dataset(self):
# If validating dataset content, ensure split datasets are handled.
if self.dataset_metadata.is_split:
# Handle split datasets for content checks.
Expand All @@ -77,15 +88,14 @@ def get_dataset(self, **kwargs):
datasets_metadata=get_corresponding_datasets(
self.data_service.get_datasets(), self.dataset_metadata
),
**kwargs,
)
else:
# single dataset. the most common case
dataset: DatasetInterface = self.build()
dataset = tag_source(dataset, self.dataset_metadata)
return dataset

def get_dataset_contents(self, **kwargs):
def get_dataset_contents(self):
# If validating dataset content, ensure split datasets are handled.
if self.dataset_metadata.is_split:
# Handle split datasets for content checks.
Expand All @@ -95,7 +105,6 @@ def get_dataset_contents(self, **kwargs):
datasets_metadata=get_corresponding_datasets(
self.data_service.get_datasets(), self.dataset_metadata
),
**kwargs,
)
else:
# single dataset. the most common case
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def build_split_datasets(self, dataset_name, **kwargs):
"""
return self.data_service.get_dataset(dataset_name=dataset_name)

def get_dataset(self, **kwargs):
dataset = super().get_dataset(**kwargs)
def get_dataset(self):
dataset = super().get_dataset()
length = sum(
[
dataset.record_count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,5 @@ def build(self):
"""

return self.dataset_implementation.from_records(
{ds.unsplit_name: ds.filename for ds in self.data_service.get_datasets()},
index=[0],
[{ds.unsplit_name: ds.filename for ds in self.data_service.get_datasets()}]
)
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def _get_cached_dataset(self) -> dict[str, list[str]]:

return errlist

def get_dataset(self, **kwargs) -> DatasetInterface:
def get_dataset(self) -> DatasetInterface:
dataset = self._get_cached_dataset()
records = [
{key: dataset[key][i] for key in dataset}
Expand All @@ -56,10 +56,10 @@ def get_dataset(self, **kwargs) -> DatasetInterface:
row for row in records if row["dataset"] == self.dataset_metadata.name
]
if filtered:
result = self.dataset_implementation.from_records(filtered, **kwargs)
result = self.dataset_implementation.from_records(filtered)
else:
empty_row = {key: "" for key in self.dataset_template.keys()}
result = self.dataset_implementation.from_records([empty_row], **kwargs)
result = self.dataset_implementation.from_records([empty_row])
return tag_source(result, self.dataset_metadata)

def list_errors(self, tree: exceptions.ErrorTree, errlist: dict[str, list]):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from json import load
from cdisc_rules_engine.dataset_builders.base_dataset_builder import BaseDatasetBuilder
from cdisc_rules_engine.utilities.decorators import cached


def add_json_pointer_paths(node, path=""):
Expand All @@ -19,7 +20,8 @@ def add_json_pointer_paths(node, path=""):

class JSONataDatasetBuilder(BaseDatasetBuilder):

def get_dataset(self, **kwargs):
@cached("get_dataset")
def get_dataset(self):
if not self.dataset_metadata.full_path:
return None
with self.data_service.read_data(self.dataset_metadata.full_path) as fp:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ def build(self):
"""
# Get basic variable metadata
variables_metadata = self.data_service.get_variables_metadata(
dataset_name=self.dataset_metadata.name,
drop_duplicates=True,
dataset_name=self.dataset_metadata.name
)

# Check if the rule requires variable_max_size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ def build(self):
"""
data_contents_long_df = super().build()
variable_metadata = self.data_service.get_variables_metadata(
dataset_name=self.dataset_metadata.name,
drop_duplicates=True,
dataset_name=self.dataset_metadata.name
)
merged_df = data_contents_long_df.merge(
variable_metadata._data, how="left", on="variable_name"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ def build(self):
"""
variable_metadata: List[dict] = self.get_define_xml_variables_metadata()
content_metadata: DatasetInterface = self.data_service.get_variables_metadata(
dataset_name=self.dataset_metadata.name,
drop_duplicates=True,
dataset_name=self.dataset_metadata.name
)
define_metadata: DatasetInterface = self.dataset_implementation.from_records(
variable_metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ def build(self):
variable_metadata: List[dict] = self.get_define_xml_variables_metadata()
# get dataset metadata and execute the rule
content_metadata: DatasetInterface = self.data_service.get_variables_metadata(
dataset_name=self.dataset_metadata.name,
drop_duplicates=True,
dataset_name=self.dataset_metadata.name
)
define_metadata: DatasetInterface = self.dataset_implementation.from_records(
variable_metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ def build(self):
# get dataset metadata and execute the rule
content_variables_metadata: DatasetInterface = (
self.data_service.get_variables_metadata(
dataset_name=self.dataset_metadata.name,
drop_duplicates=True,
dataset_name=self.dataset_metadata.name
)
)
dataset_contents = self.get_dataset_contents()
Expand Down
3 changes: 1 addition & 2 deletions cdisc_rules_engine/interfaces/data_service_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def get_raw_dataset_metadata(
"""

@abstractmethod
def get_variables_metadata(self, dataset_name: str, **params) -> DatasetInterface:
def get_variables_metadata(self, dataset_name: str) -> DatasetInterface:
"""
Gets variables metadata of a dataset.
"""
Expand All @@ -71,7 +71,6 @@ def concat_split_datasets(
self,
func_to_call: Callable,
datasets_metadata: Iterable[DatasetMetadata],
**kwargs,
):
"""
Accepts a list of split dataset filenames,
Expand Down
4 changes: 2 additions & 2 deletions cdisc_rules_engine/models/dataset/dask_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ def from_dict(cls, data: dict, **kwargs):
return cls(dataframe)

@classmethod
def from_records(cls, data: List[dict], **kwargs):
data = pd.DataFrame.from_records(data, **kwargs)
def from_records(cls, data: List[dict]):
data = pd.DataFrame.from_records(data)
dataframe = dd.from_pandas(data, npartitions=DEFAULT_NUM_PARTITIONS)
return cls(dataframe)

Expand Down
2 changes: 1 addition & 1 deletion cdisc_rules_engine/models/dataset/dataset_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def from_dict(cls, data: dict, **kwargs):

@classmethod
@abstractmethod
def from_records(cls, data: List[dict], **kwargs):
def from_records(cls, data: List[dict]):
"""
Create the underlying dataset from provided list of records
"""
Expand Down
4 changes: 2 additions & 2 deletions cdisc_rules_engine/models/dataset/pandas_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ def from_dict(cls, data: dict, **kwargs):
return cls(dataframe)

@classmethod
def from_records(cls, data: List[dict], **kwargs):
dataframe = pd.DataFrame.from_records(data, **kwargs)
def from_records(cls, data: List[dict]):
dataframe = pd.DataFrame.from_records(data)
return cls(dataframe)

def __getitem__(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ def concat_split_datasets(
self,
func_to_call: Callable,
datasets_metadata: Iterable[DatasetMetadata],
**kwargs,
) -> DatasetInterface:
"""
Accepts a list of split dataset filenames, asynchronously downloads
Expand All @@ -149,24 +148,19 @@ def concat_split_datasets(
func_to_call must accept dataset_name and kwargs
as input parameters and return pandas DataFrame.
"""
# pop drop_duplicates param at the beginning to avoid passing it to func_to_call
drop_duplicates: bool = kwargs.pop("drop_duplicates", False)

# download datasets asynchronously
datasets: Iterator[DatasetInterface] = self._async_get_datasets(
func_to_call,
dataset_names=[
dataset_metadata.name for dataset_metadata in datasets_metadata
],
**kwargs,
)
full_dataset = self.dataset_implementation()
for dataset, dataset_metadata in zip(datasets, datasets_metadata):
tagged_dataset = tag_source(dataset, dataset_metadata)
full_dataset = full_dataset.concat(tagged_dataset, ignore_index=True)

if drop_duplicates:
full_dataset = full_dataset.drop_duplicates()
return full_dataset

def check_filepath(self, dataset_names: List[str]) -> List:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def _initialize_datasets_metadata(self, **kwargs) -> dict[str, SDTMDatasetMetada
for dataset in self.data
}

def get_variables_metadata(self, dataset_name: str, **params) -> PandasDataset:
def get_variables_metadata(self, dataset_name: str) -> PandasDataset:
metadata_to_return = {
"variable_name": [],
"variable_order_number": [],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def _initialize_datasets_metadata(self, **kwargs) -> dict[str, SDTMDatasetMetada
return result

@cached_dataset(DatasetTypes.VARIABLES_METADATA.value)
def get_variables_metadata(self, dataset_name: str, **params) -> DatasetInterface:
def get_variables_metadata(self, dataset_name: str) -> DatasetInterface:
"""
Gets dataset from blob storage and returns metadata of a certain variable.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def get_dataset(self, dataset_name: str, **params) -> DatasetInterface:
return df

@cached_dataset(DatasetTypes.VARIABLES_METADATA.value)
def get_variables_metadata(self, dataset_name: str, **params) -> DatasetInterface:
def get_variables_metadata(self, dataset_name: str) -> DatasetInterface:
"""
Gets dataset from blob storage and returns metadata of a certain variable.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def _initialize_datasets_metadata(self, **kwargs) -> dict[str, SDTMDatasetMetada
return result

@cached_dataset(DatasetTypes.VARIABLES_METADATA.value)
def get_variables_metadata(self, dataset_name: str, **params) -> DatasetInterface:
def get_variables_metadata(self, dataset_name: str) -> DatasetInterface:
"""
Gets dataset from blob storage and returns metadata of a certain variable.
"""
Expand Down
50 changes: 50 additions & 0 deletions tests/unit/test_dataset_builders/test_jsonata_dataset_bilder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from unittest.mock import MagicMock, mock_open, patch

from cdisc_rules_engine.dataset_builders.jsonata_dataset_builder import (
JSONataDatasetBuilder,
)


def test_get_dataset_uses_cache():
metadata = MagicMock()
metadata.full_path = "fake_path.json"

data_service = MagicMock()
data_service.read_data.return_value.__enter__.return_value = mock_open(
read_data='{"a": {"b": 1}}'
)()

cache_service = MagicMock()
cache_service.get.side_effect = [None, {"cached": True}]

builder = JSONataDatasetBuilder(
dataset_metadata=metadata,
data_service=data_service,
rule={},
cache_service=cache_service,
rule_processor=MagicMock(),
data_processor=MagicMock(),
define_xml_path=None,
standard="USDM",
standard_version="4.0",
standard_substandard=None,
)
builder.cache_service = cache_service

with patch(
"cdisc_rules_engine.dataset_builders.jsonata_dataset_builder.load",
return_value={"a": {"b": 1}},
) as mock_load:

result1 = builder.get_dataset()
result2 = builder.get_dataset()

assert result1 == {"_path": "", "a": {"_path": "/a", "b": 1}}

# second call gets from cache
assert result2 == {"cached": True}

assert mock_load.call_count == 1

assert cache_service.get.call_count == 2
assert cache_service.add.call_count == 1
17 changes: 14 additions & 3 deletions tests/unit/test_rules_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
from cdisc_rules_engine.models.dataset import PandasDataset


@pytest.fixture(autouse=True)
def clear_lru_cache():
RulesEngine(standard="sdtmig").cache.clear_all()


def test_get_schema():
schema = RulesEngine().get_schema()
assert "variables" in schema
Expand Down Expand Up @@ -1075,9 +1080,15 @@ def test_rule_with_domain_prefix_replacement(
{
"domain": "AE",
"dataset": "AE",
"errors": [],
"executionStatus": ExecutionStatus.SUCCESS.value,
"message": None,
"errors": [
{
"dataset": "AE",
"error": "Empty dataset",
"message": "Dataset skipped - Dataset is empty after preprocessing and operations. rule id=TEST1, dataset=AE",
}
],
"executionStatus": ExecutionStatus.SKIPPED.value,
"message": "Dataset skipped - Dataset is empty after preprocessing and operations. rule id=TEST1, dataset=AE",
"variables": [],
}
],
Expand Down
Loading
Loading