Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…wss-common into zmq_new
  • Loading branch information
divyagayathri-hcl committed Nov 25, 2024
2 parents 661b806 + 6bac82b commit c2f9379
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 51 deletions.
4 changes: 1 addition & 3 deletions common/c-api/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ void SWSSZmqClient_sendMsg(SWSSZmqClient zmqc, const char *dbName, const char *t
SWSSKeyOpFieldValuesArray arr) {
SWSSTry({
vector<KeyOpFieldsValuesTuple> kcos = takeKeyOpFieldValuesArray(arr);
size_t bufSize = BinarySerializer::serializedSize(dbName, tableName, kcos);
vector<char> v(bufSize);
((ZmqClient *)zmqc)
->sendMsg(string(dbName), string(tableName), kcos, v);
->sendMsg(string(dbName), string(tableName), kcos);
});
}
3 changes: 2 additions & 1 deletion common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,8 @@ namespace swss {
#define CFG_MCLAG_UNIQUE_IP_TABLE_NAME "MCLAG_UNIQUE_IP"

#define CFG_PORT_STORM_CONTROL_TABLE_NAME "PORT_STORM_CONTROL"

#define CFG_VRRP_TABLE_NAME "VRRP"
#define CFG_VRRP6_TABLE_NAME "VRRP6"
#define CFG_RATES_TABLE_NAME "RATES"

#define CFG_FEATURE_TABLE_NAME "FEATURE"
Expand Down
25 changes: 11 additions & 14 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf)
m_context = nullptr;
m_socket = nullptr;
m_vrf = vrf;
m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT);

connect();
}
Expand Down Expand Up @@ -116,12 +117,11 @@ void ZmqClient::connect()
void ZmqClient::sendMsg(
const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos,
std::vector<char>& sendbuffer)
const std::vector<KeyOpFieldsValuesTuple>& kcos)
{
int serializedlen = (int)BinarySerializer::serializeBuffer(
sendbuffer.data(),
sendbuffer.size(),
m_sendbuffer.data(),
m_sendbuffer.size(),
dbName,
tableName,
kcos);
Expand All @@ -144,7 +144,7 @@ void ZmqClient::sendMsg(
std::lock_guard<std::mutex> lock(m_socketMutex);

// Use none block mode to use all bandwidth: http://api.zeromq.org/2-1%3Azmq-send
rc = zmq_send(m_socket, sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
}

if (rc >= 0)
Expand Down Expand Up @@ -201,10 +201,7 @@ bool ZmqClient::wait(std::string& dbName,

std::string& tableName,

std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos,

std::vector<char>& buffer)

std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{

SWSS_LOG_ENTER();
Expand All @@ -215,7 +212,7 @@ bool ZmqClient::wait(std::string& dbName,

{

rc = zmq_recv(m_socket, buffer.data(), buffer.size(), 0);
rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), 0);

if (rc < 0)

Expand All @@ -233,27 +230,27 @@ bool ZmqClient::wait(std::string& dbName,

}

if (rc >= (int)buffer.size())
if (rc >= (int)m_sendbuffer.size())

{

SWSS_LOG_THROW(

"zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",

(int)buffer.size(), rc);
(int)m_sendbuffer.size(), rc);

}

break;

}

buffer.at(rc) = 0; // make sure that we end string with zero before parse
m_sendbuffer.at(rc) = 0; // make sure that we end string with zero before parse

kcos.clear();

BinarySerializer::deserializeBuffer(buffer.data(), buffer.size(), dbName, tableName, kcos);
BinarySerializer::deserializeBuffer(m_sendbuffer.data(), m_sendbuffer.size(), dbName, tableName, kcos);

return true;

Expand Down
9 changes: 4 additions & 5 deletions common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@ class ZmqClient

void sendMsg(const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos,
std::vector<char>& sendbuffer);
const std::vector<KeyOpFieldsValuesTuple>& kcos);

bool wait(std::string& dbName,

std::string& tableName,

std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos,

std::vector<char>& buffer);
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);

private:
void initialize(const std::string& endpoint, const std::string& vrf);
Expand All @@ -48,6 +45,8 @@ class ZmqClient
bool m_connected;

std::mutex m_socketMutex;

std::vector<char> m_sendbuffer;
};

}
19 changes: 6 additions & 13 deletions common/zmqproducerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ ZmqProducerStateTable::ZmqProducerStateTable(RedisPipeline *pipeline, const stri

void ZmqProducerStateTable::initialize(DBConnector *db, const std::string &tableName, bool dbPersistence)
{
m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT);

if (dbPersistence)
{
SWSS_LOG_DEBUG("Database persistence enabled, tableName: %s", tableName.c_str());
Expand All @@ -64,8 +62,7 @@ void ZmqProducerStateTable::set(
m_zmqClient.sendMsg(
m_dbName,
m_tableNameStr,
kcos,
m_sendbuffer);
kcos);

if (m_asyncDBUpdater != nullptr)
{
Expand Down Expand Up @@ -93,8 +90,7 @@ void ZmqProducerStateTable::del(
m_zmqClient.sendMsg(
m_dbName,
m_tableNameStr,
kcos,
m_sendbuffer);
kcos);

if (m_asyncDBUpdater != nullptr)
{
Expand All @@ -112,8 +108,7 @@ void ZmqProducerStateTable::set(const std::vector<KeyOpFieldsValuesTuple> &value
m_zmqClient.sendMsg(
m_dbName,
m_tableNameStr,
values,
m_sendbuffer);
values);

if (m_asyncDBUpdater != nullptr)
{
Expand All @@ -136,8 +131,7 @@ void ZmqProducerStateTable::del(const std::vector<std::string> &keys)
m_zmqClient.sendMsg(
m_dbName,
m_tableNameStr,
kcos,
m_sendbuffer);
kcos);

if (m_asyncDBUpdater != nullptr)
{
Expand All @@ -157,8 +151,7 @@ void ZmqProducerStateTable::send(const std::vector<KeyOpFieldsValuesTuple> &kcos
m_zmqClient.sendMsg(
m_dbName,
m_tableNameStr,
kcos,
m_sendbuffer);
kcos);

if (m_asyncDBUpdater != nullptr)
{
Expand All @@ -179,7 +172,7 @@ bool ZmqProducerStateTable::wait(std::string& dbName,

{

return m_zmqClient.wait(dbName, tableName, kcos, m_sendbuffer);
return m_zmqClient.wait(dbName, tableName, kcos);

}

Expand Down
2 changes: 0 additions & 2 deletions common/zmqproducerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ class ZmqProducerStateTable : public ProducerStateTable
void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence);

ZmqClient& m_zmqClient;

std::vector<char> m_sendbuffer;

const std::string m_dbName;
const std::string m_tableNameStr;
Expand Down
21 changes: 8 additions & 13 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ ZmqServer::~ZmqServer()
m_allowZmqPoll = true;
m_runThread = false;
m_mqPollThread->join();

zmq_close(m_socket);

zmq_ctx_destroy(m_context);

}

void ZmqServer::registerMessageHandler(
Expand Down Expand Up @@ -99,18 +94,18 @@ void ZmqServer::mqPollThread()

// Producer/Consumer state table are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket
void* context = zmq_ctx_new();;
void* m_socket = zmq_socket(context, ZMQ_PULL);
void* socket = zmq_socket(context, ZMQ_PULL);

// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));
zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));

if (!m_vrf.empty())
{
zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}

int rc = zmq_bind(m_socket, m_endpoint.c_str());
int rc = zmq_bind(socket, m_endpoint.c_str());
if (rc != 0)
{
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s",
Expand All @@ -122,7 +117,7 @@ void ZmqServer::mqPollThread()
// zmq_poll will use less CPU
zmq_pollitem_t poll_item;
poll_item.fd = 0;
poll_item.socket = m_socket;
poll_item.socket = socket;
poll_item.events = ZMQ_POLLIN;
poll_item.revents = 0;

Expand All @@ -141,7 +136,7 @@ void ZmqServer::mqPollThread()
}

// receive message
rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
if (rc < 0)
{
int zmq_err = zmq_errno();
Expand Down Expand Up @@ -170,8 +165,8 @@ void ZmqServer::mqPollThread()
handleReceivedData(m_buffer.data(), rc);
}

zmq_close(m_socket);
zmq_ctx_destroy(m_context);
zmq_close(socket);
zmq_ctx_destroy(context);

SWSS_LOG_NOTICE("mqPollThread end");
}
Expand Down

0 comments on commit c2f9379

Please sign in to comment.