diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ef0547..adc2efa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +## 1.1.9 (2021-07-30) + +### Features + +* connector::base_connection_pool: add try_get_session method +* connector::base_connection_pool: change global lock in watch_pool_routine with several small locks + ## 1.1.8 (2021-07-14) ### Features diff --git a/CMakeLists.txt b/CMakeLists.txt index 0263a6d..34f69d1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.0 FATAL_ERROR) set(STREAMCLIENT_VERSION_MAJOR "1") set(STREAMCLIENT_VERSION_MINOR "1") -set(STREAMCLIENT_VERSION_RELEASE "8") +set(STREAMCLIENT_VERSION_RELEASE "9") set(STREAMCLIENT_VERSION_STRING "${STREAMCLIENT_VERSION_MAJOR}.${STREAMCLIENT_VERSION_MINOR}.${STREAMCLIENT_VERSION_RELEASE}") set(STREAMCLIENT_LIB_VERSION ${STREAMCLIENT_VERSION_STRING}) mark_as_advanced(STREAMCLIENT_VERSION_MAJOR STREAMCLIENT_VERSION_MINOR STREAMCLIENT_VERSION_RELEASE STREAMCLIENT_VERSION_STRING STREAMCLIENT_LIB_VERSION) diff --git a/include/stream-client/connector/connection_pool.hpp b/include/stream-client/connector/connection_pool.hpp index dfa40d1..cc951bc 100644 --- a/include/stream-client/connector/connection_pool.hpp +++ b/include/stream-client/connector/connection_pool.hpp @@ -180,6 +180,35 @@ class base_connection_pool return get_session(get_connect_timeout()); } + /** + * Try to pull a session (stream) from the pool. + * Tries to get stream from the pool until specified deadline is reached. + * + * @note Unlike the get_session method, this method doesn't wait to fill up the pool in a timeout. + * + * @param[in] deadline Expiration time-point. + * @param[out] ec Set to indicate what error occurred, if any. + * + * @returns A stream wrapped in std::unique_ptr or nullptr. + */ + std::unique_ptr try_get_session(boost::system::error_code& ec, const time_point_type& deadline); + + /** + * Try to pull a session (stream) from the pool. + * Tries to get stream from the pool until specified timeout elapsed. + * + * @note Unlike the get_session method, this method doesn't wait to fill up the pool in a timeout. + * + * @param[in] timeout Expiration duration. + * @param[out] ec Set to indicate what error occurred, if any. + * + * @returns A stream wrapped in std::unique_ptr or nullptr. + */ + inline std::unique_ptr try_get_session(boost::system::error_code& ec, const time_duration_type& timeout) + { + return try_get_session(ec, clock_type::now() + timeout); + } + /** * Return the session pulled earlier from the pool. * diff --git a/include/stream-client/connector/impl/connection_pool.ipp b/include/stream-client/connector/impl/connection_pool.ipp index 1473fb2..2ac4ef2 100644 --- a/include/stream-client/connector/impl/connection_pool.ipp +++ b/include/stream-client/connector/impl/connection_pool.ipp @@ -43,9 +43,30 @@ base_connection_pool::get_session(boost::system::error_code& ec, cons } if (sesson_pool_.empty() && !pool_cv_.wait_until(pool_lk, deadline, [this] { return !sesson_pool_.empty(); })) { // session pool is still empty + ec = boost::asio::error::no_descriptors; + return nullptr; + } + + std::unique_ptr session = std::move(sesson_pool_.front().second); + sesson_pool_.pop_front(); + return session; +} + +template +std::unique_ptr::stream_type> +base_connection_pool::try_get_session(boost::system::error_code& ec, const time_point_type& deadline) +{ + std::unique_lock pool_lk(pool_mutex_, std::defer_lock); + if (!pool_lk.try_lock_until(deadline)) { + // failed to lock pool_mutex_ ec = boost::asio::error::timed_out; return nullptr; } + if (sesson_pool_.empty()) { + // session pool is empty + ec = boost::asio::error::no_descriptors; + return nullptr; + } std::unique_ptr session = std::move(sesson_pool_.front().second); sesson_pool_.pop_front(); @@ -92,10 +113,12 @@ void base_connection_pool::watch_pool_routine() static const auto lock_timeout = std::chrono::milliseconds(100); while (watch_pool) { + // try to lock pool mutex std::unique_lock pool_lk(pool_mutex_, std::defer_lock); if (!pool_lk.try_lock_for(lock_timeout)) { continue; } + // remove session which idling past idle_timeout_ std::size_t pool_current_size = 0; for (auto pool_it = sesson_pool_.begin(); pool_it != sesson_pool_.end();) { @@ -107,21 +130,27 @@ void base_connection_pool::watch_pool_routine() ++pool_current_size; } } + + // release pool mutex after removing old sessions + pool_lk.unlock(); + // pool_current_size may be bigger if someone returned previous session std::size_t vacant_places = (pool_max_size_ > pool_current_size) ? pool_max_size_ - pool_current_size : 0; - // at this point we own pool_mutex_, but we want to get new sessions simultaneously; - // that's why new mutex to sync adding threads - std::mutex pool_add; - bool added = false; - auto add_session = [&pool_add, &added, &connector = this->connector_, &pool = this->sesson_pool_]() { + // creating new sessions may be slow and we want to add them simultaneously; + // that's why we need to sync adding threads and lock pool + auto add_session = [this]() { try { // getting new session is time consuming operation - auto new_session = connector.new_session(); + auto new_session = connector_.new_session(); + // ensure only single session added at time - std::unique_lock add_lk(pool_add); - pool.emplace_back(clock_type::now(), std::move(new_session)); - added = true; + std::unique_lock pool_lk(pool_mutex_); + sesson_pool_.emplace_back(clock_type::now(), std::move(new_session)); + pool_lk.unlock(); + + // unblock one waiting thread + pool_cv_.notify_one(); } catch (const boost::system::system_error& e) { // TODO: log errors ? } @@ -135,12 +164,9 @@ void base_connection_pool::watch_pool_routine() a.join(); } - pool_lk.unlock(); - if (added) { - pool_cv_.notify_all(); - } else { - // stop cpu spooling if nothing has been added - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + // stop cpu spooling if nothing has been added + if (vacant_places == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); } } }