Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions merlin/core/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,9 @@ def convert_data(x, cpu=True, to_collection=None, npartitions=1):
if isinstance(x, dd.DataFrame):
# If input is a dask_cudf collection, convert
# to a pandas-backed Dask collection
if hasattr(x, "to_backend"):
# Requires dask>=2023.1.1
return x.to_backend("pandas")
if cudf is None or not isinstance(x, dask_cudf.DataFrame):
# Already a Pandas-backed collection
return x
Expand All @@ -676,6 +679,9 @@ def convert_data(x, cpu=True, to_collection=None, npartitions=1):
return dd.from_pandas(_x, sort=False, npartitions=npartitions) if to_collection else _x
elif cudf and dask_cudf:
if isinstance(x, dd.DataFrame):
if hasattr(x, "to_backend"):
# Requires dask>=2023.1.1
return x.to_backend("cudf")
# If input is a Dask collection, convert to dask_cudf
if isinstance(x, dask_cudf.DataFrame):
# Already a cudf-backed Dask collection
Expand Down
24 changes: 21 additions & 3 deletions merlin/io/dataframe_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ def to_ddf(self, columns=None, cpu=None):
cpu = self.cpu if cpu is None else cpu

# Move data from gpu to cpu if necessary
_ddf = self._move_ddf("cpu") if (cpu and not self.cpu) else self._ddf
if hasattr(self._ddf, "to_backend"):
# Requires dask>=2023.1.1
if cpu:
_ddf = self._ddf.to_backend("pandas")
elif cpu is False:
_ddf = self._ddf.to_backend("cudf")
else:
_ddf = self._ddf
else:
_ddf = self._move_ddf("cpu") if (cpu and not self.cpu) else self._ddf

if isinstance(columns, list):
return _ddf[columns]
Expand All @@ -49,14 +58,22 @@ def to_ddf(self, columns=None, cpu=None):
def to_cpu(self):
    """Switch the wrapped collection to a pandas (CPU) backend, in place.

    Does nothing when ``self.cpu`` is already truthy. Otherwise
    ``self._ddf`` is rebound to the converted collection, ``self.cpu``
    is set to ``True`` and ``self.moved_collection`` is flipped.
    """
    if not self.cpu:
        # ``to_backend`` requires dask>=2023.1.1; collections without it
        # go through the legacy ``_move_ddf`` path instead.
        self._ddf = (
            self._ddf.to_backend("pandas")
            if hasattr(self._ddf, "to_backend")
            else self._move_ddf("cpu")
        )
        self.cpu = True
        # Each successful move toggles the flag.
        self.moved_collection = not self.moved_collection

def to_gpu(self):
    """Switch the wrapped collection to a cudf (GPU) backend, in place.

    Does nothing when ``self.cpu`` is already falsy. Otherwise
    ``self._ddf`` is rebound to the converted collection, ``self.cpu``
    is set to ``False`` and ``self.moved_collection`` is flipped.
    """
    if self.cpu:
        # ``to_backend`` requires dask>=2023.1.1; collections without it
        # go through the legacy ``_move_ddf`` path instead.
        self._ddf = (
            self._ddf.to_backend("cudf")
            if hasattr(self._ddf, "to_backend")
            else self._move_ddf("gpu")
        )
        self.cpu = False
        # Each successful move toggles the flag.
        self.moved_collection = not self.moved_collection

Expand All @@ -66,6 +83,7 @@ def num_rows(self):

def _move_ddf(self, destination):
"""Move the collection between cpu and gpu memory."""
# TODO: Remove this method when we pin to dask>=2023.1.1
_ddf = self._ddf
if (
self.moved_collection
Expand Down