139 changes: 98 additions & 41 deletions xrspatial/classify.py
@@ -393,7 +393,8 @@ def reclassify(agg: xr.DataArray,
attrs=agg.attrs)


def _run_quantile(data, k, module):
def _run_quantile(data, num_sample, k, module):
# num_sample ignored for in-memory backends
w = 100.0 / k
p = module.arange(w, 100 + w, w)

@@ -405,27 +406,47 @@ def _run_quantile(data, k, module):
return q
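
# Illustration (not part of this patch): for k=4 the spacing above works
# out to the upper edge of each quartile:
#   w = 100.0 / 4              # 25.0
#   np.arange(w, 100 + w, w)   # array([ 25.,  50.,  75., 100.])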


def _run_dask_cupy_quantile(data, k):
# Convert dask+cupy chunks to numpy one at a time via map_blocks,
# then use dask's streaming approximate percentile (no full materialization).
def _run_dask_quantile(data, num_sample, k):
# Avoid boolean fancy indexing (data[da.isfinite(data)]) which creates
# unknown dask chunk sizes. Use sampling via indexed access to avoid
# materialising the full array (#884).
w = 100.0 / k
p = np.arange(w, 100 + w, w)
if p[-1] > 100.0:
p[-1] = 100.0
clean = da.where(da.isinf(data), np.nan, data)
num_data = data.size
if num_sample is None or num_sample >= num_data:
num_sample = num_data
sample_idx = _generate_sample_indices(num_data, num_sample)
values = np.asarray(clean.ravel()[sample_idx].compute())
values = values[np.isfinite(values)]
q = np.percentile(values, p)
q = np.unique(q)
return q
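
# Illustration (not part of this patch) of the dask behaviour #884 works
# around: boolean fancy indexing yields chunks of unknown (NaN) size,
# while integer-array indexing keeps chunk sizes known, so only the
# sampled values ever need to be computed.
x_demo = da.random.random((100, 100), chunks=(50, 50))
masked = x_demo.ravel()[da.isfinite(x_demo).ravel()]
print(masked.chunks)  # ((nan, ..., nan),) -- unknown chunk sizes
idx = np.sort(np.random.choice(x_demo.size, size=500, replace=False))
print(x_demo.ravel()[idx].chunks)  # known chunk sizes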


def _run_dask_cupy_quantile(data, num_sample, k):
# Convert dask+cupy chunks to numpy, then same safe path as dask (#884).
data_cpu = data.map_blocks(cupy.asnumpy, dtype=data.dtype, meta=np.array(()))
return _run_quantile(data_cpu, k, da)
return _run_dask_quantile(data_cpu, num_sample, k)


def _quantile(agg, k):
def _quantile(agg, num_sample, k):
mapper = ArrayTypeFunctionMapping(
numpy_func=lambda *args: _run_quantile(*args, module=np),
dask_func=lambda *args: _run_quantile(*args, module=da),
dask_func=_run_dask_quantile,
cupy_func=lambda *args: _run_quantile(*args, module=cupy),
dask_cupy_func=_run_dask_cupy_quantile
)
out = mapper(agg)(agg.data, k)
out = mapper(agg)(agg.data, num_sample, k)
return out
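
# `_generate_sample_indices` is used throughout this patch but its body is
# not shown here. A plausible sketch (an assumption, not the project's
# actual helper) draws sorted flat indices without replacement:
def _generate_sample_indices_sketch(num_data, num_sample, seed=None):
    rng = np.random.default_rng(seed)
    if num_sample >= num_data:
        return np.arange(num_data)
    return np.sort(rng.choice(num_data, size=num_sample, replace=False))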


@supports_dataset
def quantile(agg: xr.DataArray,
k: int = 4,
num_sample: Optional[int] = 20_000,
name: Optional[str] = 'quantile') -> xr.DataArray:
"""
Reclassifies data for array `agg` into new values based on quantile
@@ -438,6 +459,12 @@ def quantile(agg: xr.DataArray,
of values to be reclassified.
k : int, default=4
Number of quantiles to be produced.
num_sample : int or None, default=20000
Number of sample data points used to compute the quantile
breakpoints. For dask-backed arrays the sample is drawn
lazily to avoid materialising the entire array into RAM.
``None`` means use all data; on the dask path it is capped
at the array size and still read via indexed access.
name : str, default='quantile'
Name of the output aggregate array.

@@ -489,7 +516,7 @@ def quantile(agg: xr.DataArray,
res: (10.0, 10.0)
"""

q = _quantile(agg, k)
q = _quantile(agg, num_sample, k)
k_q = q.shape[0]
if k_q < k:
print("Quantile Warning: Not enough unique values"
@@ -695,16 +722,11 @@ def _run_dask_natural_break(agg, num_sample, k):
max_data = float(da.nanmax(da.where(da.isinf(data), np.nan, data)).compute())

num_data = data.size
if num_sample is not None and num_sample < num_data:
# Sample lazily from dask array; only materialize the sample
sample_idx = _generate_sample_indices(num_data, num_sample)
sample_data_np = np.asarray(data.ravel()[sample_idx].compute())
bins, uvk = _compute_natural_break_bins(
sample_data_np, None, k, max_data)
else:
data_flat_np = np.asarray(data.ravel().compute())
bins, uvk = _compute_natural_break_bins(
data_flat_np, None, k, max_data)
if num_sample is None or num_sample >= num_data:
num_sample = num_data  # cap at the array size; indexed access keeps chunk sizes known
sample_idx = _generate_sample_indices(num_data, num_sample)
sample_data_np = np.asarray(data.ravel()[sample_idx].compute())
bins, uvk = _compute_natural_break_bins(sample_data_np, None, k, max_data)

out = _bin(agg, bins, np.arange(uvk))
return out
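
For reference, a usage sketch of the sampled dask path above, assuming the
public `natural_breaks` wrapper dispatches to `_run_dask_natural_break`
for dask-backed arrays:

import dask.array as da
import xarray as xr
from xrspatial.classify import natural_breaks

agg = xr.DataArray(
    da.random.random((2_000, 2_000), chunks=(500, 500)),
    dims=("y", "x"),
)
# breakpoints are fit on a 20k-point sample; only the sample is ever
# materialised on the client, never the full array
result = natural_breaks(agg, k=5, num_sample=20_000)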
@@ -717,17 +739,12 @@ def _run_dask_cupy_natural_break(agg, num_sample, k):
max_data = float(da.nanmax(da.where(da.isinf(data), np.nan, data)).compute().item())

num_data = data.size
if num_sample is not None and num_sample < num_data:
# Sample lazily from dask array; only materialize the sample
sample_idx = _generate_sample_indices(num_data, num_sample)
sample_data = data.ravel()[sample_idx].compute()
sample_data_np = cupy.asnumpy(sample_data)
bins, uvk = _compute_natural_break_bins(
sample_data_np, None, k, max_data)
else:
data_flat_np = cupy.asnumpy(data.ravel().compute())
bins, uvk = _compute_natural_break_bins(
data_flat_np, None, k, max_data)
if num_sample is None or num_sample >= num_data:
num_sample = num_data  # cap at the array size; indexed access keeps chunk sizes known
sample_idx = _generate_sample_indices(num_data, num_sample)
sample_data = data.ravel()[sample_idx].compute()
sample_data_np = cupy.asnumpy(sample_data)
bins, uvk = _compute_natural_break_bins(sample_data_np, None, k, max_data)

out = _bin(agg, bins, np.arange(uvk))
return out
@@ -1109,20 +1126,39 @@ def head_tail_breaks(agg: xr.DataArray,
attrs=agg.attrs)


def _run_percentiles(data, pct, module):
def _run_percentiles(data, num_sample, pct, module):
# num_sample ignored for in-memory backends
q = module.percentile(data[module.isfinite(data)], pct)
q = module.unique(q)
return q


def _run_dask_cupy_percentiles(data, pct):
def _run_dask_percentiles(data, num_sample, pct):
# Avoid boolean fancy indexing (data[da.isfinite(data)]) which creates
# unknown dask chunk sizes. Use sampling via indexed access to avoid
# materialising the full array (#884).
clean = da.where(da.isinf(data), np.nan, data)
num_data = data.size
if num_sample is None or num_sample >= num_data:
num_sample = num_data
sample_idx = _generate_sample_indices(num_data, num_sample)
values = np.asarray(clean.ravel()[sample_idx].compute())
values = values[np.isfinite(values)]
q = np.percentile(values, pct)
q = np.unique(q)
return q


def _run_dask_cupy_percentiles(data, num_sample, pct):
# Convert dask+cupy chunks to numpy, then same safe path as dask (#884).
data_cpu = data.map_blocks(cupy.asnumpy, dtype=data.dtype, meta=np.array(()))
return _run_percentiles(data_cpu, pct, da)
return _run_dask_percentiles(data_cpu, num_sample, pct)
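
# Illustration (not part of this patch): the map_blocks call above moves
# one chunk at a time to host memory, so peak usage stays at one block.
# A numpy stand-in for `cupy.asnumpy` shows the pattern:
x_demo = da.random.random((1_000, 1_000), chunks=(250, 250))
x_cpu = x_demo.map_blocks(np.asarray, dtype=x_demo.dtype, meta=np.array(()))
assert x_cpu.chunks == x_demo.chunks  # still lazy; nothing computed yet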


@supports_dataset
def percentiles(agg: xr.DataArray,
pct: Optional[List] = None,
num_sample: Optional[int] = 20_000,
name: Optional[str] = 'percentiles') -> xr.DataArray:
"""
Classify data based on percentile breakpoints.
Expand All @@ -1134,6 +1170,12 @@ def percentiles(agg: xr.DataArray,
of values to be classified.
pct : list of float, default=[1, 10, 50, 90, 99]
Percentile values to use as breakpoints.
num_sample : int or None, default=20000
Number of sample data points used to compute the percentile
breakpoints. For dask-backed arrays the sample is drawn
lazily to avoid materialising the entire array into RAM.
``None`` means use all data; on the dask path it is capped
at the array size and still read via indexed access.
name : str, default='percentiles'
Name of output aggregate array.

Expand All @@ -1154,11 +1196,11 @@ def percentiles(agg: xr.DataArray,

mapper = ArrayTypeFunctionMapping(
numpy_func=lambda *args: _run_percentiles(*args, module=np),
dask_func=lambda *args: _run_percentiles(*args, module=da),
dask_func=_run_dask_percentiles,
cupy_func=lambda *args: _run_percentiles(*args, module=cupy),
dask_cupy_func=_run_dask_cupy_percentiles,
)
q = mapper(agg)(agg.data, pct)
q = mapper(agg)(agg.data, num_sample, pct)

# Materialize bin edges to numpy
if hasattr(q, 'compute'):
@@ -1204,7 +1246,8 @@ def _compute_maximum_break_bins(values_np, k):
return bins
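
# `_compute_maximum_break_bins` is only partially visible in this patch.
# Maximum breaks classically puts the k-1 boundaries at the midpoints of
# the widest gaps between sorted values; a sketch of that standard
# formulation (an assumption, names illustrative):
def _maximum_break_bins_sketch(values_np, k):
    v = np.sort(values_np)
    gaps = np.diff(v)
    gap_idx = np.sort(np.argsort(gaps)[-(k - 1):])  # k-1 widest gaps
    cuts = (v[gap_idx] + v[gap_idx + 1]) / 2.0
    return np.append(cuts, v[-1])  # close the last bin at the maximum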


def _run_maximum_breaks(agg, k, module):
def _run_maximum_breaks(agg, num_sample, k, module):
# num_sample ignored for in-memory backends
data = agg.data
if module == cupy:
finite = data[cupy.isfinite(data)]
@@ -1218,21 +1261,29 @@ def _run_maximum_breaks(agg, k, module):
return out


def _run_dask_maximum_breaks(agg, k):
def _run_dask_maximum_breaks(agg, num_sample, k):
data = agg.data
data_clean = da.where(da.isinf(data), np.nan, data)
values_np = np.asarray(data_clean.ravel().compute())
num_data = data.size
if num_sample is None or num_sample >= num_data:
num_sample = num_data
sample_idx = _generate_sample_indices(num_data, num_sample)
values_np = np.asarray(data_clean.ravel()[sample_idx].compute())
values_np = values_np[np.isfinite(values_np)]
bins = _compute_maximum_break_bins(values_np, k)
out = _bin(agg, bins, np.arange(len(bins)))
return out


def _run_dask_cupy_maximum_breaks(agg, k):
def _run_dask_cupy_maximum_breaks(agg, num_sample, k):
data = agg.data
data_clean = da.where(da.isinf(data), np.nan, data)
data_cpu = data_clean.map_blocks(cupy.asnumpy, dtype=data.dtype, meta=np.array(()))
values_np = np.asarray(data_cpu.ravel().compute())
num_data = data.size
if num_sample is None or num_sample >= num_data:
num_sample = num_data
sample_idx = _generate_sample_indices(num_data, num_sample)
values_np = np.asarray(data_cpu.ravel()[sample_idx].compute())
values_np = values_np[np.isfinite(values_np)]
bins = _compute_maximum_break_bins(values_np, k)
out = _bin(agg, bins, np.arange(len(bins)))
@@ -1242,6 +1293,7 @@ def _run_dask_cupy_maximum_breaks(agg, k):
@supports_dataset
def maximum_breaks(agg: xr.DataArray,
k: int = 5,
num_sample: Optional[int] = 20_000,
name: Optional[str] = 'maximum_breaks') -> xr.DataArray:
"""
Classify data using the Maximum Breaks algorithm.
@@ -1256,6 +1308,11 @@ def maximum_breaks(agg: xr.DataArray,
of values to be classified.
k : int, default=5
Number of classes to be produced.
num_sample : int or None, default=20000
Number of sample data points used to locate the class breaks.
For dask-backed arrays the sample is drawn lazily to avoid
materialising the entire array into RAM. ``None`` means use
all data; on the dask path it is capped at the array size and
still read via indexed access.
name : str, default='maximum_breaks'
Name of output aggregate array.

@@ -1277,7 +1334,7 @@
cupy_func=lambda *args: _run_maximum_breaks(*args, module=cupy),
dask_cupy_func=_run_dask_cupy_maximum_breaks,
)
out = mapper(agg)(agg, k)
out = mapper(agg)(agg, num_sample, k)
return xr.DataArray(out,
name=name,
dims=agg.dims,
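For reference, an end-to-end sketch of the new parameter using the
signatures shown in this diff, on a dask-backed raster:

import dask.array as da
import xarray as xr
from xrspatial.classify import quantile, percentiles

agg = xr.DataArray(
    da.random.random((10_000, 10_000), chunks=(1_000, 1_000)),
    dims=("y", "x"),
)

# breakpoints are fit on a 20k-point sample by default; num_sample=None
# uses every value (the dask path then reads all values through indexed
# access rather than a boolean mask)
q = quantile(agg, k=4)
p = percentiles(agg, pct=[1, 10, 50, 90, 99], num_sample=50_000)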