Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…wss-common into zmq
  • Loading branch information
divyagayathri-hcl committed Dec 7, 2024
2 parents bbbe998 + fb6ce44 commit b337cba
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 12 deletions.
3 changes: 2 additions & 1 deletion common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -475,11 +475,12 @@ namespace swss {

#define CFG_SUPPRESS_ASIC_SDK_HEALTH_EVENT_NAME "SUPPRESS_ASIC_SDK_HEALTH_EVENT"

#define CFG_MEMORY_STATISTICS_TABLE_NAME "MEMORY_STATISTICS"

#define CFG_PAC_PORT_CONFIG_TABLE "PAC_PORT_CONFIG_TABLE"
#define CFG_PAC_GLOBAL_CONFIG_TABLE "PAC_GLOBAL_CONFIG_TABLE"
#define CFG_PAC_HOSTAPD_GLOBAL_CONFIG_TABLE "HOSTAPD_GLOBAL_CONFIG_TABLE"


/***** STATE DATABASE *****/

#define STATE_SWITCH_CAPABILITY_TABLE_NAME "SWITCH_CAPABILITY"
Expand Down
24 changes: 15 additions & 9 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,40 +224,46 @@ bool ZmqClient::wait(std::string& dbName,

/* zmq_pollitem_t items [1] = { };
items[0].socket = m_socket;
items[0].events = ZMQ_POLLIN;
items[0].events = ZMQ_POLLIN; */

zmq_pollitem_t poll_item;
poll_item.fd = 0;
poll_item.socket = m_socket;
poll_item.events = ZMQ_POLLIN;
poll_item.revents = 0;

int rc;
for (int i = 0; true; ++i)
{
SWSS_LOG_DEBUG("m_waitTimeMs is : %d", (int)m_waitTimeMs);
rc = zmq_poll(items, 1, (int)m_waitTimeMs);
rc = zmq_poll(&poll_item, 1, 1000);
if (rc == 0)
{
SWSS_LOG_ERROR("zmq_poll timed out");
return false;
SWSS_LOG_ERROR("zmq_poll timed out: zmqclient wait");
// return false;
}
if (rc > 0)
{
break;
}
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
{
SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq poll");
continue;
}
SWSS_LOG_ERROR("zmq_poll failed, zmqerrno: %d", zmq_errno());
} */
SWSS_LOG_ERROR("zmqclient wait : zmq_poll failed, zmqerrno: %d", zmq_errno());
}

int rc;
for (int i = 0; true; ++i)
{
rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), 0);
if (rc < 0)
{
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
{
SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq receive");
continue;
}
SWSS_LOG_ERROR("zmq_recv failed, zmqerrno: %d", zmq_errno());
SWSS_LOG_ERROR("zmqclient wait : zmq_recv failed, zmqerrno: %d", zmq_errno());
return false;
}
if (rc >= (int)m_sendbuffer.size())
Expand Down
2 changes: 1 addition & 1 deletion common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ZmqClient

private:
void initialize(const std::string& endpoint, const std::string& vrf);
// void initialize(const std::string& endpoint);
void initialize(const std::string& endpoint);

std::string m_endpoint;

Expand Down
12 changes: 11 additions & 1 deletion common/zmqserver.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <unistd.h>
#include <string>
#include <deque>
#include <limits>
Expand Down Expand Up @@ -96,6 +97,11 @@ void ZmqServer::mqPollThread()
void* context = zmq_ctx_new();;
void* socket = zmq_socket(context, ZMQ_PULL);

//divya
int ret_code = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
SWSS_LOG_DEBUG("mqPollThread:: ret_code value is : %d", ret_code);
//divya

// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));
Expand Down Expand Up @@ -229,14 +235,18 @@ void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName,
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
// throw system_error(make_error_code(errc::io_error), message);
// SWSS_LOG_THROW("Else case message is: %s", message.c_str());
return;
}
usleep(retry_delay * 1000);
}

// failed after retry
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
// throw system_error(make_error_code(errc::io_error), message);
// SWSS_LOG_THROW("Last Error message is %s", message.c_str());
return;
}

}

0 comments on commit b337cba

Please sign in to comment.