Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/WIREUP: Support reused lanes for EP reconfiguration #10244

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
48 changes: 48 additions & 0 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1838,6 +1838,54 @@ ucp_lane_index_t ucp_ep_lookup_lane(ucp_ep_h ucp_ep, uct_ep_h uct_ep)
return UCP_NULL_LANE;
}

ucp_lane_index_t ucp_ep_find_wireup_ep_lane(ucp_ep_h ep)
{
ucp_lane_index_t lane;

for (lane = 0; lane < ucp_ep_num_lanes(ep); lane++) {
if (ucp_wireup_ep_test(ucp_ep_get_lane(ep, lane))) {
return lane;
}
}

return UCP_NULL_LANE;
}

static ucp_lane_index_t
ucp_ep_get_reused_lane_source(ucp_ep_h ep, ucp_lane_index_t new_lane,
const ucp_lane_index_t *reuse_lane_map)
{
ucp_lane_index_t lane;

for (lane = 0; lane < ucp_ep_num_lanes(ep); lane++) {
if (reuse_lane_map[lane] == new_lane) {
return lane;
}
}

return UCP_NULL_LANE;
}

ucp_lane_index_t
ucp_ep_find_non_reused_lane(ucp_ep_h ep, const ucp_ep_config_key_t *key,
const ucp_lane_index_t *reuse_lane_map)
{
ucp_lane_index_t lane;

if (ucp_ep_has_cm_lane(ep)) {
return key->cm_lane;
}

for (lane = 0; lane < key->num_lanes; lane++) {
if (ucp_ep_get_reused_lane_source(ep, lane, reuse_lane_map) ==
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is O(n^2) complexity here, let's try to avoid it.

UCP_NULL_LANE) {
return lane;
}
}

return UCP_NULL_LANE;
}

static int ucp_ep_lane_is_dst_index_match(ucp_rsc_index_t dst_index1,
ucp_rsc_index_t dst_index2)
{
Expand Down
6 changes: 6 additions & 0 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,12 @@ void ucp_ep_config_lanes_intersect(const ucp_ep_config_key_t *key1,
const unsigned *addr_indices,
ucp_lane_index_t *lane_map);

ucp_lane_index_t
ucp_ep_find_non_reused_lane(ucp_ep_h ep, const ucp_ep_config_key_t *key,
const ucp_lane_index_t *reuse_lane_map);

ucp_lane_index_t ucp_ep_find_wireup_ep_lane(ucp_ep_h ep);

int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2);

Expand Down
55 changes: 12 additions & 43 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -1353,29 +1353,12 @@ static void ucp_wireup_discard_uct_eps(ucp_ep_h ep, uct_ep_h *uct_eps,
}
}



static unsigned
ucp_ep_num_reused_lanes(ucp_ep_h ep, const ucp_lane_index_t *reuse_lane_map)
{
unsigned num_reused = 0;
ucp_lane_index_t lane;

for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
num_reused += (reuse_lane_map[lane] != UCP_NULL_LANE);
}

return num_reused;
}

static int
ucp_wireup_check_is_reconfigurable(ucp_ep_h ep,
const ucp_ep_config_key_t *new_key,
const ucp_unpacked_address_t *remote_address,
const unsigned *addr_indices)
{
ucp_lane_index_t reuse_lane_map[UCP_MAX_LANES];
const ucp_ep_config_key_t *old_key;
ucp_lane_index_t lane;

if ((ep->cfg_index == UCP_WORKER_CFG_INDEX_NULL) ||
Expand All @@ -1391,27 +1374,7 @@ ucp_wireup_check_is_reconfigurable(ucp_ep_h ep,
}
}

old_key = &ucp_ep_config(ep)->key;
ucp_ep_config_lanes_intersect(old_key, new_key, ep, remote_address,
addr_indices, reuse_lane_map);

/* For now, reconfig is supported only if no lanes are reused */
return ucp_ep_num_reused_lanes(ep, reuse_lane_map) == 0;
}

static ucp_lane_index_t
ucp_wireup_find_non_reused_lane(ucp_ep_h ep, const ucp_ep_config_key_t *key,
const ucp_lane_index_t *reuse_lane_map)
{
if (ucp_ep_has_cm_lane(ep)) {
return key->cm_lane;
}

/* Just use first lane, as only non-reused lanes are allowed at the
* moment. */
ucs_assert(key->num_lanes > 0);
ucs_assert(ucp_ep_num_reused_lanes(ep, reuse_lane_map) == 0);
return 0;
return 1;
}

static ucs_status_t
Expand All @@ -1431,21 +1394,30 @@ ucp_wireup_replace_wireup_msg_lane(ucp_ep_h ep, ucp_ep_config_key_t *key,
old_wireup_ep = ucp_wireup_ep(ucp_ep_get_lane(ep, old_lane));
ucs_assert_always(old_wireup_ep != NULL);

/* Select CM/non-reused lane as new wireup lane */
new_wireup_lane = ucp_ep_find_non_reused_lane(ep, key, reuse_lane_map);

/* Set wireup EP for new configuration's wireup lane */
if (ucp_ep_has_cm_lane(ep)) {
/* Use existing EP from CM lane */
new_wireup_ep = ucp_ep_get_cm_wireup_ep(ep);
ucs_assert(new_wireup_ep != NULL);
} else {
} else if (new_wireup_lane != UCP_NULL_LANE) {
/* Create new EP for non-CM flow */
status = ucp_wireup_ep_create(ep, &uct_ep);
if (status != UCS_OK) {
return status;
}

new_wireup_ep = ucp_wireup_ep(uct_ep);
} else {
/* Use existing EP from selected wireup_ep */
new_wireup_lane = ucp_ep_find_wireup_ep_lane(ep);
ucs_assert(new_wireup_lane != UCP_NULL_LANE);
new_wireup_ep = ucp_wireup_ep(ucp_ep_get_lane(ep, new_wireup_lane));
}

ucs_assert(new_wireup_ep != NULL);

/* Get correct aux_rsc_index either from next_ep or aux_ep */
aux_rsc_index = ucp_wireup_ep_is_next_ep_active(old_wireup_ep) ?
ucp_ep_get_rsc_index(ep, old_lane) :
Expand All @@ -1469,9 +1441,6 @@ ucp_wireup_replace_wireup_msg_lane(ucp_ep_h ep, ucp_ep_config_key_t *key,
uct_ep_destroy(&old_wireup_ep->super.super);
ucp_ep_set_lane(ep, old_lane, NULL);

/* Select CM/non-reused lane as new wireup lane */
new_wireup_lane = ucp_wireup_find_non_reused_lane(ep, key, reuse_lane_map);

new_uct_eps[new_wireup_lane] = &new_wireup_ep->super.super;
key->wireup_msg_lane = new_wireup_lane;
return UCS_OK;
Expand Down
Loading
Loading