Merged
2 changes: 1 addition & 1 deletion .github/workflows/dask_test.yaml
@@ -47,7 +47,7 @@ jobs:
           cache-environment-key: environment-${{ steps.date.outputs.date }}-0

       - name: Install current main versions of dask
-        run: python -m pip install git+https://github.com/dask/dask
+        run: python -m pip install git+https://github.com/phofl/dask@read-csv-legacy
Collaborator Author:
Suggested change:
-        run: python -m pip install git+https://github.com/phofl/dask@read-csv-legacy
+        run: python -m pip install git+https://github.com/dask/dask


- name: Install current main versions of distributed
run: python -m pip install git+https://github.com/dask/distributed
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
@@ -47,7 +47,7 @@ jobs:
           cache-environment-key: environment-${{ steps.date.outputs.date }}-1

       - name: Install current main versions of dask
-        run: python -m pip install git+https://github.com/dask/dask
+        run: python -m pip install git+https://github.com/phofl/dask@read-csv-legacy
Collaborator Author:
Suggested change:
-        run: python -m pip install git+https://github.com/phofl/dask@read-csv-legacy
+        run: python -m pip install git+https://github.com/dask/dask

if: ${{ matrix.environment-file == 'ci/environment.yml' }}

- name: Install current main versions of distributed
62 changes: 23 additions & 39 deletions dask_expr/_collection.py
@@ -5131,23 +5131,17 @@ def read_csv(
     path,
     *args,
     header="infer",
-    dtype_backend=None,
     storage_options=None,
     **kwargs,
 ):
-    from dask_expr.io.csv import ReadCSV
+    from dask.dataframe.io.csv import read_csv as _read_csv

     if not isinstance(path, str):
         path = stringify_path(path)
-    return new_collection(
-        ReadCSV(
-            path,
-            dtype_backend=dtype_backend,
-            storage_options=storage_options,
-            kwargs=kwargs,
-            header=header,
-            dataframe_backend="pandas",
-        )
+    return _read_csv(
+        path,
+        *args,
+        header=header,
+        storage_options=storage_options,
+        **kwargs,
     )
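
Note: read_csv is now a thin wrapper that forwards straight to dask.dataframe's CSV reader instead of constructing the custom ReadCSV expression. A minimal sketch of the new call path, assuming the pinned dask branch returns a dask-expr-backed collection (the file name here is hypothetical):

import pandas as pd
import dask_expr as dx

# Write a small CSV so the example is self-contained (hypothetical file name).
pd.DataFrame({"a": [1, 2, 3], "b": list("xyz")}).to_csv("data.csv", index=False)

# Forwards to dask.dataframe.io.csv.read_csv under the hood.
df = dx.read_csv("data.csv", header="infer")
print(df["a"].sum().compute())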


@@ -5156,23 +5150,18 @@ def read_table(
     *args,
     header="infer",
     usecols=None,
-    dtype_backend=None,
     storage_options=None,
     **kwargs,
 ):
-    from dask_expr.io.csv import ReadTable
+    from dask.dataframe.io.csv import read_table as _read_table

     if not isinstance(path, str):
         path = stringify_path(path)
-    return new_collection(
-        ReadTable(
-            path,
-            columns=usecols,
-            dtype_backend=dtype_backend,
-            storage_options=storage_options,
-            kwargs=kwargs,
-            header=header,
-        )
+    return _read_table(
+        path,
+        *args,
+        header=header,
+        storage_options=storage_options,
+        usecols=usecols,
+        **kwargs,
     )


@@ -5181,23 +5170,18 @@ def read_fwf(
     *args,
     header="infer",
     usecols=None,
-    dtype_backend=None,
     storage_options=None,
     **kwargs,
 ):
-    from dask_expr.io.csv import ReadFwf
+    from dask.dataframe.io.csv import read_fwf as _read_fwf

     if not isinstance(path, str):
         path = stringify_path(path)
-    return new_collection(
-        ReadFwf(
-            path,
-            columns=usecols,
-            dtype_backend=dtype_backend,
-            storage_options=storage_options,
-            kwargs=kwargs,
-            header=header,
-        )
+    return _read_fwf(
+        path,
+        *args,
+        header=header,
+        storage_options=storage_options,
+        usecols=usecols,
+        **kwargs,
     )


151 changes: 0 additions & 151 deletions dask_expr/io/csv.py
@@ -1,154 +1,3 @@
import functools
import operator

from dask._task_spec import Task
from dask.typing import Key

from dask_expr._expr import Projection
from dask_expr._util import _convert_to_list
from dask_expr.io.io import BlockwiseIO, PartitionsFiltered


class ReadCSV(PartitionsFiltered, BlockwiseIO):
_parameters = [
"filename",
"columns",
"header",
"dtype_backend",
"_partitions",
"storage_options",
"kwargs",
"_series",
"dataframe_backend",
]
_defaults = {
"columns": None,
"header": "infer",
"dtype_backend": None,
"kwargs": None,
"_partitions": None,
"storage_options": None,
"_series": False,
"dataframe_backend": "pandas",
}
_absorb_projections = True

@functools.cached_property
def operation(self):
from dask.dataframe.io import read_csv

return read_csv

@functools.cached_property
def _ddf(self):
from dask import config

# Temporary hack to simplify logic
with config.set({"dataframe.backend": self.dataframe_backend}):
kwargs = (
{"dtype_backend": self.dtype_backend}
if self.dtype_backend is not None
else {}
)
if self.kwargs is not None:
kwargs.update(self.kwargs)

columns = _convert_to_list(self.operand("columns"))
if columns is None:
pass
elif "include_path_column" in self.kwargs:
flag = self.kwargs["include_path_column"]
if flag is True:
column_to_remove = "path"
elif isinstance(flag, str):
column_to_remove = flag
else:
column_to_remove = None

columns = [c for c in columns if c != column_to_remove]

if not columns:
meta = self.operation(
self.filename,
header=self.header,
storage_options=self.storage_options,
**kwargs,
)._meta
columns = [list(meta.columns)[0]]

usecols = kwargs.pop("usecols", None)
if usecols is not None and columns is not None:
columns = [col for col in columns if col in usecols]
elif usecols:
columns = usecols

return self.operation(
self.filename,
usecols=columns,
header=self.header,
storage_options=self.storage_options,
**kwargs,
)

@functools.cached_property
def _meta(self):
return self._ddf._meta

def _simplify_up(self, parent, dependents):
if isinstance(parent, Projection):
kwargs = self.kwargs
# int usecols are positional, so block projections
if kwargs.get("usecols", None) is not None and isinstance(
kwargs.get("usecols")[0], int
):
return
return super()._simplify_up(parent, dependents)

@functools.cached_property
def columns(self):
columns_operand = self.operand("columns")
if columns_operand is None:
try:
return list(self._ddf._meta.columns)
except AttributeError:
return []
else:
return _convert_to_list(columns_operand)

def _divisions(self):
return self._ddf.divisions

@functools.cached_property
def _tasks(self):
from dask._task_spec import convert_legacy_graph

return list(convert_legacy_graph(self._ddf.dask.to_dict()).values())

def _filtered_task(self, name: Key, index: int) -> Task:
if self._series:
return Task(name, operator.getitem, self._tasks[index], self.columns[0])
t = self._tasks[index]
if t.key != name:
return Task(name, lambda x: x, t)
return t


class ReadTable(ReadCSV):
@functools.cached_property
def operation(self):
from dask.dataframe.io import read_table

return read_table


class ReadFwf(ReadCSV):
@functools.cached_property
def operation(self):
from dask.dataframe.io import read_fwf

return read_fwf


def to_csv(
df,
filename,
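Reviewer note: the deleted ReadCSV expression implemented column-projection pushdown itself (_absorb_projections = True), rewriting selections such as df[["a"]] into a usecols argument, while _simplify_up refused to push a projection when usecols held integer positions. A minimal sketch of that guard, with a hypothetical helper name, for readers tracing where the behavior went:

def _can_push_projection(kwargs: dict) -> bool:
    # Integer usecols are positional, so name-based projections must not
    # be pushed into the reader (mirrors the removed ReadCSV._simplify_up).
    usecols = kwargs.get("usecols")
    if usecols is not None and isinstance(list(usecols)[0], int):
        return False
    return True

# Pushdown is allowed only for name-based usecols.
assert _can_push_projection({"usecols": ["a", "b"]}) is True
assert _can_push_projection({"usecols": [0, 1]}) is False
assert _can_push_projection({}) is True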
4 changes: 2 additions & 2 deletions dask_expr/io/tests/test_io.py
@@ -23,7 +23,7 @@
     read_parquet,
 )
 from dask_expr._expr import Expr, Replace
-from dask_expr.io import FromArray, FromMap, ReadCSV, ReadParquet, parquet
+from dask_expr.io import FromArray, FromMap, ReadParquet, parquet
 from dask_expr.tests._util import _backend_library

 # Set DataFrame backend for this module
@@ -257,7 +257,7 @@ def test_to_dask_array(optimize):

 @pytest.mark.parametrize(
     "fmt,read_func,read_cls",
-    [("parquet", read_parquet, ReadParquet), ("csv", read_csv, ReadCSV)],
+    [("parquet", read_parquet, ReadParquet), ("csv", read_csv, FromMap)],
 )
 def test_combine_similar(tmpdir, fmt, read_func, read_cls):
     pdf = pd.DataFrame(
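Note: with the dedicated ReadCSV class removed, a CSV read now appears in the expression tree as a generic FromMap node, which is why the parametrization pairs read_csv with FromMap. A hedged sketch of that expectation (file path hypothetical; assumes dask-expr's Expr.walk() tree traversal):

import pandas as pd
from dask_expr import read_csv
from dask_expr.io import FromMap

pd.DataFrame({"x": [1, 2, 3]}).to_csv("tmp.csv", index=False)
df = read_csv("tmp.csv")

# The optimized expression tree should contain a FromMap I/O node.
assert any(isinstance(op, FromMap) for op in df.optimize().expr.walk())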