diff --git a/common/c-api/zmqclient.cpp b/common/c-api/zmqclient.cpp index 49a9e05f..fa1d59ca 100644 --- a/common/c-api/zmqclient.cpp +++ b/common/c-api/zmqclient.cpp @@ -27,9 +27,7 @@ void SWSSZmqClient_sendMsg(SWSSZmqClient zmqc, const char *dbName, const char *t SWSSKeyOpFieldValuesArray arr) { SWSSTry({ vector kcos = takeKeyOpFieldValuesArray(arr); - size_t bufSize = BinarySerializer::serializedSize(dbName, tableName, kcos); - vector v(bufSize); ((ZmqClient *)zmqc) - ->sendMsg(string(dbName), string(tableName), kcos, v); + ->sendMsg(string(dbName), string(tableName), kcos); }); } diff --git a/common/schema.h b/common/schema.h index 108fbc8d..e99f2ad8 100644 --- a/common/schema.h +++ b/common/schema.h @@ -429,7 +429,8 @@ namespace swss { #define CFG_MCLAG_UNIQUE_IP_TABLE_NAME "MCLAG_UNIQUE_IP" #define CFG_PORT_STORM_CONTROL_TABLE_NAME "PORT_STORM_CONTROL" - +#define CFG_VRRP_TABLE_NAME "VRRP" +#define CFG_VRRP6_TABLE_NAME "VRRP6" #define CFG_RATES_TABLE_NAME "RATES" #define CFG_FEATURE_TABLE_NAME "FEATURE" diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index 3ed5bcf7..8d61d1cd 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -51,6 +51,7 @@ void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf) m_context = nullptr; m_socket = nullptr; m_vrf = vrf; + m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT); connect(); } @@ -116,12 +117,11 @@ void ZmqClient::connect() void ZmqClient::sendMsg( const std::string& dbName, const std::string& tableName, - const std::vector& kcos, - std::vector& sendbuffer) + const std::vector& kcos) { int serializedlen = (int)BinarySerializer::serializeBuffer( - sendbuffer.data(), - sendbuffer.size(), + m_sendbuffer.data(), + m_sendbuffer.size(), dbName, tableName, kcos); @@ -144,7 +144,7 @@ void ZmqClient::sendMsg( std::lock_guard lock(m_socketMutex); // Use none block mode to use all bandwidth: http://api.zeromq.org/2-1%3Azmq-send - rc = zmq_send(m_socket, sendbuffer.data(), serializedlen, ZMQ_NOBLOCK); + rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK); } if (rc >= 0) @@ -201,10 +201,7 @@ bool ZmqClient::wait(std::string& dbName, std::string& tableName, - std::vector>& kcos, - - std::vector& buffer) - + std::vector>& kcos) { SWSS_LOG_ENTER(); @@ -215,7 +212,7 @@ bool ZmqClient::wait(std::string& dbName, { - rc = zmq_recv(m_socket, buffer.data(), buffer.size(), 0); + rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), 0); if (rc < 0) @@ -233,7 +230,7 @@ bool ZmqClient::wait(std::string& dbName, } - if (rc >= (int)buffer.size()) + if (rc >= (int)m_sendbuffer.size()) { @@ -241,7 +238,7 @@ bool ZmqClient::wait(std::string& dbName, "zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED", - (int)buffer.size(), rc); + (int)m_sendbuffer.size(), rc); } @@ -249,11 +246,11 @@ bool ZmqClient::wait(std::string& dbName, } - buffer.at(rc) = 0; // make sure that we end string with zero before parse + m_sendbuffer.at(rc) = 0; // make sure that we end string with zero before parse kcos.clear(); - BinarySerializer::deserializeBuffer(buffer.data(), buffer.size(), dbName, tableName, kcos); + BinarySerializer::deserializeBuffer(m_sendbuffer.data(), m_sendbuffer.size(), dbName, tableName, kcos); return true; diff --git a/common/zmqclient.h b/common/zmqclient.h index 79b4d766..54f10bac 100644 --- a/common/zmqclient.h +++ b/common/zmqclient.h @@ -23,16 +23,13 @@ class ZmqClient void sendMsg(const std::string& dbName, const std::string& tableName, - const std::vector& kcos, - std::vector& sendbuffer); + const std::vector& kcos); bool wait(std::string& dbName, std::string& tableName, - std::vector>& kcos, - - std::vector& buffer); + std::vector>& kcos); private: void initialize(const std::string& endpoint, const std::string& vrf); @@ -48,6 +45,8 @@ class ZmqClient bool m_connected; std::mutex m_socketMutex; + + std::vector m_sendbuffer; }; } diff --git a/common/zmqproducerstatetable.cpp b/common/zmqproducerstatetable.cpp index c171163f..bc32a765 100644 --- a/common/zmqproducerstatetable.cpp +++ b/common/zmqproducerstatetable.cpp @@ -38,8 +38,6 @@ ZmqProducerStateTable::ZmqProducerStateTable(RedisPipeline *pipeline, const stri void ZmqProducerStateTable::initialize(DBConnector *db, const std::string &tableName, bool dbPersistence) { - m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT); - if (dbPersistence) { SWSS_LOG_DEBUG("Database persistence enabled, tableName: %s", tableName.c_str()); @@ -64,8 +62,7 @@ void ZmqProducerStateTable::set( m_zmqClient.sendMsg( m_dbName, m_tableNameStr, - kcos, - m_sendbuffer); + kcos); if (m_asyncDBUpdater != nullptr) { @@ -93,8 +90,7 @@ void ZmqProducerStateTable::del( m_zmqClient.sendMsg( m_dbName, m_tableNameStr, - kcos, - m_sendbuffer); + kcos); if (m_asyncDBUpdater != nullptr) { @@ -112,8 +108,7 @@ void ZmqProducerStateTable::set(const std::vector &value m_zmqClient.sendMsg( m_dbName, m_tableNameStr, - values, - m_sendbuffer); + values); if (m_asyncDBUpdater != nullptr) { @@ -136,8 +131,7 @@ void ZmqProducerStateTable::del(const std::vector &keys) m_zmqClient.sendMsg( m_dbName, m_tableNameStr, - kcos, - m_sendbuffer); + kcos); if (m_asyncDBUpdater != nullptr) { @@ -157,8 +151,7 @@ void ZmqProducerStateTable::send(const std::vector &kcos m_zmqClient.sendMsg( m_dbName, m_tableNameStr, - kcos, - m_sendbuffer); + kcos); if (m_asyncDBUpdater != nullptr) { @@ -179,7 +172,7 @@ bool ZmqProducerStateTable::wait(std::string& dbName, { - return m_zmqClient.wait(dbName, tableName, kcos, m_sendbuffer); + return m_zmqClient.wait(dbName, tableName, kcos); } diff --git a/common/zmqproducerstatetable.h b/common/zmqproducerstatetable.h index 3c794237..8d21138e 100644 --- a/common/zmqproducerstatetable.h +++ b/common/zmqproducerstatetable.h @@ -50,8 +50,6 @@ class ZmqProducerStateTable : public ProducerStateTable void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence); ZmqClient& m_zmqClient; - - std::vector m_sendbuffer; const std::string m_dbName; const std::string m_tableNameStr; diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index 2f510577..ab7cc43e 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -33,11 +33,6 @@ ZmqServer::~ZmqServer() m_allowZmqPoll = true; m_runThread = false; m_mqPollThread->join(); - - zmq_close(m_socket); - - zmq_ctx_destroy(m_context); - } void ZmqServer::registerMessageHandler( @@ -99,18 +94,18 @@ void ZmqServer::mqPollThread() // Producer/Consumer state table are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket void* context = zmq_ctx_new();; - void* m_socket = zmq_socket(context, ZMQ_PULL); + void* socket = zmq_socket(context, ZMQ_PULL); // Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt int high_watermark = MQ_WATERMARK; - zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); + zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); if (!m_vrf.empty()) { - zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); + zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); } - int rc = zmq_bind(m_socket, m_endpoint.c_str()); + int rc = zmq_bind(socket, m_endpoint.c_str()); if (rc != 0) { SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s", @@ -122,7 +117,7 @@ void ZmqServer::mqPollThread() // zmq_poll will use less CPU zmq_pollitem_t poll_item; poll_item.fd = 0; - poll_item.socket = m_socket; + poll_item.socket = socket; poll_item.events = ZMQ_POLLIN; poll_item.revents = 0; @@ -141,7 +136,7 @@ void ZmqServer::mqPollThread() } // receive message - rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); + rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); if (rc < 0) { int zmq_err = zmq_errno(); @@ -170,8 +165,8 @@ void ZmqServer::mqPollThread() handleReceivedData(m_buffer.data(), rc); } - zmq_close(m_socket); - zmq_ctx_destroy(m_context); + zmq_close(socket); + zmq_ctx_destroy(context); SWSS_LOG_NOTICE("mqPollThread end"); }