dask_expr/_backends.py (29 additions, 4 deletions)

@@ -88,12 +88,37 @@ def create_array_collection(expr):
     # to infer that we want to create an array is the only way that is guaranteed
     # to be a general solution.
     # We can get rid of this when we have an Array expression
-    from dask.dataframe.core import new_dd_object
+    from dask.highlevelgraph import HighLevelGraph
+    from dask.layers import Blockwise
 
     result = expr.optimize()
-    return new_dd_object(
-        result.__dask_graph__(), result._name, result._meta, result.divisions
-    )
+    dsk = result.__dask_graph__()
+    name = result._name
+    meta = result._meta
+    divisions = result.divisions
+    import dask.array as da
+
+    chunks = ((np.nan,) * (len(divisions) - 1),) + tuple((d,) for d in meta.shape[1:])
+    if len(chunks) > 1:
+        if isinstance(dsk, HighLevelGraph):
+            layer = dsk.layers[name]
+        else:
+            # dask-expr provides a dict only
+            layer = dsk
+        if isinstance(layer, Blockwise):
+            layer.new_axes["j"] = chunks[1][0]
+            layer.output_indices = layer.output_indices + ("j",)
+        else:
+            from dask._task_spec import Alias, Task
+
+            suffix = (0,) * (len(chunks) - 1)
+            for i in range(len(chunks[0])):
+                task = layer.get((name, i))
+                new_key = (name, i) + suffix
+                if isinstance(task, Task):
+                    task = Alias(new_key, task.key)
+                layer[new_key] = task
+    return da.Array(dsk, name=name, chunks=chunks, dtype=meta.dtype)
 
 
 @get_collection_type.register(np.ndarray)
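
A note on the new conversion path above: instead of round-tripping through the
legacy collection machinery via new_dd_object, create_array_collection now builds
a dask.array.Array directly on the optimized expression's graph. Partition lengths
are unknown at graph-construction time, so the row axis gets np.nan chunk sizes
(one chunk per partition), while each trailing axis gets a single chunk sized from
the meta. A minimal sketch of the chunk construction, with hypothetical inputs
(the divisions and meta shape below are illustrative, not taken from the PR):

import numpy as np

# Hypothetical: a frame with 3 partitions (divisions has npartitions + 1
# entries) whose meta has 2 columns.
divisions = (0, 10, 20, 30)
meta_shape = (0, 2)  # empty meta frame: 0 rows, 2 columns

# Row chunks are unknown (np.nan); every other axis is one chunk wide.
chunks = ((np.nan,) * (len(divisions) - 1),) + tuple((d,) for d in meta_shape[1:])
print(chunks)  # ((nan, nan, nan), (2,))

When the result is two-dimensional, the diff also widens each graph key from
(name, i) to (name, i, 0), either by extending a Blockwise layer's output indices
or by aliasing every task under the longer key, because dask.array expects one
key index per axis.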

dask_expr/_collection.py (13 additions, 46 deletions)

@@ -23,18 +23,20 @@
 from dask.dataframe.core import (
     _concat,
     _convert_to_numeric,
-    _Frame,
     _repr_data_series,
     _sqrt_and_convert_to_timedelta,
     check_divisions,
     has_parallel_type,
-    is_arraylike,
     is_dataframe_like,
     is_series_like,
     meta_warning,
-    new_dd_object,
 )
-from dask.dataframe.dispatch import is_categorical_dtype, make_meta, meta_nonempty
+from dask.dataframe.dispatch import (
+    get_parallel_type,
+    is_categorical_dtype,
+    make_meta,
+    meta_nonempty,
+)
 from dask.dataframe.multi import warn_dtype_mismatch
 from dask.dataframe.utils import (
     AttributeNotImplementedError,
@@ -52,6 +54,7 @@
     derived_from,
     get_default_shuffle_method,
     get_meta_library,
+    is_arraylike,
     key_split,
     maybe_pluralize,
     memory_repr,
@@ -1370,25 +1373,6 @@ def repartition(
             Repartition(self, npartitions, divisions, force, partition_size, freq)
         )
 
-    def to_legacy_dataframe(self, optimize: bool = True, **optimize_kwargs) -> _Frame:
-        """Convert to a legacy dask-dataframe collection
-
-        Parameters
-        ----------
-        optimize
-            Whether to optimize the underlying `Expr` object before conversion.
-        **optimize_kwargs
-            Key-word arguments to pass through to `optimize`.
-        """
-        warnings.warn(
-            "to_legacy_dataframe is deprecated and will be removed in a future release. "
-            "The legacy implementation as a whole is deprecated and will be removed, making "
-            "this method unnecessary.",
-            FutureWarning,
-        )
-        df = self.optimize(**optimize_kwargs) if optimize else self
-        return new_dd_object(df.dask, df._name, df._meta, df.divisions)
-
     def to_dask_array(
         self, lengths=None, meta=None, optimize: bool = True, **optimize_kwargs
     ) -> Array:
@@ -5052,28 +5036,6 @@ def from_dict(
     )
 
 
-def from_legacy_dataframe(ddf: _Frame, optimize: bool = True) -> FrameBase:
-    """Create a dask-expr collection from a legacy dask-dataframe collection
-
-    Parameters
-    ----------
-    optimize
-        Whether to optimize the graph before conversion.
-    """
-    warnings.warn(
-        "from_legacy_dataframe is deprecated and will be removed in a future release. "
-        "The legacy implementation as a whole is deprecated and will be removed, making "
-        "this method unnecessary.",
-        FutureWarning,
-    )
-    graph = ddf.dask
-    if optimize:
-        graph = ddf.__dask_optimize__(graph, ddf.__dask_keys__())
-    return from_graph(
-        graph, ddf._meta, ddf.divisions, ddf.__dask_keys__(), key_split(ddf._name)
-    )
-
-
 def from_dask_array(x, columns=None, index=None, meta=None):
     """Create a Dask DataFrame from a Dask Array.
 
@@ -5793,7 +5755,7 @@ def merge_asof(
         del kwargs["on"]
 
     for o in [left_on, right_on]:
-        if isinstance(o, _Frame):
+        if isinstance(o, FrameBase):
            raise NotImplementedError(
                "Dask collections not currently allowed in merge columns"
            )
@@ -6544,3 +6506,8 @@ def _compute_partition_stats(
         return (mins, maxes, lens)
     else:
         return (non_empty_mins, non_empty_maxes, lens)
+
+
+@get_parallel_type.register(FrameBase)
+def get_parallel_type_frame(o):
+    return get_parallel_type(o._meta)
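
The get_parallel_type registration added at the bottom of _collection.py plugs
FrameBase into dask.dataframe's type dispatch by recursing on the collection's
_meta, so a dask-expr frame resolves to whatever parallel type its pandas meta
maps to. For readers unfamiliar with the mechanism, here is a small
self-contained sketch of how Dask's Dispatch registration works in general; the
"describe" dispatch and its handlers are made up for illustration:

from dask.utils import Dispatch

# A toy dispatch: handlers are registered per type, and a call routes on
# type(first_argument), walking the MRO to the most specific match.
describe = Dispatch("describe")

@describe.register(object)
def _describe_default(obj):
    return f"plain object: {type(obj).__name__}"

@describe.register(list)
def _describe_list(obj):
    return f"list of {len(obj)} items"

print(describe([1, 2, 3]))  # "list of 3 items"
print(describe(3.14))       # "plain object: float" (object fallback)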

dask_expr/_expr.py (1 addition, 2 deletions)

@@ -24,12 +24,11 @@
     is_dataframe_like,
     is_index_like,
     is_series_like,
-    make_meta,
     pd_split,
     safe_head,
     total_mem_usage,
 )
-from dask.dataframe.dispatch import meta_nonempty
+from dask.dataframe.dispatch import make_meta, meta_nonempty
 from dask.dataframe.rolling import CombinedOutput, _head_timedelta, overlap_chunk
 from dask.dataframe.shuffle import drop_overlap, get_overlap
 from dask.dataframe.utils import (
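
This file and several below make the same mechanical change: make_meta (and, in
places, meta_nonempty) is imported from dask.dataframe.dispatch, where the
dispatch objects are defined, instead of via the re-export in
dask.dataframe.core. The function's behavior is unchanged; as a quick reminder
of what it does (example values are illustrative):

import pandas as pd
from dask.dataframe.dispatch import make_meta

# make_meta maps an example object to a zero-row "meta" with the same schema.
meta = make_meta(pd.DataFrame({"x": [1, 2], "y": ["a", "b"]}))
print(len(meta))                # 0
print(list(meta.dtypes.index))  # ['x', 'y']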

dask_expr/_groupby.py (1 addition, 1 deletion)

@@ -9,14 +9,14 @@
 from dask._task_spec import Task
 from dask.core import flatten
 from dask.dataframe.core import (
-    GROUP_KEYS_DEFAULT,
     _concat,
     apply_and_enforce,
     is_dataframe_like,
     is_series_like,
 )
 from dask.dataframe.dispatch import concat, make_meta, meta_nonempty
 from dask.dataframe.groupby import (
+    GROUP_KEYS_DEFAULT,
     _agg_finalize,
     _aggregate_docstring,
     _apply_chunk,

dask_expr/_reductions.py (1 addition, 2 deletions)

@@ -19,10 +19,9 @@
     is_dataframe_like,
     is_index_like,
     is_series_like,
-    make_meta,
-    meta_nonempty,
     total_mem_usage,
 )
+from dask.dataframe.dispatch import make_meta, meta_nonempty
 from dask.typing import no_default
 from dask.utils import M, apply, funcname
 

dask_expr/_shuffle.py (2 additions, 2 deletions)

@@ -10,8 +10,8 @@
 import tlz as toolz
 from dask import compute
 from dask._task_spec import Task, TaskRef
-from dask.dataframe.core import _concat, make_meta
-from dask.dataframe.dispatch import is_categorical_dtype
+from dask.dataframe.core import _concat
+from dask.dataframe.dispatch import is_categorical_dtype, make_meta
 from dask.dataframe.shuffle import (
     barrier,
     collect,

dask_expr/io/io.py (2 additions, 1 deletion)

@@ -9,7 +9,8 @@
 from dask._task_spec import List, Task
 from dask.dataframe import methods
 from dask.dataframe._pyarrow import to_pyarrow_string
-from dask.dataframe.core import apply_and_enforce, is_dataframe_like, make_meta
+from dask.dataframe.core import apply_and_enforce, is_dataframe_like
+from dask.dataframe.dispatch import make_meta
 from dask.dataframe.io.io import _meta_from_array, sorted_division_locations
 from dask.typing import Key
 from dask.utils import funcname, is_series_like

dask_expr/io/tests/test_io.py (1 addition, 21 deletions)

@@ -15,14 +15,13 @@
     from_array,
     from_dask_array,
     from_dict,
-    from_legacy_dataframe,
     from_map,
     from_pandas,
     optimize,
     read_csv,
     read_parquet,
 )
-from dask_expr._expr import Expr, Replace
+from dask_expr._expr import Replace
 from dask_expr.io import FromArray, FromMap, ReadParquet, parquet
 from dask_expr.tests._util import _backend_library
 
@@ -227,25 +226,6 @@ def test_parquet_complex_filters(tmpdir):
     assert_eq(got.optimize(), expect)
 
 
-@pytest.mark.parametrize("optimize", [True, False])
-def test_from_legacy_dataframe(optimize):
-    ddf = dd.from_dict({"a": range(100)}, npartitions=10)
-    with pytest.warns(FutureWarning, match="is deprecated"):
-        df = from_legacy_dataframe(ddf, optimize=optimize)
-    assert isinstance(df.expr, Expr)
-    assert_eq(df, ddf)
-
-
-@pytest.mark.parametrize("optimize", [True, False])
-def test_to_legacy_dataframe(optimize):
-    pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
-    df = from_pandas(pdf, npartitions=2)
-    with pytest.warns(FutureWarning, match="is deprecated"):
-        ddf = df.to_legacy_dataframe(optimize=optimize)
-    assert isinstance(ddf, dd.core.DataFrame)
-    assert_eq(df, ddf)
-
-
 @pytest.mark.parametrize("optimize", [True, False])
 def test_to_dask_array(optimize):
     pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})