Skip to content
This repository was archived by the owner on Dec 1, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions src/nested_dask/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import pandas as pd
import pyarrow as pa
from dask.dataframe.dask_expr._collection import new_collection
from dask.dataframe.dask_expr._expr import no_default as dsk_no_default
from nested_pandas.series.dtype import NestedDtype
from nested_pandas.series.packer import pack, pack_flat, pack_lists
from pandas._libs import lib
Expand Down Expand Up @@ -731,7 +732,7 @@ def sort_values(
meta=self._meta,
)

def reduce(self, func, *args, meta=None, **kwargs) -> NestedFrame:
def reduce(self, func, *args, meta=dsk_no_default, infer_nesting=True, **kwargs) -> NestedFrame:
"""
Takes a function and applies it to each top-level row of the NestedFrame.

Expand All @@ -751,7 +752,15 @@ def reduce(self, func, *args, meta=None, **kwargs) -> NestedFrame:
Positional arguments to pass to the function, the first *args should be the names of the
columns to apply the function to.
meta : dataframe or series-like, optional
The dask meta of the output.
The dask meta of the output. If not provided, dask will try to
infer the metadata. This may lead to unexpected results, so
providing meta is recommended.
infer_nesting : bool, default True
If True, the function will pack output columns into nested
structures based on column names adhering to a nested naming
scheme. E.g. "nested.b" and "nested.c" will be packed into a column
called "nested" with columns "b" and "c". If False, all outputs
will be returned as base columns.
kwargs : keyword arguments, optional
Keyword arguments to pass to the function.

Expand All @@ -773,6 +782,26 @@ def reduce(self, func, *args, meta=None, **kwargs) -> NestedFrame:
>>> '''reduce will return a NestedFrame with two columns'''
>>> return {"sum_col1": sum(col1), "sum_col2": sum(col2)}

When using nesting inference (infer_nesting=True), the output may
contain nested columns. In such cases, the meta should be provided with
the appropriate dtype for these columns. For example, consider the
following function, which produces a nested column "lc":

>>> def complex_output(flux):
>>> return {"max_flux": np.max(flux),
>>> "lc.flux_quantiles": np.quantile(flux, [0.1, 0.2, 0.3, 0.4, 0.5]),
>>> "lc.labels": [0.1, 0.2, 0.3, 0.4, 0.5]}

It would require the following meta:

>>> # create a NestedDtype for the nested column "lc"
>>> from nested_pandas.series.dtype import NestedDtype
>>> lc_dtype = NestedDtype(pa.struct([pa.field("flux_quantiles", pa.list_(pa.float64())),
>>> pa.field("labels", pa.list_(pa.float64()))]))
>>> # use the lc_dtype in meta creation
>>> result_meta = npd.NestedFrame({'max_flux':pd.Series([], dtype='float'),
>>> 'lc':pd.Series([], dtype=lc_dtype)})

"""

# Handle meta shorthands to produce nestedframe output
Expand All @@ -787,7 +816,9 @@ def reduce(self, func, *args, meta=None, **kwargs) -> NestedFrame:
# apply nested_pandas reduce via map_partitions
# wrap the partition in a npd.NestedFrame call for:
# https://github.com/lincc-frameworks/nested-dask/issues/21
return self.map_partitions(lambda x: npd.NestedFrame(x).reduce(func, *args, **kwargs), meta=meta)
return self.map_partitions(
lambda x: npd.NestedFrame(x).reduce(func, *args, infer_nesting=infer_nesting, **kwargs), meta=meta
)

def to_parquet(self, path, by_layer=True, **kwargs) -> None:
"""Creates parquet file(s) with the data of a NestedFrame, either
Expand Down
37 changes: 37 additions & 0 deletions tests/nested_dask/test_nestedframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,43 @@ def mean_arr(arr): # type: ignore
assert isinstance(reduced.compute(), npd.NestedFrame)


def test_reduce_output_inference():
    """Test that reduce's nesting inference produces output matching the given meta.

    The reducing function returns one base column ("max_flux") and
    dot-named outputs ("lc.*", "meta.*"). With ``infer_nesting=True`` these
    are expected to be packed into nested columns "lc" and "meta". The test
    checks that the lazily declared dtypes and columns agree with the
    computed ones.
    """

    ndd = generate_data(20, 20, npartitions=2, seed=1)

    def complex_output(flux):
        return {
            "max_flux": np.max(flux),
            "lc.flux_quantiles": np.quantile(flux, [0.1, 0.2, 0.3, 0.4, 0.5]),
            "lc.labels": [0.1, 0.2, 0.3, 0.4, 0.5],
            "meta.colors": ["green", "red", "blue"],
        }

    # The meta is written out by hand: one NestedDtype per nested column the
    # function is expected to produce, each wrapping a struct of list fields.
    lc_dtype = NestedDtype(
        pa.struct(
            [
                pa.field("flux_quantiles", pa.list_(pa.float64())),
                pa.field("labels", pa.list_(pa.float64())),
            ]
        )
    )
    meta_dtype = NestedDtype(pa.struct([pa.field("colors", pa.list_(pa.string()))]))
    result_meta = npd.NestedFrame(
        {
            "max_flux": pd.Series([], dtype="float"),
            "lc": pd.Series([], dtype=lc_dtype),
            "meta": pd.Series([], dtype=meta_dtype),
        }
    )
    result = ndd.reduce(complex_output, "nested.flux", infer_nesting=True, meta=result_meta)

    # Compute once and reuse: each .compute() call re-executes the task graph.
    computed = result.compute()

    assert list(result.dtypes) == list(computed.dtypes)
    assert list(result.columns) == list(computed.columns)


def test_to_parquet_combined(test_dataset, tmp_path):
"""test to_parquet when saving all layers to a single directory"""

Expand Down