
Conversation

@dcherian (Collaborator)

Adds flox/expr.py with expression-system compatible implementations:

  • dask_groupby_agg: map-reduce, cohorts, blockwise methods
  • dask_groupby_scan: cumsum, ffill, bfill with Blelloch algorithm
  • Expression classes: ExtractFromDictExpr, CollapseBlocksExpr, SubsetBlocksExpr

Uses Task/TaskRef/Alias from dask._task_spec for proper task specification. Dispatches based on dask.array.ARRAY_EXPR_ENABLED flag.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
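As a rough illustration of the dispatch described above (the wrapper shape and the `flox.core` fallback import are assumptions for this sketch, not the PR's actual wiring):

```python
# Sketch only: assumes flox.expr exposes an expression-aware
# dask_groupby_agg and that flox.core keeps the graph-based one.
import dask.array

from flox.core import dask_groupby_agg as _graph_groupby_agg


def dask_groupby_agg(*args, **kwargs):
    # Route to the expression-system implementation only when dask's
    # array-expr backend is switched on; otherwise use the existing path.
    if getattr(dask.array, "ARRAY_EXPR_ENABLED", False):
        from flox.expr import dask_groupby_agg as _expr_groupby_agg

        return _expr_groupby_agg(*args, **kwargs)
    return _graph_groupby_agg(*args, **kwargs)
```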
groups_.append(cohort_index.values)

# Concatenate cohort results along the last axis
reduced = dask.array.concatenate(cohort_results, axis=-1)
@dcherian (Collaborator, Author) commented on Dec 30, 2025

the other version of this code does an implicit concat in the _tree_reduce. cohort_results can be O(10_000) arrays long (#415). Should we use that optimization here?

that PR also optimizes _tree_reduce for very large arrays; perhaps Claude can upstream that to dask too ;)
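A toy (non-flox) illustration of why the explicit concatenate matters at that scale: every cohort result contributes its own tasks to the graph before the concat even runs.

```python
# Toy example, not flox code: build many tiny dask arrays (stand-ins for
# per-cohort results) and concatenate them explicitly.
import dask.array as da
import numpy as np

n_cohorts = 1_000  # the linked issue mentions O(10_000) in real workloads
cohort_results = [da.from_array(np.arange(3), chunks=3) for _ in range(n_cohorts)]
reduced = da.concatenate(cohort_results, axis=-1)

# The graph grows with the number of cohorts; folding the concat into the
# tree reduction (as the comment above suggests) avoids stacking a separate
# concatenate layer on top of thousands of cohort outputs.
print(len(reduced.__dask_graph__()))
```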

Comment on lines +494 to +496
# If reverse_result is True, we handle it specially to avoid expression
# optimization issues (slicing scan results triggers problematic
# optimizations that push slices into cumreduction).
@dcherian (Collaborator, Author)

seems like a bug to fix?
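If it helps triage, a hypothetical minimal reproducer of the pattern the code comment describes (a slice applied to the output of a cumulative scan) might look like:

```python
# Hypothetical reproducer, not taken from the PR: slice the result of a
# cumulative reduction and force optimization/compute.
import dask.array as da
import numpy as np

x = da.from_array(np.arange(16), chunks=4)
scanned = da.cumsum(x)   # cumulative scan whose output gets sliced
flipped = scanned[::-1]  # the slice that reportedly gets pushed into cumreduction
print(flipped.compute())
```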

@dcherian (Collaborator, Author)

One thing we could chat about is how to handle rechunking heuristics. For example, with resampling problems, a tiny amount of rechunking can be very beneficial for avoiding communication. In https://github.com/xarray-contrib/flox/pull/380/changes, I do "local reasoning" with some heuristics. Is there a way to do global reasoning here?

For example, flox could always set a "blockwise groupby" layer for resampling problems, and dask could query it for a menu of possible chunk schemes that work well for that layer. Perhaps that's too hard.
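To make the "local reasoning" idea concrete, here is a small, self-contained sketch (the function name and heuristic are illustrative, not the actual code in #380) of deriving a chunk scheme whose block boundaries coincide with group-label changes, so a resampling groupby can run blockwise:

```python
# Sketch of group-boundary-aligned rechunking: close a chunk only at an
# index where the (sorted, e.g. resampling) label changes value.
import numpy as np


def blockwise_friendly_chunks(labels: np.ndarray, target_blocksize: int) -> tuple[int, ...]:
    # Indices where the label changes, plus the end of the array.
    boundaries = np.flatnonzero(np.diff(labels)) + 1
    boundaries = np.concatenate([boundaries, [labels.size]])

    chunks, start = [], 0
    for b in boundaries:
        # Close the current chunk once it reaches the target size,
        # but only ever at a group boundary.
        if b - start >= target_blocksize:
            chunks.append(b - start)
            start = b
    if start < labels.size:
        chunks.append(labels.size - start)
    return tuple(chunks)


# Daily data resampled to months: every chunk boundary falls on a month edge.
labels = np.repeat(np.arange(12), 30)  # 12 "months" of 30 days each
print(blockwise_friendly_chunks(labels, target_blocksize=90))  # (90, 90, 90, 90)
```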

Here's an example of why I want something like this: https://discourse.pangeo.io/t/best-practice-advice-on-parallel-processing-a-suite-of-zarr-files-with-dask-and-xarray/5201/6?u=dcherian

Another example: https://discourse.pangeo.io/t/xarray-memoryerror-with-groupby-workloads/4273/16?u=dcherian
