Skip to content

Commit

Permalink
Merge branch 'release/v1.1.9'
Browse files Browse the repository at this point in the history
  • Loading branch information
i-vovk committed Jul 31, 2021
2 parents a201e50 + 1f3412b commit cbc64c8
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 16 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 1.1.9 (2021-07-30)

### Features

* connector::base_connection_pool: add try_get_session method
* connector::base_connection_pool: change global lock in watch_pool_routine with several small locks

## 1.1.8 (2021-07-14)

### Features
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.0 FATAL_ERROR)

set(STREAMCLIENT_VERSION_MAJOR "1")
set(STREAMCLIENT_VERSION_MINOR "1")
set(STREAMCLIENT_VERSION_RELEASE "8")
set(STREAMCLIENT_VERSION_RELEASE "9")
set(STREAMCLIENT_VERSION_STRING "${STREAMCLIENT_VERSION_MAJOR}.${STREAMCLIENT_VERSION_MINOR}.${STREAMCLIENT_VERSION_RELEASE}")
set(STREAMCLIENT_LIB_VERSION ${STREAMCLIENT_VERSION_STRING})
mark_as_advanced(STREAMCLIENT_VERSION_MAJOR STREAMCLIENT_VERSION_MINOR STREAMCLIENT_VERSION_RELEASE STREAMCLIENT_VERSION_STRING STREAMCLIENT_LIB_VERSION)
Expand Down
29 changes: 29 additions & 0 deletions include/stream-client/connector/connection_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,35 @@ class base_connection_pool
return get_session(get_connect_timeout());
}

/**
* Try to pull a session (stream) from the pool.
* Tries to get stream from the pool until specified deadline is reached.
*
* @note Unlike the get_session method, this method doesn't wait to fill up the pool in a timeout.
*
* @param[in] deadline Expiration time-point.
* @param[out] ec Set to indicate what error occurred, if any.
*
* @returns A stream wrapped in std::unique_ptr or nullptr.
*/
std::unique_ptr<stream_type> try_get_session(boost::system::error_code& ec, const time_point_type& deadline);

/**
* Try to pull a session (stream) from the pool.
* Tries to get stream from the pool until specified timeout elapsed.
*
* @note Unlike the get_session method, this method doesn't wait to fill up the pool in a timeout.
*
* @param[in] timeout Expiration duration.
* @param[out] ec Set to indicate what error occurred, if any.
*
* @returns A stream wrapped in std::unique_ptr or nullptr.
*/
inline std::unique_ptr<stream_type> try_get_session(boost::system::error_code& ec, const time_duration_type& timeout)
{
return try_get_session(ec, clock_type::now() + timeout);
}

/**
* Return the session pulled earlier from the pool.
*
Expand Down
56 changes: 41 additions & 15 deletions include/stream-client/connector/impl/connection_pool.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,30 @@ base_connection_pool<Connector>::get_session(boost::system::error_code& ec, cons
}
if (sesson_pool_.empty() && !pool_cv_.wait_until(pool_lk, deadline, [this] { return !sesson_pool_.empty(); })) {
// session pool is still empty
ec = boost::asio::error::no_descriptors;
return nullptr;
}

std::unique_ptr<stream_type> session = std::move(sesson_pool_.front().second);
sesson_pool_.pop_front();
return session;
}

template <typename Connector>
std::unique_ptr<typename base_connection_pool<Connector>::stream_type>
base_connection_pool<Connector>::try_get_session(boost::system::error_code& ec, const time_point_type& deadline)
{
std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_, std::defer_lock);
if (!pool_lk.try_lock_until(deadline)) {
// failed to lock pool_mutex_
ec = boost::asio::error::timed_out;
return nullptr;
}
if (sesson_pool_.empty()) {
// session pool is empty
ec = boost::asio::error::no_descriptors;
return nullptr;
}

std::unique_ptr<stream_type> session = std::move(sesson_pool_.front().second);
sesson_pool_.pop_front();
Expand Down Expand Up @@ -92,10 +113,12 @@ void base_connection_pool<Connector>::watch_pool_routine()
static const auto lock_timeout = std::chrono::milliseconds(100);

while (watch_pool) {
// try to lock pool mutex
std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_, std::defer_lock);
if (!pool_lk.try_lock_for(lock_timeout)) {
continue;
}

// remove session which idling past idle_timeout_
std::size_t pool_current_size = 0;
for (auto pool_it = sesson_pool_.begin(); pool_it != sesson_pool_.end();) {
Expand All @@ -107,21 +130,27 @@ void base_connection_pool<Connector>::watch_pool_routine()
++pool_current_size;
}
}

// release pool mutex after removing old sessions
pool_lk.unlock();

// pool_current_size may be bigger if someone returned previous session
std::size_t vacant_places = (pool_max_size_ > pool_current_size) ? pool_max_size_ - pool_current_size : 0;

// at this point we own pool_mutex_, but we want to get new sessions simultaneously;
// that's why new mutex to sync adding threads
std::mutex pool_add;
bool added = false;
auto add_session = [&pool_add, &added, &connector = this->connector_, &pool = this->sesson_pool_]() {
// creating new sessions may be slow and we want to add them simultaneously;
// that's why we need to sync adding threads and lock pool
auto add_session = [this]() {
try {
// getting new session is time consuming operation
auto new_session = connector.new_session();
auto new_session = connector_.new_session();

// ensure only single session added at time
std::unique_lock<std::mutex> add_lk(pool_add);
pool.emplace_back(clock_type::now(), std::move(new_session));
added = true;
std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_);
sesson_pool_.emplace_back(clock_type::now(), std::move(new_session));
pool_lk.unlock();

// unblock one waiting thread
pool_cv_.notify_one();
} catch (const boost::system::system_error& e) {
// TODO: log errors ?
}
Expand All @@ -135,12 +164,9 @@ void base_connection_pool<Connector>::watch_pool_routine()
a.join();
}

pool_lk.unlock();
if (added) {
pool_cv_.notify_all();
} else {
// stop cpu spooling if nothing has been added
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// stop cpu spooling if nothing has been added
if (vacant_places == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
}
Expand Down

0 comments on commit cbc64c8

Please sign in to comment.