diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 26e90aa14d1..e9f0a266055 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1838,6 +1838,40 @@ ucp_lane_index_t ucp_ep_lookup_lane(ucp_ep_h ucp_ep, uct_ep_h uct_ep) return UCP_NULL_LANE; } +ucp_lane_index_t ucp_ep_find_wireup_ep_lane(ucp_ep_h ep) +{ + ucp_lane_index_t lane; + + for (lane = 0; lane < ucp_ep_num_lanes(ep); lane++) { + if (ucp_wireup_ep_test(ucp_ep_get_lane(ep, lane))) { + return lane; + } + } + + return UCP_NULL_LANE; +} + +ucp_lane_index_t +ucp_ep_find_non_reused_lane(ucp_ep_h ep, const ucp_ep_config_key_t *key, + const ucp_lane_index_t *reuse_lane_map) +{ + ucp_lane_map_t lane_bitmap = {0}; + ucp_lane_index_t lane; + + if (ucp_ep_has_cm_lane(ep)) { + return key->cm_lane; + } + + for (lane = 0; lane < ucp_ep_num_lanes(ep); lane++) { + if (reuse_lane_map[lane] != UCP_NULL_LANE) { + lane_bitmap |= UCS_BIT(reuse_lane_map[lane]); + } + } + + lane = ucs_ffs64_safe(~lane_bitmap); + return (lane < key->num_lanes) ? lane : UCP_NULL_LANE; +} + static int ucp_ep_lane_is_dst_index_match(ucp_rsc_index_t dst_index1, ucp_rsc_index_t dst_index2) { diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index 50ff07e6cf9..d21e2153ad9 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -756,6 +756,12 @@ void ucp_ep_config_lanes_intersect(const ucp_ep_config_key_t *key1, const unsigned *addr_indices, ucp_lane_index_t *lane_map); +ucp_lane_index_t +ucp_ep_find_non_reused_lane(ucp_ep_h ep, const ucp_ep_config_key_t *key, + const ucp_lane_index_t *reuse_lane_map); + +ucp_lane_index_t ucp_ep_find_wireup_ep_lane(ucp_ep_h ep); + int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, const ucp_ep_config_key_t *key2); diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index 39ed0cd5957..c642cb4aefd 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -1353,29 +1353,12 @@ static void ucp_wireup_discard_uct_eps(ucp_ep_h ep, uct_ep_h *uct_eps, } } - - -static unsigned -ucp_ep_num_reused_lanes(ucp_ep_h ep, const ucp_lane_index_t *reuse_lane_map) -{ - unsigned num_reused = 0; - ucp_lane_index_t lane; - - for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) { - num_reused += (reuse_lane_map[lane] != UCP_NULL_LANE); - } - - return num_reused; -} - static int ucp_wireup_check_is_reconfigurable(ucp_ep_h ep, const ucp_ep_config_key_t *new_key, const ucp_unpacked_address_t *remote_address, const unsigned *addr_indices) { - ucp_lane_index_t reuse_lane_map[UCP_MAX_LANES]; - const ucp_ep_config_key_t *old_key; ucp_lane_index_t lane; if ((ep->cfg_index == UCP_WORKER_CFG_INDEX_NULL) || @@ -1391,27 +1374,7 @@ ucp_wireup_check_is_reconfigurable(ucp_ep_h ep, } } - old_key = &ucp_ep_config(ep)->key; - ucp_ep_config_lanes_intersect(old_key, new_key, ep, remote_address, - addr_indices, reuse_lane_map); - - /* For now, reconfig is supported only if no lanes are reused */ - return ucp_ep_num_reused_lanes(ep, reuse_lane_map) == 0; -} - -static ucp_lane_index_t -ucp_wireup_find_non_reused_lane(ucp_ep_h ep, const ucp_ep_config_key_t *key, - const ucp_lane_index_t *reuse_lane_map) -{ - if (ucp_ep_has_cm_lane(ep)) { - return key->cm_lane; - } - - /* Just use first lane, as only non-reused lanes are allowed at the - * moment. */ - ucs_assert(key->num_lanes > 0); - ucs_assert(ucp_ep_num_reused_lanes(ep, reuse_lane_map) == 0); - return 0; + return 1; } static ucs_status_t @@ -1431,12 +1394,14 @@ ucp_wireup_replace_wireup_msg_lane(ucp_ep_h ep, ucp_ep_config_key_t *key, old_wireup_ep = ucp_wireup_ep(ucp_ep_get_lane(ep, old_lane)); ucs_assert_always(old_wireup_ep != NULL); + /* Select CM/non-reused lane as new wireup lane */ + new_wireup_lane = ucp_ep_find_non_reused_lane(ep, key, reuse_lane_map); + /* Set wireup EP for new configuration's wireup lane */ if (ucp_ep_has_cm_lane(ep)) { /* Use existing EP from CM lane */ new_wireup_ep = ucp_ep_get_cm_wireup_ep(ep); - ucs_assert(new_wireup_ep != NULL); - } else { + } else if (new_wireup_lane != UCP_NULL_LANE) { /* Create new EP for non-CM flow */ status = ucp_wireup_ep_create(ep, &uct_ep); if (status != UCS_OK) { @@ -1444,8 +1409,15 @@ ucp_wireup_replace_wireup_msg_lane(ucp_ep_h ep, ucp_ep_config_key_t *key, } new_wireup_ep = ucp_wireup_ep(uct_ep); + } else { + /* Use existing EP from selected wireup_ep */ + new_wireup_lane = ucp_ep_find_wireup_ep_lane(ep); + ucs_assert(new_wireup_lane != UCP_NULL_LANE); + new_wireup_ep = ucp_wireup_ep(ucp_ep_get_lane(ep, new_wireup_lane)); } + ucs_assert(new_wireup_ep != NULL); + /* Get correct aux_rsc_index either from next_ep or aux_ep */ aux_rsc_index = ucp_wireup_ep_is_next_ep_active(old_wireup_ep) ? ucp_ep_get_rsc_index(ep, old_lane) : @@ -1469,9 +1441,6 @@ ucp_wireup_replace_wireup_msg_lane(ucp_ep_h ep, ucp_ep_config_key_t *key, uct_ep_destroy(&old_wireup_ep->super.super); ucp_ep_set_lane(ep, old_lane, NULL); - /* Select CM/non-reused lane as new wireup lane */ - new_wireup_lane = ucp_wireup_find_non_reused_lane(ep, key, reuse_lane_map); - new_uct_eps[new_wireup_lane] = &new_wireup_ep->super.super; key->wireup_msg_lane = new_wireup_lane; return UCS_OK; diff --git a/test/gtest/ucp/test_ucp_ep_reconfig.cc b/test/gtest/ucp/test_ucp_ep_reconfig.cc index 5bedb9126d2..dc356930fe8 100644 --- a/test/gtest/ucp/test_ucp_ep_reconfig.cc +++ b/test/gtest/ucp/test_ucp_ep_reconfig.cc @@ -17,6 +17,7 @@ class test_ucp_ep_reconfig : public ucp_test { protected: using address_t = std::pair; using address_p = std::unique_ptr; + using limits = std::numeric_limits; class entity : public ucp_test_base::entity { public: @@ -33,21 +34,31 @@ class test_ucp_ep_reconfig : public ucp_test { const ucp_ep_params_t &ep_params, int ep_idx = 0, int do_set_ep = 1) override; - void verify_configuration(const entity &other) const; + void + verify_configuration(const entity &other, unsigned num_reused) const; bool is_reconfigured() const { return m_cfg_index != ep()->cfg_index; } + unsigned num_reused_lanes() const + { + return m_num_reused_lanes; + } + private: void store_config(); - ucp_tl_bitmap_t ep_tl_bitmap() const; + ucp_tl_bitmap_t + ep_tl_bitmap(unsigned max_num_devs = limits::max()) const; address_p get_address(const ucp_tl_bitmap_t &tl_bitmap) const; bool is_lane_connected(ucp_ep_h ep, ucp_lane_index_t lane_idx, const entity &other) const; + unsigned num_paths() const; + ucp_tl_bitmap_t reduced_tl_bitmap() const; - ucp_worker_cfg_index_t m_cfg_index = UCP_WORKER_CFG_INDEX_NULL; + ucp_worker_cfg_index_t m_cfg_index = UCP_WORKER_CFG_INDEX_NULL; + unsigned m_num_reused_lanes = 0; std::vector m_uct_eps; bool m_exclude_ifaces; }; @@ -92,15 +103,25 @@ class test_ucp_ep_reconfig : public ucp_test { if (sender().ucph()->num_tls <= 2) { UCS_TEST_SKIP_R("test requires at least 2 ifaces to work"); } + + ensure_reused_lanes_reconfigurable(); } static void get_test_variants(std::vector &variants) { add_variant_with_value(variants, UCP_FEATURE_TAG, 0, ""); + add_variant_with_value(variants, UCP_FEATURE_TAG, 1, "reuse"); } void run(bool bidirectional = false); void skip_non_p2p(); + bool has_bond_iface(); + void ensure_reused_lanes_reconfigurable(); + + bool reuse_lanes() const + { + return get_variant_value(); + } void send_message(const ucp_test_base::entity &e1, const ucp_test_base::entity &e2, const mem_buffer &sbuf, @@ -159,6 +180,19 @@ class test_ucp_ep_reconfig : public ucp_test { } }; +unsigned test_ucp_ep_reconfig::entity::num_paths() const +{ + auto lane = ucp_ep_config(ep())->key.rma_bw_lanes[0]; + + if (lane == UCP_NULL_LANE) { + return 1; + } + + auto rsc_idx = ucp_ep_get_rsc_index(ep(), lane); + auto attr = ucp_worker_iface_get_attr(worker(), rsc_idx); + return attr->dev_num_paths; +} + void test_ucp_ep_reconfig::entity::store_config() { for (ucp_lane_index_t lane = 0; lane < ucp_ep_num_lanes(ep()); ++lane) { @@ -173,36 +207,57 @@ void test_ucp_ep_reconfig::entity::store_config() } m_cfg_index = ep()->cfg_index; + + /* Calculate number of reused lanes */ + auto num_reused = (ucp_ep_num_lanes(ep()) / 2) / num_paths(); + auto test = static_cast(m_test); + m_num_reused_lanes = test->reuse_lanes() ? num_reused : 0; } -ucp_tl_bitmap_t test_ucp_ep_reconfig::entity::ep_tl_bitmap() const +ucp_tl_bitmap_t +test_ucp_ep_reconfig::entity::ep_tl_bitmap(unsigned max_num_devs) const { - if (ep() == NULL) { - return UCS_STATIC_BITMAP_ZERO_INITIALIZER; - } - ucp_tl_bitmap_t tl_bitmap = UCS_STATIC_BITMAP_ZERO_INITIALIZER; - for (ucp_lane_index_t lane = 0; lane < ucp_ep_num_lanes(ep()); ++lane) { + unsigned dev_count = 0; + + for (auto lane = 0; lane < ucp_ep_num_lanes(ep()); ++lane) { + if (ucp_ep_get_path_index(ep(), lane) > 0) { + continue; + } + + if (dev_count++ >= max_num_devs) { + break; + } + UCS_STATIC_BITMAP_SET(&tl_bitmap, ucp_ep_get_rsc_index(ep(), lane)); } return tl_bitmap; } +ucp_tl_bitmap_t test_ucp_ep_reconfig::entity::reduced_tl_bitmap() const +{ + if ((ep() == NULL) || !m_exclude_ifaces) { + return ucp_tl_bitmap_max; + } + + auto reused_bitmap = ep_tl_bitmap(num_reused_lanes()); + const auto negative_bitmap = UCS_STATIC_BITMAP_NOT(reused_bitmap); + + return UCS_STATIC_BITMAP_NOT(UCS_STATIC_BITMAP_AND(ep_tl_bitmap(), + negative_bitmap)); +} + void test_ucp_ep_reconfig::entity::connect(const ucp_test_base::entity *other, const ucp_ep_params_t &ep_params, int ep_idx, int do_set_ep) { - auto r_other = static_cast(other); - auto worker_addr = r_other->get_address(ucp_tl_bitmap_max); - ucp_tl_bitmap_t tl_bitmap; + auto r_other = static_cast(other); + auto worker_addr = r_other->get_address(ucp_tl_bitmap_max); + const ucp_tl_bitmap_t tl_bitmap = r_other->reduced_tl_bitmap(); unsigned addr_indices[UCP_MAX_LANES]; ucp_ep_h ucp_ep; - tl_bitmap = m_exclude_ifaces ? - UCS_STATIC_BITMAP_NOT(r_other->ep_tl_bitmap()) : - ucp_tl_bitmap_max; - UCS_ASYNC_BLOCK(&worker()->async); ASSERT_UCS_OK(ucp_ep_create_to_worker_addr(worker(), &tl_bitmap, &worker_addr->second, @@ -248,7 +303,7 @@ bool test_ucp_ep_reconfig::entity::is_lane_connected(ucp_ep_h ep, } void test_ucp_ep_reconfig::entity::verify_configuration( - const entity &other) const + const entity &other, unsigned num_reused) const { unsigned reused = 0; const ucp_lane_index_t num_lanes = ucp_ep_num_lanes(ep()); @@ -257,13 +312,18 @@ void test_ucp_ep_reconfig::entity::verify_configuration( /* Verify local and remote lanes are identical */ EXPECT_TRUE(is_lane_connected(ep(), lane, other)); - /* Verify correct number of reused lanes is configured */ + /* Verify correct number of reused lanes */ auto uct_ep = ucp_ep_get_lane(ep(), lane); auto it = std::find(m_uct_eps.begin(), m_uct_eps.end(), uct_ep); reused += (it != m_uct_eps.end()); } - EXPECT_EQ(reused, is_reconfigured() ? 0 : num_lanes); + if (is_reconfigured()) { + EXPECT_GE(reused, num_reused); + EXPECT_LE(reused, num_reused * num_paths()); + } else { + EXPECT_EQ(reused, num_lanes); + } } test_ucp_ep_reconfig::address_p @@ -297,8 +357,9 @@ void test_ucp_ep_reconfig::run(bool bidirectional) auto r_receiver = static_cast(&receiver()); EXPECT_NE(r_sender->is_reconfigured(), r_receiver->is_reconfigured()); - r_sender->verify_configuration(*r_receiver); - r_receiver->verify_configuration(*r_sender); + + r_sender->verify_configuration(*r_receiver, r_sender->num_reused_lanes()); + r_receiver->verify_configuration(*r_sender, r_sender->num_reused_lanes()); } void test_ucp_ep_reconfig::skip_non_p2p() @@ -308,9 +369,44 @@ void test_ucp_ep_reconfig::skip_non_p2p() } } +bool test_ucp_ep_reconfig::has_bond_iface() +{ + auto context = sender().ucph(); + const ucp_tl_resource_desc_t *rsc; + + ucs_carray_for_each(rsc, context->tl_rscs, context->num_tls) { + std::string tl_name(rsc->tl_rsc.dev_name); + + if (tl_name.find("bond") != std::string::npos) { + return true; + } + } + + return false; +} + +void test_ucp_ep_reconfig::ensure_reused_lanes_reconfigurable() +{ + if (!reuse_lanes()) { + return; + } + + if (has_transport("ud_v") || has_transport("ud_x")) { + UCS_TEST_SKIP_R("the test requires at least 2 lanes, while UD has only " + "1"); + } + + if (has_transport("tcp") || has_transport("dc_x") || has_transport("shm")) { + UCS_TEST_SKIP_R("non wired-up lanes are not supported yet"); + } + + if (has_bond_iface()) { + modify_config("IB_NUM_PATHS", "1", SETENV_IF_NOT_EXIST); + } +} + /* TODO: Remove skip condition after next PRs are merged. */ -UCS_TEST_SKIP_COND_P(test_ucp_ep_reconfig, basic, - !has_transport("rc_x") || !has_transport("rc_v")) +UCS_TEST_SKIP_COND_P(test_ucp_ep_reconfig, basic, !has_transport("rc")) { run(); } @@ -327,12 +423,13 @@ UCS_TEST_P(test_ucp_ep_reconfig, request_reset, "PROTO_REQUEST_RESET=y") run(); } -/* SHM causes lane reuse, disable for now. */ -UCS_TEST_SKIP_COND_P(test_ucp_ep_reconfig, resolve_remote_id, - has_transport("shm") || is_self(), "MAX_RNDV_LANES=0") +UCS_TEST_SKIP_COND_P(test_ucp_ep_reconfig, resolve_remote_id, is_self(), + "MAX_RNDV_LANES=0") { if (has_transport("tcp")) { - UCS_TEST_SKIP_R("lanes are reused (not supported yet)"); + UCS_TEST_SKIP_R("asymmetric setup is not supported for this transport " + "due to a reachability issue - only matching " + "interfaces can connect"); } run(true); @@ -369,7 +466,7 @@ UCS_TEST_SKIP_COND_P(test_reconfig_asymmetric, request_reset, run(); } -/* SHM causes lane reuse, disable for now. */ +/* SHM + single lane won't trigger reconfig. */ UCS_TEST_SKIP_COND_P(test_reconfig_asymmetric, resolve_remote_id, has_transport("shm") || is_self(), "MAX_RNDV_LANES=0") {