Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[doc] add coro_rpc doc #681

Merged
merged 4 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions include/ylt/coro_rpc/impl/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,12 @@ class context_base {
/*finish here*/
self_->status_ = context_status::finish_response;
}
const context_info_t<rpc_protocol> *get_context() const noexcept {
const context_info_t<rpc_protocol> *get_context_info() const noexcept {
return self_.get();
}
context_info_t<rpc_protocol> *get_context_info() noexcept {
return self_.get();
}
context_info_t<rpc_protocol> *get_context() noexcept { return self_.get(); }
};

template <typename T>
Expand Down
19 changes: 12 additions & 7 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ class client_pool;

namespace coro_rpc {

inline uint64_t get_global_client_id() {
static std::atomic<uint64_t> cid = 0;
return cid.fetch_add(1, std::memory_order::relaxed);
}

#ifdef GENERATE_BENCHMARK_DATA
std::string benchmark_file_path = "./";
#endif
Expand Down Expand Up @@ -105,7 +110,7 @@ struct async_rpc_result_value_t {
async_rpc_result_value_t(T &&result) : result_(std::move(result)) {}
T &result() noexcept { return result_; }
const T &result() const noexcept { return result_; }
std::string_view attachment() const noexcept {
std::string_view get_attachment() const noexcept {
return buffer_.resp_attachment_buf_;
}
resp_body release_buffer() { return std::move(buffer_); }
Expand Down Expand Up @@ -155,12 +160,12 @@ class coro_rpc_client {
const inline static rpc_error connect_error = {errc::io_error,
"client has been closed"};
struct config {
uint32_t client_id = 0;
uint64_t client_id = get_global_client_id();
std::chrono::milliseconds timeout_duration =
std::chrono::milliseconds{5000};
std::string host;
std::string port;
bool enable_tcp_no_delay_ = true;
bool enable_tcp_no_delay = true;
#ifdef YLT_ENABLE_SSL
std::filesystem::path ssl_cert_path;
std::string ssl_domain;
Expand All @@ -172,7 +177,7 @@ class coro_rpc_client {
* @param io_context asio io_context, async event handler
*/
coro_rpc_client(asio::io_context::executor_type executor,
uint32_t client_id = 0)
uint64_t client_id = get_global_client_id())
: control_(std::make_shared<control_t>(executor, false)),
timer_(std::make_unique<coro_io::period_timer>(executor)) {
config_.client_id = client_id;
Expand All @@ -184,7 +189,7 @@ class coro_rpc_client {
*/
coro_rpc_client(
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor(),
uint32_t client_id = 0)
uint64_t client_id = get_global_client_id())
: control_(
std::make_shared<control_t>(executor->get_asio_executor(), false)),
timer_(std::make_unique<coro_io::period_timer>(
Expand Down Expand Up @@ -424,7 +429,7 @@ class coro_rpc_client {
ELOGV(WARN, "client_id %d connect timeout", config_.client_id);
co_return errc::timed_out;
}
if (config_.enable_tcp_no_delay_ == true) {
if (config_.enable_tcp_no_delay == true) {
control_->socket_.set_option(asio::ip::tcp::no_delay(true), ec);
}

Expand Down Expand Up @@ -738,7 +743,7 @@ class coro_rpc_client {
call<func>(std::forward<Args>(args)...));
}
#endif

private:
template <auto func, typename... Args>
async_simple::coro::Lazy<rpc_error> send_request_for_impl(
auto duration, uint32_t &id, coro_io::period_timer &timer,
Expand Down
58 changes: 35 additions & 23 deletions include/ylt/coro_rpc/impl/coro_rpc_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <memory>
#include <mutex>
#include <system_error>
#include <thread>
#include <unordered_map>
#include <vector>
#include <ylt/easylog.hpp>
Expand Down Expand Up @@ -68,29 +69,36 @@ class coro_rpc_server_base {
* TODO: add doc
* @param thread_num the number of io_context.
* @param port the server port to listen.
* @param listen address of server
* @param conn_timeout_duration client connection timeout. 0 for no timeout.
* default no timeout.
* @param is_enable_tcp_no_delay is tcp socket allow
*/
coro_rpc_server_base(size_t thread_num, unsigned short port,
coro_rpc_server_base(size_t thread_num = std::thread::hardware_concurrency(),
unsigned short port = 9001,
std::string address = "0.0.0.0",
std::chrono::steady_clock::duration
conn_timeout_duration = std::chrono::seconds(0))
conn_timeout_duration = std::chrono::seconds(0),
bool is_enable_tcp_no_delay = true)
: pool_(thread_num),
acceptor_(pool_.get_executor()->get_asio_executor()),
port_(port),
conn_timeout_duration_(conn_timeout_duration),
flag_{stat::init} {
flag_{stat::init},
is_enable_tcp_no_delay_(is_enable_tcp_no_delay) {
init_address(std::move(address));
}

coro_rpc_server_base(size_t thread_num,
std::string address /* = "0.0.0.0:9001" */,
coro_rpc_server_base(size_t thread_num = std::thread::hardware_concurrency(),
std::string address = "0.0.0.0:9001",
std::chrono::steady_clock::duration
conn_timeout_duration = std::chrono::seconds(0))
conn_timeout_duration = std::chrono::seconds(0),
bool is_enable_tcp_no_delay = true)
: pool_(thread_num),
acceptor_(pool_.get_executor()->get_asio_executor()),
conn_timeout_duration_(conn_timeout_duration),
flag_{stat::init} {
flag_{stat::init},
is_enable_tcp_no_delay_(is_enable_tcp_no_delay) {
init_address(std::move(address));
}

Expand All @@ -99,7 +107,8 @@ class coro_rpc_server_base {
acceptor_(pool_.get_executor()->get_asio_executor()),
port_(config.port),
conn_timeout_duration_(config.conn_timeout_duration),
flag_{stat::init} {
flag_{stat::init},
is_enable_tcp_no_delay_(config.is_enable_tcp_no_delay) {
init_address(config.address);
}

Expand All @@ -122,19 +131,19 @@ class coro_rpc_server_base {
* @return error code if start failed, otherwise block until server stop.
*/
[[nodiscard]] coro_rpc::err_code start() noexcept {
auto ret = async_start();
if (ret) {
ret.value().wait();
return ret.value().value();
}
else {
return ret.error();
}
return async_start().get();
}

private:
async_simple::Future<coro_rpc::err_code> make_error_future(
coro_rpc::err_code &&err) {
async_simple::Promise<coro_rpc::err_code> p;
p.setValue(std::move(err));
return p.getFuture();
}

[[nodiscard]] coro_rpc::expected<async_simple::Future<coro_rpc::err_code>,
coro_rpc::err_code>
async_start() noexcept {
public:
async_simple::Future<coro_rpc::err_code> async_start() noexcept {
{
std::unique_lock lock(start_mtx_);
if (flag_ != stat::init) {
Expand All @@ -144,8 +153,8 @@ class coro_rpc_server_base {
else if (flag_ == stat::stop) {
ELOGV(INFO, "has stoped");
}
return coro_rpc::unexpected<coro_rpc::err_code>{
coro_rpc::errc::server_has_ran};
return make_error_future(
coro_rpc::err_code{coro_rpc::errc::server_has_ran});
}
errc_ = listen();
if (!errc_) {
Expand Down Expand Up @@ -177,7 +186,7 @@ class coro_rpc_server_base {
return std::move(future);
}
else {
return coro_rpc::unexpected<coro_rpc::err_code>{errc_};
return make_error_future(coro_rpc::err_code{errc_});
}
}

Expand Down Expand Up @@ -387,7 +396,9 @@ class coro_rpc_server_base {

int64_t conn_id = ++conn_id_;
ELOGV(INFO, "new client conn_id %d coming", conn_id);
socket.set_option(asio::ip::tcp::no_delay(true), error);
if (is_enable_tcp_no_delay_) {
socket.set_option(asio::ip::tcp::no_delay(true), error);
}
auto conn = std::make_shared<coro_connection>(executor, std::move(socket),
conn_timeout_duration_);
conn->set_quit_callback(
Expand Down Expand Up @@ -459,6 +470,7 @@ class coro_rpc_server_base {

std::atomic<uint16_t> port_;
std::string address_;
bool is_enable_tcp_no_delay_;
coro_rpc::err_code errc_ = {};
std::chrono::steady_clock::duration conn_timeout_duration_;

Expand Down
12 changes: 6 additions & 6 deletions include/ylt/coro_rpc/impl/default_config/coro_rpc_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@

namespace coro_rpc {

namespace config {
struct coro_rpc_config_base {
uint16_t port = 8801;
struct config_base {
bool is_enable_tcp_no_delay = true;
uint16_t port = 9001;
unsigned thread_num = std::thread::hardware_concurrency();
std::chrono::steady_clock::duration conn_timeout_duration =
std::chrono::seconds{0};
std::string address = "0.0.0.0";
};

struct coro_rpc_default_config : public coro_rpc_config_base {
struct config_t : public config_base {
using rpc_protocol = coro_rpc::protocol::coro_rpc_protocol;
using executor_pool_t = coro_io::io_context_pool;
};
} // namespace config

using coro_rpc_server = coro_rpc_server_base<config::coro_rpc_default_config>;
using coro_rpc_server = coro_rpc_server_base<config_t>;
} // namespace coro_rpc
12 changes: 6 additions & 6 deletions src/coro_io/tests/test_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ TEST_CASE("test RR") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
auto hosts =
std::vector<std::string_view>{"127.0.0.1:8801", "localhost:8801"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
Expand Down Expand Up @@ -62,10 +62,10 @@ TEST_CASE("test WRR") {

coro_rpc::coro_rpc_server server1(1, 8801);
auto res = server1.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
coro_rpc::coro_rpc_server server2(1, 8802);
auto res2 = server2.async_start();
REQUIRE_MESSAGE(res2, "server start failed");
REQUIRE_MESSAGE(!res2.hasResult(), "server start failed");

async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
auto hosts =
Expand Down Expand Up @@ -119,7 +119,7 @@ TEST_CASE("test Random") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
auto hosts =
std::vector<std::string_view>{"127.0.0.1:8801", "localhost:8801"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(
Expand Down Expand Up @@ -148,7 +148,7 @@ TEST_CASE("test single host") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
auto hosts = std::vector<std::string_view>{"127.0.0.1:8801"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
for (int i = 0; i < 100; ++i) {
Expand All @@ -168,7 +168,7 @@ TEST_CASE("test send_request config") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 9813);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
auto hosts = std::vector<std::string_view>{"127.0.0.1:9813"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
for (int i = 0; i < 100; ++i) {
Expand Down
10 changes: 5 additions & 5 deletions src/coro_io/tests/test_client_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ TEST_CASE("test client pool") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto is_started = server.async_start();
REQUIRE(is_started);
REQUIRE(is_started.hasResult() == false);
auto pool = coro_io::client_pool<coro_rpc::coro_rpc_client>::create(
"127.0.0.1:8801", {.max_connection = 100,
.idle_timeout = 300ms,
Expand All @@ -114,7 +114,7 @@ TEST_CASE("test idle timeout yield") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto is_started = server.async_start();
REQUIRE(is_started);
REQUIRE(!is_started.hasResult());
auto pool = coro_io::client_pool<coro_rpc::coro_rpc_client>::create(
"127.0.0.1:8801", {.max_connection = 100,
.idle_queue_per_max_clear_count = 1,
Expand Down Expand Up @@ -142,7 +142,7 @@ TEST_CASE("test reconnect") {
async_simple::Promise<async_simple::Unit> p;
coro_io::sleep_for(700ms).start([&server, &p](auto &&) {
auto server_is_started = server.async_start();
REQUIRE(server_is_started);
REQUIRE(!server_is_started.hasResult());
});

auto res = co_await event(100, *pool, cv, lock);
Expand Down Expand Up @@ -177,7 +177,7 @@ TEST_CASE("test reconnect retry wait time exclude reconnect cost time") {
async_simple::Promise<async_simple::Unit> p;
coro_io::sleep_for(350ms).start([&server, &p](auto &&) {
auto server_is_started = server.async_start();
REQUIRE(server_is_started);
REQUIRE(!server_is_started.hasResult());
});
auto res = co_await event<mock_client>(100, *pool, cv, lock);
CHECK(res);
Expand All @@ -196,7 +196,7 @@ TEST_CASE("test collect_free_client") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto is_started = server.async_start();
REQUIRE(is_started);
REQUIRE(!is_started.hasResult());
auto pool = coro_io::client_pool<coro_rpc::coro_rpc_client>::create(
"127.0.0.1:8801", {.max_connection = 100, .idle_timeout = 300ms});

Expand Down
4 changes: 2 additions & 2 deletions src/coro_rpc/benchmark/data_gen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ int main() {
coro_rpc::coro_rpc_server server(std::thread::hardware_concurrency(), 0);
register_handlers(server);
auto started = server.async_start();
if (!started) {
if (started.hasResult()) {
ELOGV(ERROR, "server started failed");
return -1;
}
Expand Down Expand Up @@ -118,7 +118,7 @@ int main() {

server.stop();

started->wait();
started.wait();

pool.stop();
thd.join();
Expand Down
2 changes: 1 addition & 1 deletion src/coro_rpc/examples/base_examples/rpc_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void async_echo_by_callback(
/* rpc function runs in global io thread pool */
coro_io::post([conn, data]() mutable {
/* send work to global non-io thread pool */
auto *ctx = conn.get_context();
auto *ctx = conn.get_context_info();
conn.response_msg(data); /*response here*/
}).start([](auto &&) {
});
Expand Down
4 changes: 2 additions & 2 deletions src/coro_rpc/examples/file_transfer/rpc_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
std::string echo(std::string str) { return str; }

void upload_file(coro_rpc::context<std::errc> conn, file_part part) {
auto &ctx = *conn.get_context();
auto &ctx = *conn.get_context_info();
if (!ctx.tag().has_value()) {
auto filename = std::to_string(std::time(0)) +
std::filesystem::path(part.filename).extension().string();
Expand All @@ -27,7 +27,7 @@ void upload_file(coro_rpc::context<std::errc> conn, file_part part) {

void download_file(coro_rpc::context<response_part> conn,
std::string filename) {
auto &ctx = *conn.get_context();
auto &ctx = *conn.get_context_info();
if (!ctx.tag().has_value()) {
std::string actual_filename =
std::filesystem::path(filename).filename().string();
Expand Down
Loading
Loading