Skip to content

Commit

Permalink
Test concatenate
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Sep 23, 2024
1 parent 93fac57 commit de6edba
Showing 1 changed file with 39 additions and 2 deletions.
41 changes: 39 additions & 2 deletions dask_expr/tests/test_distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@

distributed = pytest.importorskip("distributed")

from distributed import Client, LocalCluster
from distributed import Client, LocalCluster, SchedulerPlugin
from distributed.shuffle._core import id_from_key
from distributed.utils_test import cleanup # noqa F401
from distributed.utils_test import client as c # noqa F401
from distributed.utils_test import gen_cluster
from distributed.utils_test import gen_cluster, loop, loop_in_thread # noqa F401

import dask_expr as dx

Expand Down Expand Up @@ -456,3 +457,39 @@ def test_respect_context_shuffle(df, pdf, func):
with dask.config.set({"dataframe.shuffle.method": "tasks"}):
result = q.optimize(fuse=False)
assert len([x for x in result.walk() if isinstance(x, P2PShuffle)]) > 0


@pytest.mark.parametrize("concatenate", [True, False])
def test_compute_concatenates(loop, concatenate):
pdf = pd.DataFrame({"a": np.random.randint(1, 100, (100,)), "b": 1})
df = from_pandas(pdf, npartitions=10)

class Plugin(SchedulerPlugin):
def start(self, *args, **kwargs):
self.repartition_in_tasks = False

def update_graph(
self,
scheduler,
*,
client,
keys,
tasks,
annotations,
priority,
dependencies,
**kwargs,
):
for key in dependencies:
if not isinstance(key, tuple):
continue
group = key[0]
if not isinstance(group, str):
continue
self.repartition_in_tasks |= group.startswith("repartitiontofewer")

with Client(loop=loop) as c:
c.register_plugin(Plugin(), name="tracker")
df.compute(fuse=False, concatenate=concatenate)
plugin = c.cluster.scheduler.plugins["tracker"]
assert plugin.repartition_in_tasks is concatenate

0 comments on commit de6edba

Please sign in to comment.