Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restyle feat: WIP: Add back the tox_loop implementation for low latency. #1824

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ auto_test(send_message)
auto_test(set_name)
auto_test(set_status_message)
auto_test(skeleton)
auto_test(tox_loop)
auto_test(tox_many)
auto_test(tox_many_tcp)
auto_test(tox_one)
Expand Down
114 changes: 114 additions & 0 deletions auto_tests/tox_loop_test.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#include <pthread.h>
#include <stdlib.h>
#include <time.h>

#include "../toxcore/tox.h"

#include "check_compat.h"
#include "../testing/misc_tools.h"

#define TCP_RELAY_PORT 33448
/* The Travis-CI container responds poorly to ::1 as a localhost address
* You're encouraged to -D FORCE_TESTS_IPV6 on a local test */
#ifdef FORCE_TESTS_IPV6
#define TOX_LOCALHOST "::1"
#else
#define TOX_LOCALHOST "127.0.0.1"
#endif

typedef struct Loop_Test {
volatile int start_count;
volatile int stop_count;
pthread_mutex_t mutex;
Tox *tox;
} Loop_Test;

static void tox_loop_cb_start(Tox *tox, void *data)
{
Loop_Test *userdata = (Loop_Test *)data;
pthread_mutex_lock(&userdata->mutex);
fprintf(stderr, "br1: %p (%d)\n", (volatile void *)&userdata->start_count, userdata->start_count);
fputs("br2\n", stderr);
++userdata->start_count;
}

static void tox_loop_cb_stop(Tox *tox, void *data)
{
Loop_Test *userdata = (Loop_Test *)data;
++userdata->stop_count;
pthread_mutex_unlock(&userdata->mutex);
}

static void *tox_loop_worker(void *data)
{
Loop_Test *userdata = (Loop_Test *)data;
Tox_Err_Loop err;
tox_loop(userdata->tox, userdata, &err);
ck_assert_msg(err == TOX_ERR_LOOP_OK, "tox_loop error: %d", err);
return nullptr;
}

static void test_tox_loop(void)
{
pthread_t worker, worker_tcp;
struct Tox_Options *opts = tox_options_new(nullptr);
Loop_Test userdata;
uint8_t dpk[TOX_PUBLIC_KEY_SIZE];
int retval;

userdata.start_count = 0;
userdata.stop_count = 0;
pthread_mutex_init(&userdata.mutex, nullptr);

tox_options_set_tcp_port(opts, TCP_RELAY_PORT);
userdata.tox = tox_new(opts, nullptr);
tox_callback_loop_begin(userdata.tox, tox_loop_cb_start);
tox_callback_loop_end(userdata.tox, tox_loop_cb_stop);
pthread_create(&worker, nullptr, tox_loop_worker, &userdata);

fprintf(stderr, "br0: udp %p\n", (volatile void *)&userdata.start_count);
tox_self_get_dht_id(userdata.tox, dpk);

tox_options_default(opts);
Loop_Test userdata_tcp;
userdata_tcp.start_count = 0;
userdata_tcp.stop_count = 0;
pthread_mutex_init(&userdata_tcp.mutex, nullptr);
userdata_tcp.tox = tox_new(opts, nullptr);
tox_callback_loop_begin(userdata_tcp.tox, tox_loop_cb_start);
tox_callback_loop_end(userdata_tcp.tox, tox_loop_cb_stop);
pthread_create(&worker_tcp, nullptr, tox_loop_worker, &userdata_tcp);

pthread_mutex_lock(&userdata_tcp.mutex);
TOX_ERR_BOOTSTRAP error;
ck_assert_msg(tox_add_tcp_relay(userdata_tcp.tox, TOX_LOCALHOST, TCP_RELAY_PORT, dpk, &error), "Add relay error, %i",
error);
ck_assert_msg(tox_bootstrap(userdata_tcp.tox, TOX_LOCALHOST, 33445, dpk, &error), "Bootstrap error, %i", error);
pthread_mutex_unlock(&userdata_tcp.mutex);

c_sleep(1000);

tox_loop_stop(userdata.tox);
pthread_join(worker, (void **)(void *)&retval);
ck_assert_msg(retval == 0, "tox_loop didn't return 0");

tox_kill(userdata.tox);
fprintf(stderr, "br3: udp %p (%d)\n", (volatile void *)&userdata.start_count, userdata.start_count);
ck_assert_msg(userdata.start_count == userdata.stop_count, "start and stop must match (start = %d, stop = %d)",
userdata.start_count, userdata.stop_count);

tox_loop_stop(userdata_tcp.tox);
pthread_join(worker_tcp, (void **)(void *)&retval);
ck_assert_msg(retval == 0, "tox_loop didn't return 0");

tox_kill(userdata_tcp.tox);
fprintf(stderr, "br4: tcp %p (%d)\n", (volatile void *)&userdata_tcp.start_count, userdata_tcp.start_count);
ck_assert_msg(userdata_tcp.start_count == userdata_tcp.stop_count, "start and stop must match (start = %d, stop = %d)",
userdata_tcp.start_count, userdata_tcp.stop_count);
}

int main(int argc, char *argv[])
{
test_tox_loop();
return 0;
}
24 changes: 22 additions & 2 deletions toxcore/Messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -1888,15 +1888,31 @@ Messenger *new_messenger(Mono_Time *mono_time, Messenger_Options *options, unsig

Messenger *m = (Messenger *)calloc(1, sizeof(Messenger));

if (!m) {
if (m == nullptr) {
return nullptr;
}


#ifdef HAVE_LIBEV
m->dispatcher = ev_loop_new(0);
#else
m->loop_run = false;
#endif // HAVE_LIBEV

#if defined(HAVE_LIBEV)

if (m->dispatcher == nullptr) {
free(m);
return nullptr;
}

#endif // HAVE_LIBEV

m->mono_time = mono_time;

m->fr = friendreq_new();

if (!m->fr) {
if (m->fr == nullptr) {
free(m);
return nullptr;
}
Expand Down Expand Up @@ -2044,6 +2060,10 @@ void kill_messenger(Messenger *m)
clear_receipts(m, i);
}

#ifdef HAVE_LIBEV
ev_loop_destroy(m->dispatcher);
#endif

logger_kill(m->log);
free(m->friendlist);
friendreq_kill(m->fr);
Expand Down
11 changes: 11 additions & 0 deletions toxcore/Messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
#include "net_crypto.h"
#include "state.h"

#ifdef HAVE_LIBEV
#include <ev.h>
#endif

#define MAX_NAME_LENGTH 128
/* TODO(irungentoo): this must depend on other variable. */
#define MAX_STATUSMESSAGE_LENGTH 1007
Expand Down Expand Up @@ -291,6 +295,13 @@ struct Messenger {
m_self_connection_status_cb *core_connection_change;
unsigned int last_connection_status;

#ifdef HAVE_LIBEV
struct ev_loop *dispatcher;
ev_async stop_loop;
#else
bool loop_run;
#endif

Messenger_Options options;
};

Expand Down
50 changes: 50 additions & 0 deletions toxcore/TCP_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
#include <stdlib.h>
#include <string.h>

#ifdef HAVE_LIBEV
#include <ev.h>
#endif

#include "mono_time.h"
#include "util.h"

Expand All @@ -22,9 +26,19 @@ typedef struct TCP_Client_Conn {
uint32_t number;
} TCP_Client_Conn;

#ifdef HAVE_LIBEV
typedef struct TCP_Client_Socket_Listener {
ev_io listener;
struct ev_loop *dispatcher;
} TCP_Client_Socket_Listener;
#endif

struct TCP_Client_Connection {
TCP_Client_Status status;
Socket sock;
#ifdef HAVE_LIBEV
TCP_Client_Socket_Listener sock_listener;
#endif
uint8_t self_public_key[CRYPTO_PUBLIC_KEY_SIZE]; /* our public key */
uint8_t public_key[CRYPTO_PUBLIC_KEY_SIZE]; /* public key of the server */
IP_Port ip_port; /* The ip and port of the server */
Expand Down Expand Up @@ -79,10 +93,41 @@ IP_Port tcp_con_ip_port(const TCP_Client_Connection *con)
return con->ip_port;
}

Socket tcp_con_sock(const TCP_Client_Connection *con)
{
return con->sock;
}

TCP_Client_Status tcp_con_status(const TCP_Client_Connection *con)
{
return con->status;
}

#ifdef HAVE_LIBEV
bool tcp_con_ev_is_active(TCP_Client_Connection *con)
{
return ev_is_active(&con->sock_listener.listener)
|| ev_is_pending(&con->sock_listener.listener);
}

void tcp_con_ev_listen(TCP_Client_Connection *con, struct ev_loop *dispatcher, tcp_con_ev_listen_cb *callback,
void *data)
{
con->sock_listener.dispatcher = dispatcher;
con->sock_listener.listener.data = data;

ev_io_init(&con->sock_listener.listener, callback, con->sock.socket, EV_READ);
ev_io_start(dispatcher, &con->sock_listener.listener);
}
#endif

void tcp_con_ev_stop(TCP_Client_Connection *con)
{
#ifdef HAVE_LIBEV
ev_io_stop(con->sock_listener.dispatcher, &con->sock_listener.listener);
#endif
}

void *tcp_con_custom_object(const TCP_Client_Connection *con)
{
return con->custom_object;
Expand Down Expand Up @@ -1052,6 +1097,11 @@ void kill_TCP_connection(TCP_Client_Connection *tcp_connection)

wipe_priority_list(tcp_connection->priority_queue_start);
kill_sock(tcp_connection->sock);

#ifdef HAVE_LIBEV
ev_io_stop(tcp_connection->sock_listener.dispatcher, &tcp_connection->sock_listener.listener);
#endif

crypto_memzero(tcp_connection, sizeof(TCP_Client_Connection));
free(tcp_connection);
}
14 changes: 14 additions & 0 deletions toxcore/TCP_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
#include "TCP_server.h"
#include "crypto_core.h"

#ifdef HAVE_LIBEV
#include <ev.h>
#endif

#define TCP_CONNECTION_TIMEOUT 10

typedef enum TCP_Proxy_Type {
Expand Down Expand Up @@ -40,8 +44,18 @@ typedef struct TCP_Client_Connection TCP_Client_Connection;

const uint8_t *tcp_con_public_key(const TCP_Client_Connection *con);
IP_Port tcp_con_ip_port(const TCP_Client_Connection *con);
Socket tcp_con_sock(const TCP_Client_Connection *con);
TCP_Client_Status tcp_con_status(const TCP_Client_Connection *con);

#ifdef HAVE_LIBEV
bool tcp_con_ev_is_active(TCP_Client_Connection *con);

typedef void tcp_con_ev_listen_cb(struct ev_loop *dispatcher, ev_io *sock_listener, int events);
void tcp_con_ev_listen(TCP_Client_Connection *con, struct ev_loop *dispatcher, tcp_con_ev_listen_cb *callback,
void *data);
#endif
void tcp_con_ev_stop(TCP_Client_Connection *con);

void *tcp_con_custom_object(const TCP_Client_Connection *con);
uint32_t tcp_con_custom_uint(const TCP_Client_Connection *con);
void tcp_con_set_custom_object(TCP_Client_Connection *con, void *object);
Expand Down
32 changes: 32 additions & 0 deletions toxcore/TCP_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,38 @@ uint32_t tcp_connections_count(const TCP_Connections *tcp_c)
return tcp_c->tcp_connections_length;
}


/**
* Return number of elements of TCP connection array.
*
* @param tcp_c struct containing TCP_con array.
*
* @return number of elements of TCP connection array.
*/
uint32_t tcp_connections_length(const TCP_Connections *tcp_c)
{
return tcp_c->tcp_connections_length;
}


/**
* Return TCP connection stored at "idx" position.
*
* @param tcp_c struct containing TCP_con array.
* @param idx index of TCP connection to return (values from 0 to tcp_connections_length() - 1).
*
* @return TCP connection stored at "idx" position, or NULL if errors occurred.
*/
const TCP_con *tcp_connections_connection_at(const TCP_Connections *tcp_c, uint32_t idx)
{
if (idx >= tcp_c->tcp_connections_length) {
return nullptr;
}

return &tcp_c->tcp_connections[idx];
}


/** Set the size of the array to num.
*
* return -1 if realloc fails.
Expand Down
4 changes: 4 additions & 0 deletions toxcore/TCP_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ typedef struct TCP_Connections TCP_Connections;

const uint8_t *tcp_connections_public_key(const TCP_Connections *tcp_c);

uint32_t tcp_connections_length(const TCP_Connections *tcp_c);

const TCP_con *tcp_connections_connection_at(const TCP_Connections *tcp_c, uint32_t idx);

uint32_t tcp_connections_count(const TCP_Connections *tcp_c);

/** Returns the number of connected TCP relays */
Expand Down
Loading