From 2fc5de18cb6c563c9d5d0bf2f1e7558b9490bbd0 Mon Sep 17 00:00:00 2001 From: Nikita Belov Date: Sun, 25 Jul 2021 13:45:08 +0300 Subject: [PATCH 1/7] add try_new_session --- .../connector/connection_pool.hpp | 29 +++++++++++++++++++ .../connector/impl/connection_pool.ipp | 23 ++++++++++++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/include/stream-client/connector/connection_pool.hpp b/include/stream-client/connector/connection_pool.hpp index dfa40d1..cc951bc 100644 --- a/include/stream-client/connector/connection_pool.hpp +++ b/include/stream-client/connector/connection_pool.hpp @@ -180,6 +180,35 @@ class base_connection_pool return get_session(get_connect_timeout()); } + /** + * Try to pull a session (stream) from the pool. + * Tries to get stream from the pool until specified deadline is reached. + * + * @note Unlike the get_session method, this method doesn't wait to fill up the pool in a timeout. + * + * @param[in] deadline Expiration time-point. + * @param[out] ec Set to indicate what error occurred, if any. + * + * @returns A stream wrapped in std::unique_ptr or nullptr. + */ + std::unique_ptr try_get_session(boost::system::error_code& ec, const time_point_type& deadline); + + /** + * Try to pull a session (stream) from the pool. + * Tries to get stream from the pool until specified timeout elapsed. + * + * @note Unlike the get_session method, this method doesn't wait to fill up the pool in a timeout. + * + * @param[in] timeout Expiration duration. + * @param[out] ec Set to indicate what error occurred, if any. + * + * @returns A stream wrapped in std::unique_ptr or nullptr. + */ + inline std::unique_ptr try_get_session(boost::system::error_code& ec, const time_duration_type& timeout) + { + return try_get_session(ec, clock_type::now() + timeout); + } + /** * Return the session pulled earlier from the pool. * diff --git a/include/stream-client/connector/impl/connection_pool.ipp b/include/stream-client/connector/impl/connection_pool.ipp index 1473fb2..5b4eb6f 100644 --- a/include/stream-client/connector/impl/connection_pool.ipp +++ b/include/stream-client/connector/impl/connection_pool.ipp @@ -43,9 +43,30 @@ base_connection_pool::get_session(boost::system::error_code& ec, cons } if (sesson_pool_.empty() && !pool_cv_.wait_until(pool_lk, deadline, [this] { return !sesson_pool_.empty(); })) { // session pool is still empty + ec = boost::asio::error::no_descriptors; + return nullptr; + } + + std::unique_ptr session = std::move(sesson_pool_.front().second); + sesson_pool_.pop_front(); + return session; +} + +template +std::unique_ptr::stream_type> +base_connection_pool::try_get_session(boost::system::error_code& ec, const time_point_type& deadline) +{ + std::unique_lock pool_lk(pool_mutex_, std::defer_lock); + if (!pool_lk.try_lock_until(deadline)) { + // failed to lock pool_mutex_ ec = boost::asio::error::timed_out; return nullptr; } + if (sesson_pool_.empty()) { + // session pool is empty + ec = boost::asio::error::no_descriptors; + return nullptr; + } std::unique_ptr session = std::move(sesson_pool_.front().second); sesson_pool_.pop_front(); @@ -140,7 +161,7 @@ void base_connection_pool::watch_pool_routine() pool_cv_.notify_all(); } else { // stop cpu spooling if nothing has been added - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); } } } From 03ec4063fedd4ca7937578eddf0d4e376bb65867 Mon Sep 17 00:00:00 2001 From: Nikita Belov Date: Mon, 26 Jul 2021 04:02:58 +0300 Subject: [PATCH 2/7] move mutex in watch_pool_routine to threads --- .../connector/impl/connection_pool.ipp | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/include/stream-client/connector/impl/connection_pool.ipp b/include/stream-client/connector/impl/connection_pool.ipp index 5b4eb6f..0677a48 100644 --- a/include/stream-client/connector/impl/connection_pool.ipp +++ b/include/stream-client/connector/impl/connection_pool.ipp @@ -113,10 +113,12 @@ void base_connection_pool::watch_pool_routine() static const auto lock_timeout = std::chrono::milliseconds(100); while (watch_pool) { + // try to lock pool mutex std::unique_lock pool_lk(pool_mutex_, std::defer_lock); if (!pool_lk.try_lock_for(lock_timeout)) { continue; } + // remove session which idling past idle_timeout_ std::size_t pool_current_size = 0; for (auto pool_it = sesson_pool_.begin(); pool_it != sesson_pool_.end();) { @@ -128,21 +130,28 @@ void base_connection_pool::watch_pool_routine() ++pool_current_size; } } + // pool_current_size may be bigger if someone returned previous session std::size_t vacant_places = (pool_max_size_ > pool_current_size) ? pool_max_size_ - pool_current_size : 0; - // at this point we own pool_mutex_, but we want to get new sessions simultaneously; - // that's why new mutex to sync adding threads - std::mutex pool_add; - bool added = false; - auto add_session = [&pool_add, &added, &connector = this->connector_, &pool = this->sesson_pool_]() { + // release poll mutex after removing old sessions + pool_lk.unlock(); + + // creating new sessions may be slow and we want add them simultaneously; + // that's why we need to sync adding threads and lock pool + auto add_session = [&connector = this->connector_, &pool = this->sesson_pool_, &pool_mutex = this->pool_mutex_, + &pool_cv = this->pool_cv_]() { try { // getting new session is time consuming operation auto new_session = connector.new_session(); + // ensure only single session added at time - std::unique_lock add_lk(pool_add); + std::unique_lock pool_lk(pool_mutex); pool.emplace_back(clock_type::now(), std::move(new_session)); - added = true; + pool_lk.unlock(); + + // unblock one waiting thread + pool_cv.notify_one(); } catch (const boost::system::system_error& e) { // TODO: log errors ? } @@ -156,13 +165,7 @@ void base_connection_pool::watch_pool_routine() a.join(); } - pool_lk.unlock(); - if (added) { - pool_cv_.notify_all(); - } else { - // stop cpu spooling if nothing has been added - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); } } From 03317c7514e16e2da37478b40ab4da85da7436e6 Mon Sep 17 00:00:00 2001 From: Nikita Belov Date: Tue, 27 Jul 2021 03:12:54 +0300 Subject: [PATCH 3/7] fix --- include/stream-client/connector/impl/connection_pool.ipp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/include/stream-client/connector/impl/connection_pool.ipp b/include/stream-client/connector/impl/connection_pool.ipp index 0677a48..ed40a27 100644 --- a/include/stream-client/connector/impl/connection_pool.ipp +++ b/include/stream-client/connector/impl/connection_pool.ipp @@ -140,14 +140,16 @@ void base_connection_pool::watch_pool_routine() // creating new sessions may be slow and we want add them simultaneously; // that's why we need to sync adding threads and lock pool auto add_session = [&connector = this->connector_, &pool = this->sesson_pool_, &pool_mutex = this->pool_mutex_, - &pool_cv = this->pool_cv_]() { + &pool_cv = this->pool_cv_, pool_max_size = this->pool_max_size_]() { try { // getting new session is time consuming operation auto new_session = connector.new_session(); // ensure only single session added at time std::unique_lock pool_lk(pool_mutex); - pool.emplace_back(clock_type::now(), std::move(new_session)); + if (pool.size() < pool_max_size) { + pool.emplace_back(clock_type::now(), std::move(new_session)); + } pool_lk.unlock(); // unblock one waiting thread From bbcc245750518da1f26c2659370eb556b95cb998 Mon Sep 17 00:00:00 2001 From: Nikita Belov Date: Fri, 30 Jul 2021 02:40:28 +0300 Subject: [PATCH 4/7] fix review --- .../connector/impl/connection_pool.ipp | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/include/stream-client/connector/impl/connection_pool.ipp b/include/stream-client/connector/impl/connection_pool.ipp index ed40a27..f5495a9 100644 --- a/include/stream-client/connector/impl/connection_pool.ipp +++ b/include/stream-client/connector/impl/connection_pool.ipp @@ -134,26 +134,23 @@ void base_connection_pool::watch_pool_routine() // pool_current_size may be bigger if someone returned previous session std::size_t vacant_places = (pool_max_size_ > pool_current_size) ? pool_max_size_ - pool_current_size : 0; - // release poll mutex after removing old sessions + // release pool mutex after removing old sessions pool_lk.unlock(); // creating new sessions may be slow and we want add them simultaneously; // that's why we need to sync adding threads and lock pool - auto add_session = [&connector = this->connector_, &pool = this->sesson_pool_, &pool_mutex = this->pool_mutex_, - &pool_cv = this->pool_cv_, pool_max_size = this->pool_max_size_]() { + auto add_session = [this]() { try { // getting new session is time consuming operation - auto new_session = connector.new_session(); + auto new_session = this->connector_.new_session(); // ensure only single session added at time - std::unique_lock pool_lk(pool_mutex); - if (pool.size() < pool_max_size) { - pool.emplace_back(clock_type::now(), std::move(new_session)); - } + std::unique_lock pool_lk(this->pool_mutex_); + this->sesson_pool_.emplace_back(clock_type::now(), std::move(new_session)); pool_lk.unlock(); // unblock one waiting thread - pool_cv.notify_one(); + this->pool_cv_.notify_one(); } catch (const boost::system::system_error& e) { // TODO: log errors ? } From 8efb1ef78f93f6cf9dfd394a880856fe13bac0be Mon Sep 17 00:00:00 2001 From: Nikita Belov Date: Fri, 30 Jul 2021 11:50:12 +0300 Subject: [PATCH 5/7] fix this --- include/stream-client/connector/impl/connection_pool.ipp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/include/stream-client/connector/impl/connection_pool.ipp b/include/stream-client/connector/impl/connection_pool.ipp index f5495a9..9dcb4e4 100644 --- a/include/stream-client/connector/impl/connection_pool.ipp +++ b/include/stream-client/connector/impl/connection_pool.ipp @@ -142,15 +142,15 @@ void base_connection_pool::watch_pool_routine() auto add_session = [this]() { try { // getting new session is time consuming operation - auto new_session = this->connector_.new_session(); + auto new_session = connector_.new_session(); // ensure only single session added at time - std::unique_lock pool_lk(this->pool_mutex_); - this->sesson_pool_.emplace_back(clock_type::now(), std::move(new_session)); + std::unique_lock pool_lk(pool_mutex_); + sesson_pool_.emplace_back(clock_type::now(), std::move(new_session)); pool_lk.unlock(); // unblock one waiting thread - this->pool_cv_.notify_one(); + pool_cv_.notify_one(); } catch (const boost::system::system_error& e) { // TODO: log errors ? } From 30c95f4ff7dcf200d998af0d753c6e7fd53bbe4d Mon Sep 17 00:00:00 2001 From: Nikita Belov Date: Fri, 30 Jul 2021 16:29:32 +0300 Subject: [PATCH 6/7] fix review --- .../connector/impl/connection_pool.ipp | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/include/stream-client/connector/impl/connection_pool.ipp b/include/stream-client/connector/impl/connection_pool.ipp index 9dcb4e4..2ac4ef2 100644 --- a/include/stream-client/connector/impl/connection_pool.ipp +++ b/include/stream-client/connector/impl/connection_pool.ipp @@ -131,13 +131,13 @@ void base_connection_pool::watch_pool_routine() } } - // pool_current_size may be bigger if someone returned previous session - std::size_t vacant_places = (pool_max_size_ > pool_current_size) ? pool_max_size_ - pool_current_size : 0; - // release pool mutex after removing old sessions pool_lk.unlock(); - // creating new sessions may be slow and we want add them simultaneously; + // pool_current_size may be bigger if someone returned previous session + std::size_t vacant_places = (pool_max_size_ > pool_current_size) ? pool_max_size_ - pool_current_size : 0; + + // creating new sessions may be slow and we want to add them simultaneously; // that's why we need to sync adding threads and lock pool auto add_session = [this]() { try { @@ -164,7 +164,10 @@ void base_connection_pool::watch_pool_routine() a.join(); } - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + // stop cpu spooling if nothing has been added + if (vacant_places == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } } } From 2c9c3341ea23bd382119472c85b08c0dea6ae45a Mon Sep 17 00:00:00 2001 From: Nikita Belov Date: Fri, 30 Jul 2021 16:43:51 +0300 Subject: [PATCH 7/7] Bump version up to 1.1.9 --- CHANGELOG.md | 7 +++++++ CMakeLists.txt | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ef0547..adc2efa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +## 1.1.9 (2021-07-30) + +### Features + +* connector::base_connection_pool: add try_get_session method +* connector::base_connection_pool: change global lock in watch_pool_routine with several small locks + ## 1.1.8 (2021-07-14) ### Features diff --git a/CMakeLists.txt b/CMakeLists.txt index 0263a6d..34f69d1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.0 FATAL_ERROR) set(STREAMCLIENT_VERSION_MAJOR "1") set(STREAMCLIENT_VERSION_MINOR "1") -set(STREAMCLIENT_VERSION_RELEASE "8") +set(STREAMCLIENT_VERSION_RELEASE "9") set(STREAMCLIENT_VERSION_STRING "${STREAMCLIENT_VERSION_MAJOR}.${STREAMCLIENT_VERSION_MINOR}.${STREAMCLIENT_VERSION_RELEASE}") set(STREAMCLIENT_LIB_VERSION ${STREAMCLIENT_VERSION_STRING}) mark_as_advanced(STREAMCLIENT_VERSION_MAJOR STREAMCLIENT_VERSION_MINOR STREAMCLIENT_VERSION_RELEASE STREAMCLIENT_VERSION_STRING STREAMCLIENT_LIB_VERSION)