Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…wss-common into zmq
  • Loading branch information
divyagayathri-hcl committed Dec 7, 2024
2 parents bbbe998 + fb6ce44 commit 732b704
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 14 deletions.
3 changes: 2 additions & 1 deletion common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -475,11 +475,12 @@ namespace swss {

#define CFG_SUPPRESS_ASIC_SDK_HEALTH_EVENT_NAME "SUPPRESS_ASIC_SDK_HEALTH_EVENT"

#define CFG_MEMORY_STATISTICS_TABLE_NAME "MEMORY_STATISTICS"

#define CFG_PAC_PORT_CONFIG_TABLE "PAC_PORT_CONFIG_TABLE"
#define CFG_PAC_GLOBAL_CONFIG_TABLE "PAC_GLOBAL_CONFIG_TABLE"
#define CFG_PAC_HOSTAPD_GLOBAL_CONFIG_TABLE "HOSTAPD_GLOBAL_CONFIG_TABLE"


/***** STATE DATABASE *****/

#define STATE_SWITCH_CAPABILITY_TABLE_NAME "SWITCH_CAPABILITY"
Expand Down
27 changes: 17 additions & 10 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ void ZmqClient::sendMsg(
else
{
// for other error, send failed immediately.
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}
Expand All @@ -211,7 +211,7 @@ void ZmqClient::sendMsg(
}

// failed after retry
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}
Expand All @@ -224,16 +224,22 @@ bool ZmqClient::wait(std::string& dbName,

/* zmq_pollitem_t items [1] = { };
items[0].socket = m_socket;
items[0].events = ZMQ_POLLIN;
items[0].events = ZMQ_POLLIN; */

zmq_pollitem_t poll_item;
poll_item.fd = 0;
poll_item.socket = m_socket;
poll_item.events = ZMQ_POLLIN;
poll_item.revents = 0;

int rc;
for (int i = 0; true; ++i)
{
SWSS_LOG_DEBUG("m_waitTimeMs is : %d", (int)m_waitTimeMs);
rc = zmq_poll(items, 1, (int)m_waitTimeMs);
rc = zmq_poll(&poll_item, 1, 1000);
SWSS_LOG_DEBUG("cli: rc value is : %d", rc);
if (rc == 0)
{
SWSS_LOG_ERROR("zmq_poll timed out");
SWSS_LOG_ERROR("zmq_poll timed out: zmqclient wait");
return false;
}
if (rc > 0)
Expand All @@ -242,22 +248,23 @@ bool ZmqClient::wait(std::string& dbName,
}
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
{
SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq poll");
continue;
}
SWSS_LOG_ERROR("zmq_poll failed, zmqerrno: %d", zmq_errno());
} */
SWSS_LOG_ERROR("zmqclient wait : zmq_poll failed, zmqerrno: %d", zmq_errno());
}

int rc;
for (int i = 0; true; ++i)
{
rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), 0);
if (rc < 0)
{
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
{
SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq receive");
continue;
}
SWSS_LOG_ERROR("zmq_recv failed, zmqerrno: %d", zmq_errno());
SWSS_LOG_ERROR("zmqclient wait : zmq_recv failed, zmqerrno: %d", zmq_errno());
return false;
}
if (rc >= (int)m_sendbuffer.size())
Expand Down
2 changes: 1 addition & 1 deletion common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ZmqClient

private:
void initialize(const std::string& endpoint, const std::string& vrf);
// void initialize(const std::string& endpoint);
void initialize(const std::string& endpoint);

std::string m_endpoint;

Expand Down
18 changes: 16 additions & 2 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <unistd.h>
#include <string>
#include <deque>
#include <limits>
Expand Down Expand Up @@ -96,6 +97,11 @@ void ZmqServer::mqPollThread()
void* context = zmq_ctx_new();;
void* socket = zmq_socket(context, ZMQ_PULL);

//divya
int ret_code = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
SWSS_LOG_DEBUG("mqPollThread:: ret_code value is : %d", ret_code);
//divya

// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));
Expand All @@ -106,6 +112,7 @@ void ZmqServer::mqPollThread()
}

int rc = zmq_bind(socket, m_endpoint.c_str());
SWSS_LOG_DEBUG("115: mqPollThread:: rc value is : %d", rc);
if (rc != 0)
{
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s",
Expand All @@ -122,12 +129,14 @@ void ZmqServer::mqPollThread()
poll_item.revents = 0;

SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str());
SWSS_LOG_DEBUG("m_runThread: %d", m_runThread);
while (m_runThread)
{
m_allowZmqPoll = false;

// receive message
rc = zmq_poll(&poll_item, 1, 1000);
SWSS_LOG_DEBUG("ZmqServer::mqPollThread: zmq poll: rc value is : %d", rc);
if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN))
{
// timeout or other event
Expand All @@ -137,6 +146,7 @@ void ZmqServer::mqPollThread()

// receive message
rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
SWSS_LOG_DEBUG("ZmqServer::mqPollThread: zmq recv rc value is : %d", rc);
if (rc < 0)
{
int zmq_err = zmq_errno();
Expand Down Expand Up @@ -187,7 +197,7 @@ void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName,
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
{
rc = zmq_send(m_socket, m_buffer.data(), serializedlen, 0);
SWSS_LOG_DEBUG("rc value is : %d", rc);
SWSS_LOG_DEBUG("ser: rc value is : %d", rc);
if (rc >= 0)
{
m_allowZmqPoll = true;
Expand Down Expand Up @@ -229,14 +239,18 @@ void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName,
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
// throw system_error(make_error_code(errc::io_error), message);
// SWSS_LOG_THROW("Else case message is: %s", message.c_str());
return;
}
usleep(retry_delay * 1000);
}

// failed after retry
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
// throw system_error(make_error_code(errc::io_error), message);
// SWSS_LOG_THROW("Last Error message is %s", message.c_str());
return;
}

}

0 comments on commit 732b704

Please sign in to comment.