Skip to content

Commit

Permalink
[coro_io][feat] reconnect wait time now exclude connect cost time. (#387
Browse files Browse the repository at this point in the history
)
  • Loading branch information
poor-circle authored Jul 27, 2023
1 parent 503555a commit 158c5f9
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 11 deletions.
8 changes: 7 additions & 1 deletion include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,15 @@ class client_pool : public std::enable_shared_from_this<

async_simple::coro::Lazy<std::unique_ptr<client_t>> reconnect(
std::unique_ptr<client_t> client) {
auto pre_time_point = std::chrono::steady_clock::now();
bool ok = client_t::is_ok(co_await client->reconnect(host_name_));
for (int i = 0; !ok && i < pool_config_.connect_retry_count; ++i) {
co_await coro_io::sleep_for(pool_config_.reconnect_wait_time);
auto post_time_point = std::chrono::steady_clock::now();
auto wait_time =
pool_config_.reconnect_wait_time - (post_time_point - pre_time_point);
if (wait_time > std::chrono::milliseconds{10})
co_await coro_io::sleep_for(wait_time);
pre_time_point = post_time_point;
ok = (client_t::is_ok(co_await client->reconnect(host_name_)));
}
co_return ok ? std::move(client) : nullptr;
Expand Down
56 changes: 46 additions & 10 deletions src/coro_io/tests/test_client_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <asio/io_context.hpp>
#include <atomic>
#include <cassert>
#include <chrono>
#include <exception>
#include <filesystem>
#include <fstream>
Expand Down Expand Up @@ -32,19 +33,18 @@
#include "ylt/coro_rpc/impl/expected.hpp"
using namespace std::chrono_literals;
using namespace async_simple::coro;
auto event =
[](
int lim, coro_io::client_pool<coro_rpc::coro_rpc_client> &pool,
ConditionVariable<SpinLock> &cv, SpinLock &lock,
std::function<void(coro_rpc::coro_rpc_client &client)> user_op =
[](auto &client) {
}) -> async_simple::coro::Lazy<bool> {
template <typename T = coro_rpc::coro_rpc_client>
async_simple::coro::Lazy<bool> event(
int lim, coro_io::client_pool<T> &pool, ConditionVariable<SpinLock> &cv,
SpinLock &lock,
std::function<void(coro_rpc::coro_rpc_client &client)> user_op =
[](auto &client) {
}) {
std::vector<RescheduleLazy<bool>> works;
int64_t cnt = 0;
for (int i = 0; i < lim; ++i) {
auto op = [&cnt, &lock, &cv, &lim,
&user_op](coro_rpc::coro_rpc_client &client)
-> async_simple::coro::Lazy<void> {
&user_op](T &client) -> async_simple::coro::Lazy<void> {
user_op(client);
auto l = co_await lock.coScopedLock();
if (++cnt < lim) {
Expand All @@ -59,7 +59,8 @@ auto event =
co_return;
};
auto backer = [&cv, &lock, &cnt, &lim](
auto &pool, auto op) -> async_simple::coro::Lazy<bool> {
coro_io::client_pool<T> &pool,
auto op) -> async_simple::coro::Lazy<bool> {
async_simple::Promise<bool> p;
auto res = co_await pool.send_request(op);
if (!res.has_value()) {
Expand Down Expand Up @@ -150,6 +151,41 @@ TEST_CASE("test reconnect") {
}());
}

struct mock_client : public coro_rpc::coro_rpc_client {
using coro_rpc::coro_rpc_client::coro_rpc_client;
async_simple::coro::Lazy<std::errc> reconnect(const std::string &hostname) {
auto ec = co_await this->coro_rpc::coro_rpc_client::reconnect(hostname);
if (ec != std::errc{}) {
co_await coro_io::sleep_for(300ms);
}
co_return ec;
}
};
TEST_CASE("test reconnect retry wait time exinclude reconnect cost time") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
auto tp = std::chrono::steady_clock::now();
auto pool = coro_io::client_pool<mock_client>::create(
"127.0.0.1:8801",
{.connect_retry_count = 3, .reconnect_wait_time = 500ms});
SpinLock lock;
ConditionVariable<SpinLock> cv;
coro_rpc::coro_rpc_server server(2, 8801);
async_simple::Promise<async_simple::Unit> p;
coro_io::sleep_for(350ms).start([&server, &p](auto &&) {
auto server_is_started = server.async_start();
REQUIRE(server_is_started);
});
auto res = co_await event<mock_client>(100, *pool, cv, lock);
CHECK(res);
CHECK(pool->free_client_count() == 100);
auto dur = std::chrono::steady_clock::now() - tp;
std::cout << dur.count() << std::endl;
CHECK((dur >= 500ms && dur <= 700ms));
server.stop();
co_return;
}());
}

TEST_CASE("test collect_free_client") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
Expand Down

0 comments on commit 158c5f9

Please sign in to comment.