
Commit

remove aggregate_files
rjzamora committed Sep 23, 2024
1 parent 86ab5d5 commit 0144cd9
Showing 3 changed files with 3 additions and 47 deletions.
15 changes: 3 additions & 12 deletions dask_expr/_collection.py
@@ -5209,16 +5209,13 @@ def read_parquet(
.. note::
Dask automatically resizes partitions to ensure that each partition is of
adequate size. The optimizer uses the ratio of selected columns to total
- columns to squash multiple files into one partition. If the ``blocksize``
- argument is specified,
+ columns to squash multiple files into one partition.
Additionally, the Optimizer uses a minimum size per partition (default 75MB)
to avoid too many small partitions. This configuration can be set with
>>> dask.config.set({"dataframe.parquet.minimum-partition-size": "100MB"})
- To disable file aggregation, set ``aggregate_files=False`` explicitly.
.. note::
Specifying ``filesystem="arrow"`` leverages a complete reimplementation of
the Parquet reader that is solely based on PyArrow. It is significantly faster
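
The docstring above references the ``dataframe.parquet.minimum-partition-size`` option. As a hedged illustration of how that configuration is applied (the path and column list are hypothetical, and ``read_parquet`` is assumed to be dask_expr's top-level function):

import dask
from dask_expr import read_parquet

# Raise the minimum partition size used by the optimizer (default 75MB)
# so that more small files are aggregated into each output partition.
dask.config.set({"dataframe.parquet.minimum-partition-size": "100MB"})

# Selecting a subset of columns lowers the fusion factor, so more files
# may be squashed into a single partition. Path is illustrative only.
df = read_parquet("path/to/dataset/", columns=["a", "b"])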
@@ -5326,11 +5323,6 @@ def read_parquet(
WARNING: Passing a string argument to ``aggregate_files`` will result
in experimental behavior. This behavior may change in the future.
- .. note::
-     If ``filesystem="arrow"`` is specified, ``aggregate_files`` will be
-     ``True`` by default, and file aggregation will occur at optimization
-     time only.
Whether distinct file paths may be aggregated into the same output
partition. This parameter is only used when `split_row_groups` is set to
'infer', 'adaptive' or to an integer >1. A setting of True means that any
@@ -5447,9 +5439,9 @@ def read_parquet(
"blocksize is not supported when using the pyarrow filesystem "
"if calculate_divisions is set to True."
)
- if aggregate_files not in (None, True, False):
+ if aggregate_files is not None:
raise NotImplementedError(
"aggregate_files must be bool or None when using the pyarrow filesystem."
"aggregate_files is not supported when using the pyarrow filesystem."
)
if parquet_file_extension != (".parq", ".parquet", ".pq"):
raise NotImplementedError(
@@ -5475,7 +5467,6 @@ def read_parquet(
pyarrow_strings_enabled=pyarrow_strings_enabled(),
kwargs=kwargs,
blocksize=blocksize,
- aggregate_files=aggregate_files,
_series=isinstance(columns, str),
)
)
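With this change, any explicit ``aggregate_files`` value is rejected on the PyArrow-filesystem path instead of being validated as a bool. A minimal sketch of the user-facing effect, assuming dask_expr's top-level ``read_parquet`` and a hypothetical path:

from dask_expr import read_parquet

try:
    # Any non-None value now raises, rather than toggling optimizer
    # behavior as it did before this commit.
    read_parquet("data.parquet", filesystem="pyarrow", aggregate_files=True)
except NotImplementedError as err:
    print(err)  # aggregate_files is not supported when using the pyarrow filesystem.
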
14 changes: 0 additions & 14 deletions dask_expr/io/parquet.py
@@ -814,11 +814,6 @@ def _fusion_compression_factor(self):
len(_convert_to_list(self.operand("columns"))) / nr_original_columns, 0.001
)

- def _tune_up(self, parent):
-     if self.aggregate_files is False:
-         return
-     return super()._tune_up(parent)


class ReadParquetPyarrowFS(ReadParquet):
_parameters = [
@@ -835,7 +830,6 @@ class ReadParquetPyarrowFS(ReadParquet):
"pyarrow_strings_enabled",
"kwargs",
"blocksize",
"aggregate_files",
"_partitions",
"_series",
"_dataset_info_cache",
@@ -853,7 +847,6 @@ class ReadParquetPyarrowFS(ReadParquet):
"pyarrow_strings_enabled": True,
"kwargs": None,
"blocksize": None,
"aggregate_files": None,
"_partitions": None,
"_series": False,
"_dataset_info_cache": None,
@@ -867,11 +860,6 @@ def _blocksize(self):
return None
return parse_bytes(self.blocksize)

- @property
- def _aggregate_files(self):
-     # Allow file aggregation by default
-     return self.aggregate_files is not False

@cached_property
def normalized_path(self):
return _normalize_and_strip_protocol(self.path)
@@ -1120,8 +1108,6 @@ def _tune_up(self, parent):
if isinstance(parent, SplitParquetIO):
return
return parent.substitute(self, SplitParquetIO(self))
- if self._aggregate_files is False:
-     return
if self._fusion_compression_factor >= 1:
return
if isinstance(parent, FusedParquetIO):
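For context, the fusion decision that survives this commit in ``_tune_up`` depends only on the column-selection ratio (and ``blocksize``), not on a user flag. A simplified, standalone sketch of that heuristic (the helper names here are illustrative, not the actual implementation):

def fusion_compression_factor(n_selected: int, n_total: int) -> float:
    # Mirrors _fusion_compression_factor: the fraction of columns actually
    # read, floored at 0.001 so the factor is never zero.
    return max(n_selected / n_total, 0.001)

def may_fuse(n_selected: int, n_total: int) -> bool:
    # _tune_up only considers FusedParquetIO when the factor is < 1; the
    # aggregate_files short-circuit removed above no longer applies.
    return fusion_compression_factor(n_selected, n_total) < 1

print(may_fuse(2, 5))  # True: factor 0.4, fusion may kick in
print(may_fuse(5, 5))  # False: all columns read, factor 1.0
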
21 changes: 0 additions & 21 deletions dask_expr/io/tests/test_parquet.py
@@ -133,27 +133,6 @@ def test_pyarrow_filesystem_blocksize(tmpdir):
assert_eq(df, pdf, check_index=False)


- @pytest.mark.parametrize("aggregate_files", [True, False])
- def test_pyarrow_filesystem_aggregate_files(tmpdir, aggregate_files):
-     df0 = from_pandas(
-         pd.DataFrame({c: range(0, 20) for c in "abcde"}),
-         npartitions=2,
-     )
-     path = tmpdir + "aggregate.parquet"
-     df0.to_parquet(path)
-     df = read_parquet(
-         path,
-         filesystem="pyarrow",
-         blocksize="1MiB",
-         aggregate_files=aggregate_files,
-     )
-
-     # Trigger "_tune_up" optimization
-     df = df.map_partitions(lambda x: x)
-     assert df.optimize().npartitions == 1 if aggregate_files else 2
-     assert_eq(df, df0, check_index=False, check_divisions=False)


@pytest.mark.parametrize("dtype_backend", ["pyarrow", "numpy_nullable", None])
def test_pyarrow_filesystem_dtype_backend(parquet_file, dtype_backend):
filesystem = fs.LocalFileSystem()
