Skip to content

Commit

Permalink
Internal cleanup of P2P code (#1154)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait authored Oct 25, 2024
1 parent da840e8 commit 9992c63
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 11 deletions.
7 changes: 3 additions & 4 deletions dask_expr/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,9 +607,8 @@ def _lower(self):
return None

def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key
from distributed.shuffle._core import ShuffleId, barrier_key, p2p_barrier
from distributed.shuffle._merge import merge_unpack
from distributed.shuffle._shuffle import shuffle_barrier

dsk = {}
token_left = _tokenize_deterministic(
Expand Down Expand Up @@ -667,9 +666,9 @@ def _layer(self) -> dict:
self.right_index,
)

dsk[_barrier_key_left] = (shuffle_barrier, token_left, transfer_keys_left)
dsk[_barrier_key_left] = (p2p_barrier, token_left, transfer_keys_left)
dsk[_barrier_key_right] = (
shuffle_barrier,
p2p_barrier,
token_right,
transfer_keys_right,
)
Expand Down
10 changes: 3 additions & 7 deletions dask_expr/_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,12 +568,8 @@ def _meta(self):
return self.frame._meta.drop(columns=self.partitioning_index)

def _layer(self):
from distributed.shuffle._shuffle import (
ShuffleId,
barrier_key,
shuffle_barrier,
shuffle_unpack,
)
from distributed.shuffle._core import p2p_barrier
from distributed.shuffle._shuffle import ShuffleId, barrier_key, shuffle_unpack

dsk = {}
token = self._name.split("-")[-1]
Expand Down Expand Up @@ -604,7 +600,7 @@ def _layer(self):
True,
)

dsk[_barrier_key] = (shuffle_barrier, token, transfer_keys)
dsk[_barrier_key] = (p2p_barrier, token, transfer_keys)

# TODO: Decompose p2p Into transfer/barrier + unpack
name = self._name
Expand Down

0 comments on commit 9992c63

Please sign in to comment.