diff --git a/common/c-api/util.h b/common/c-api/util.h index 06aeac15..dfcf601a 100644 --- a/common/c-api/util.h +++ b/common/c-api/util.h @@ -202,6 +202,7 @@ template static inline T &getReference(std::shared_ptr &t) { // T is anything that has a .size() method and which can be iterated over for // swss::KeyOpFieldValuesTuple, eg vector or deque template static inline SWSSKeyOpFieldValuesArray makeKeyOpFieldValuesArray(T &&in) { + SWSS_LOG_DEBUG("Entering makeKeyOpFieldValuesArray method"); SWSSKeyOpFieldValues *data = new SWSSKeyOpFieldValues[in.size()]; size_t i = 0; @@ -211,6 +212,7 @@ template static inline SWSSKeyOpFieldValuesArray makeKeyOpFieldValuesA SWSSKeyOpFieldValuesArray out; out.len = (uint64_t)in.size(); out.data = data; + SWSS_LOG_DEBUG("2::: out.len value is: %ld", out.len); return out; } @@ -255,9 +257,11 @@ static inline std::vector takeKeyOpFieldValuesArray(SWSSKeyOpFieldValuesArray in) { std::vector out; for (uint64_t i = 0; i < in.len; i++) { + SWSS_LOG_DEBUG("i value is: %ld", i); SWSSKeyOpFieldValues kfv = in.data[i]; out.push_back(takeKeyOpFieldValues(std::move(kfv))); } + SWSS_LOG_DEBUG("out.len value is: %ld", out.size()); return out; } diff --git a/common/c-api/zmqclient.cpp b/common/c-api/zmqclient.cpp index fa1d59ca..e2c7636e 100644 --- a/common/c-api/zmqclient.cpp +++ b/common/c-api/zmqclient.cpp @@ -25,6 +25,7 @@ void SWSSZmqClient_connect(SWSSZmqClient zmqc) { void SWSSZmqClient_sendMsg(SWSSZmqClient zmqc, const char *dbName, const char *tableName, SWSSKeyOpFieldValuesArray arr) { + SWSS_LOG_DEBUG("Inside SWSSZmqClient_sendMsg"); SWSSTry({ vector kcos = takeKeyOpFieldValuesArray(arr); ((ZmqClient *)zmqc) diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index 5a84160e..5864a7ae 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -18,6 +18,7 @@ namespace swss { ZmqClient::ZmqClient(const std::string& endpoint) :ZmqClient(endpoint, "") { + initialize(endpoint); } ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf) @@ -25,6 +26,12 @@ ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf) initialize(endpoint, vrf); } +ZmqClient::ZmqClient(const std::string& endpoint, uint32_t waitTimeMs) : + m_waitTimeMs(waitTimeMs) +{ + initialize(endpoint); +} + ZmqClient::~ZmqClient() { std::lock_guard lock(m_socketMutex); @@ -55,6 +62,17 @@ void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf) connect(); } + +void ZmqClient::initialize(const std::string& endpoint) +{ + m_connected = false; + m_endpoint = endpoint; + m_context = nullptr; + m_socket = nullptr; + m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT); + + connect(); +} bool ZmqClient::isConnected() { @@ -63,6 +81,7 @@ bool ZmqClient::isConnected() void ZmqClient::connect() { +SWSS_LOG_ERROR("DIV:: Inside function client connect"); if (m_connected) { SWSS_LOG_DEBUG("Already connected to endpoint: %s", m_endpoint.c_str()); @@ -88,6 +107,7 @@ void ZmqClient::connect() m_context = zmq_ctx_new(); m_socket = zmq_socket(m_context, ZMQ_PUSH); + SWSS_LOG_DEBUG("m_socket in client connect() is: %p\n", m_socket); // timeout all pending send package, so zmq will not block in dtor of this class: http://api.zeromq.org/master:zmq-setsockopt int linger = 0; zmq_setsockopt(m_socket, ZMQ_LINGER, &linger, sizeof(linger)); @@ -119,6 +139,7 @@ void ZmqClient::sendMsg( const std::string& tableName, const std::vector& kcos) { +SWSS_LOG_ERROR("DIV:: Inside function client sendMsg"); int serializedlen = (int)BinarySerializer::serializeBuffer( m_sendbuffer.data(), m_sendbuffer.size(), @@ -137,16 +158,18 @@ void ZmqClient::sendMsg( int zmq_err = 0; int retry_delay = 10; int rc = 0; - for (int i = 0; i <= MQ_MAX_RETRY; ++i) + for (int i = 0; i <= MQ_MAX_RETRY; ++i) { { // ZMQ socket is not thread safe: http://api.zeromq.org/2-1:zmq std::lock_guard lock(m_socketMutex); // Use none block mode to use all bandwidth: http://api.zeromq.org/2-1%3Azmq-send - rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK); + rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK); } - +//SWSS_LOG_DEBUG("Before Sleep() in client sendmsg"); +// usleep(10 * 1000); +//SWSS_LOG_DEBUG("After Sleep() in client sendmsg"); if (rc >= 0) { SWSS_LOG_DEBUG("zmq sended %d bytes", serializedlen); @@ -164,7 +187,7 @@ void ZmqClient::sendMsg( // For example when ZMQ socket still not receive reply message from last sended package. // There was state machine inside ZMQ socket, when the socket is not in ready to send state, this error will happen. // for more detail, please check: http://api.zeromq.org/2-1:zmq-send - SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err); + SWSS_LOG_WARN("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err); retry_delay = 0; } @@ -183,7 +206,7 @@ void ZmqClient::sendMsg( else { // for other error, send failed immediately. - auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); + auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); SWSS_LOG_ERROR("%s", message.c_str()); throw system_error(make_error_code(errc::io_error), message); } @@ -192,9 +215,19 @@ void ZmqClient::sendMsg( } // failed after retry - auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); + auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); SWSS_LOG_ERROR("%s", message.c_str()); throw system_error(make_error_code(errc::io_error), message); } +bool ZmqClient::wait(std::string& dbName, + std::string& tableName, + std::vector>& kcos) +{ +SWSS_LOG_ERROR("DIV:: Inside function wait"); + SWSS_LOG_ENTER(); + + kcos.clear(); + return false; +} } diff --git a/common/zmqclient.h b/common/zmqclient.h index adc36b05..c645a1f7 100644 --- a/common/zmqclient.h +++ b/common/zmqclient.h @@ -12,8 +12,10 @@ namespace swss { class ZmqClient { public: + ZmqClient(const std::string& endpoint); ZmqClient(const std::string& endpoint, const std::string& vrf); + ZmqClient(const std::string& endpoint, uint32_t waitTimeMs); ~ZmqClient(); bool isConnected(); @@ -23,8 +25,14 @@ class ZmqClient void sendMsg(const std::string& dbName, const std::string& tableName, const std::vector& kcos); + + bool wait(std::string& dbName, + std::string& tableName, + std::vector>& kcos); + private: void initialize(const std::string& endpoint, const std::string& vrf); + void initialize(const std::string& endpoint); std::string m_endpoint; @@ -36,9 +44,11 @@ class ZmqClient bool m_connected; + uint32_t m_waitTimeMs; + std::mutex m_socketMutex; - - std::vector m_sendbuffer; + + std::vector m_sendbuffer; }; } diff --git a/common/zmqconsumerstatetable.cpp b/common/zmqconsumerstatetable.cpp index 5f58482f..c751953e 100644 --- a/common/zmqconsumerstatetable.cpp +++ b/common/zmqconsumerstatetable.cpp @@ -41,6 +41,7 @@ ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string void ZmqConsumerStateTable::handleReceivedData(const std::vector> &kcos) { + SWSS_LOG_DEBUG("Entering ZmqConsumerStateTable::handleReceivedData"); for (auto kco : kcos) { std::shared_ptr clone = nullptr; @@ -53,6 +54,7 @@ void ZmqConsumerStateTable::handleReceivedData(const std::vector lock(m_receivedQueueMutex); m_receivedOperationQueue.push(kco); +SWSS_LOG_DEBUG("Called m_receivedOperationQueue.push in handleReceivedData()"); } if (m_asyncDBUpdater != nullptr) @@ -73,6 +75,7 @@ void ZmqConsumerStateTable::pops(std::deque &vkco, const // For new data append to m_dataQueue during pops, will not be include in result. count = m_receivedOperationQueue.size(); + SWSS_LOG_DEBUG("count value: %ld", count); if (!count) { return; @@ -82,6 +85,7 @@ void ZmqConsumerStateTable::pops(std::deque &vkco, const vkco.clear(); for (size_t ie = 0; ie < count; ie++) { + SWSS_LOG_DEBUG("count inside for loop: %ld", count); auto& kco = *(m_receivedOperationQueue.front()); vkco.push_back(std::move(kco)); diff --git a/common/zmqproducerstatetable.cpp b/common/zmqproducerstatetable.cpp index e2a31446..31957432 100644 --- a/common/zmqproducerstatetable.cpp +++ b/common/zmqproducerstatetable.cpp @@ -148,6 +148,7 @@ void ZmqProducerStateTable::del(const std::vector &keys) void ZmqProducerStateTable::send(const std::vector &kcos) { +SWSS_LOG_DEBUG("DIV:: Inside function ZmqProducerStateTable::send"); m_zmqClient.sendMsg( m_dbName, m_tableNameStr, @@ -164,6 +165,15 @@ void ZmqProducerStateTable::send(const std::vector &kcos } } +bool ZmqProducerStateTable::wait(std::string& dbName, + std::string& tableName, + std::vector>& kcos) +{ +SWSS_LOG_DEBUG("DIV:: Inside function ZmqProducerStateTable::wait"); + + return m_zmqClient.wait(dbName, tableName, kcos); +} + size_t ZmqProducerStateTable::dbUpdaterQueueSize() { if (m_asyncDBUpdater == nullptr) diff --git a/common/zmqproducerstatetable.h b/common/zmqproducerstatetable.h index 015419bd..184d6272 100644 --- a/common/zmqproducerstatetable.h +++ b/common/zmqproducerstatetable.h @@ -18,6 +18,7 @@ class ZmqProducerStateTable : public ProducerStateTable public: ZmqProducerStateTable(DBConnector *db, const std::string &tableName, ZmqClient &zmqClient, bool dbPersistence = true); ZmqProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, ZmqClient &zmqClient, bool buffered = false, bool dbPersistence = true); +// ~ZmqProducerStateTable() = default; /* Implements set() and del() commands using notification messages */ virtual void set(const std::string &key, @@ -37,6 +38,14 @@ class ZmqProducerStateTable : public ProducerStateTable // Batched send that can include both SET and DEL requests. virtual void send(const std::vector &kcos); + // This method should only be used if the ZmqClient enables one-to-one sync. + + virtual bool wait(std::string& dbName, + + std::string& tableName, + + std::vector>& kcos); + size_t dbUpdaterQueueSize(); private: void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence); diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index dca10740..bffba88d 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -16,10 +17,23 @@ ZmqServer::ZmqServer(const std::string& endpoint) { } +ZmqServer::ZmqServer(const std::string& endpoint, bool zmr_test) +: m_endpoint(endpoint), +m_allowZmqPoll(true) +{ + connect(); + m_buffer.resize(MQ_RESPONSE_MAX_COUNT); + m_mqPollThread = std::make_shared(&ZmqServer::mqPollThread, this); + m_runThread = true; + + SWSS_LOG_DEBUG("DIV: ZmqServer ctor endpoint: %s", endpoint.c_str()); +} + ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf) : m_endpoint(endpoint), m_vrf(vrf) { + connect(); m_buffer.resize(MQ_RESPONSE_MAX_COUNT); m_runThread = true; m_mqPollThread = std::make_shared(&ZmqServer::mqPollThread, this); @@ -29,8 +43,38 @@ ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf) ZmqServer::~ZmqServer() { + m_allowZmqPoll = true; m_runThread = false; m_mqPollThread->join(); + + zmq_close(m_socket); + zmq_ctx_destroy(m_context); +} + +void ZmqServer::connect() +{ +SWSS_LOG_ERROR("DIV:: Inside function server connect"); + SWSS_LOG_ENTER(); + m_context = zmq_ctx_new(); + m_socket = zmq_socket(m_context, ZMQ_PULL); + + SWSS_LOG_DEBUG("m_socket in server connect() is: %p\n", m_socket); + // Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt + int high_watermark = MQ_WATERMARK; + zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); + + if (!m_vrf.empty()) + { + zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); + } + + int rc = zmq_bind(m_socket, m_endpoint.c_str()); + if (rc != 0) + { + SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d", + m_endpoint.c_str(), + zmq_errno()); + } } void ZmqServer::registerMessageHandler( @@ -38,6 +82,7 @@ void ZmqServer::registerMessageHandler( const std::string tableName, ZmqMessageHandler* handler) { +SWSS_LOG_ERROR("DIV:: Inside function registerMessageHandler"); auto dbResult = m_HandlerMap.insert(pair>(dbName, map())); if (dbResult.second) { SWSS_LOG_DEBUG("ZmqServer add handler mapping for db: %s", dbName.c_str()); @@ -53,6 +98,7 @@ ZmqMessageHandler* ZmqServer::findMessageHandler( const std::string dbName, const std::string tableName) { +SWSS_LOG_ERROR("DIV:: Inside function findMessageHandler"); auto dbMappingIter = m_HandlerMap.find(dbName); if (dbMappingIter == m_HandlerMap.end()) { SWSS_LOG_DEBUG("ZmqServer can't find any handler for db: %s", dbName.c_str()); @@ -70,6 +116,7 @@ ZmqMessageHandler* ZmqServer::findMessageHandler( void ZmqServer::handleReceivedData(const char* buffer, const size_t size) { +SWSS_LOG_ERROR("DIV:: Inside function handleReceivedData"); std::string dbName; std::string tableName; std::vector> kcos; @@ -87,43 +134,26 @@ void ZmqServer::handleReceivedData(const char* buffer, const size_t size) void ZmqServer::mqPollThread() { +SWSS_LOG_ERROR("DIV:: Inside function mqPollThread"); SWSS_LOG_ENTER(); SWSS_LOG_NOTICE("mqPollThread begin"); - // Producer/Consumer state table are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket - void* context = zmq_ctx_new();; - void* socket = zmq_socket(context, ZMQ_PULL); - - // Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt - int high_watermark = MQ_WATERMARK; - zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); - - if (!m_vrf.empty()) - { - zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); - } - - int rc = zmq_bind(socket, m_endpoint.c_str()); - if (rc != 0) - { - SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s", - m_endpoint.c_str(), - zmq_errno(), - strerror(zmq_errno())); - } - // zmq_poll will use less CPU zmq_pollitem_t poll_item; poll_item.fd = 0; - poll_item.socket = socket; + poll_item.socket = m_socket; poll_item.events = ZMQ_POLLIN; poll_item.revents = 0; SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str()); + SWSS_LOG_DEBUG("m_runThread: %d", m_runThread); while (m_runThread) { + m_allowZmqPoll = false; + // receive message - rc = zmq_poll(&poll_item, 1, 1000); + auto rc = zmq_poll(&poll_item, 1, 1000); + SWSS_LOG_DEBUG("ZmqServer::mqPollThread: zmq poll: rc value is : %d", rc); if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN)) { // timeout or other event @@ -132,7 +162,10 @@ void ZmqServer::mqPollThread() } // receive message - rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); + SWSS_LOG_DEBUG("m_socket in mqPollThread() server is: %p\n", m_socket); + + rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); + SWSS_LOG_DEBUG("ZmqServer::mqPollThread: zmq recv rc value is : %d", rc); if (rc < 0) { int zmq_err = zmq_errno(); @@ -159,12 +192,85 @@ void ZmqServer::mqPollThread() // deserialize and write to redis: handleReceivedData(m_buffer.data(), rc); +// SWSS_LOG_DEBUG("Before Sleep() in mqPollThread"); +// usleep(10); } + SWSS_LOG_NOTICE("mqPollThread end"); +} - zmq_close(socket); - zmq_ctx_destroy(context); +void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName, + const std::vector& values) +{ - SWSS_LOG_NOTICE("mqPollThread end"); + return; +SWSS_LOG_ERROR("DIV:: Inside function server sendMsg"); + int serializedlen = (int)BinarySerializer::serializeBuffer( + m_buffer.data(), + m_buffer.size(), + dbName, + tableName, + values); + SWSS_LOG_DEBUG("sending: %d", serializedlen); + int zmq_err = 0; + int retry_delay = 10; + int rc = 0; + for (int i = 0; i <= MQ_MAX_RETRY; ++i) + { + SWSS_LOG_DEBUG("1. m_socket in server sendmsg() is: %p\n", m_socket); + rc = zmq_send(m_socket, m_buffer.data(), serializedlen, 0); + SWSS_LOG_DEBUG("ser: rc value is : %d", rc); + if (rc >= 0) + { + m_allowZmqPoll = true; + SWSS_LOG_DEBUG("zmq sent %d bytes", serializedlen); + return; + } + zmq_err = zmq_errno(); + // sleep (2 ^ retry time) * 10 ms + retry_delay *= 2; + SWSS_LOG_DEBUG("2. m_socket in server sendmsg() is: %p\n", m_socket); + SWSS_LOG_DEBUG("zmq_err is : %d", zmq_err); + + if (zmq_err == EINTR + || zmq_err == EFSM) + { + // EINTR: interrupted by signal + // EFSM: socket state not ready + // For example when ZMQ socket still not receive reply message from last sended package. + // There was state machine inside ZMQ socket, when the socket is not in ready to send state, this + // error will happen. + // for more detail, please check: http://api.zeromq.org/2-1:zmq-send + SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err); + + retry_delay = 0; + } + else if (zmq_err == EAGAIN) + { + // EAGAIN: ZMQ is full to need try again + SWSS_LOG_WARN("zmq is full, will retry in %d ms, endpoint: %s, error: %d", retry_delay, m_endpoint.c_str(), zmq_err); + } + else if (zmq_err == ETERM) + { + auto message = "zmq connection break, endpoint: " + m_endpoint + ", error: " + to_string(rc); + SWSS_LOG_ERROR("%s", message.c_str()); + throw system_error(make_error_code(errc::connection_reset), message); + } + else + { + // for other error, send failed immediately. + auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); + SWSS_LOG_ERROR("%s", message.c_str()); + SWSS_LOG_DEBUG("3. m_socket in server sendmsg() is: %p\n", m_socket); + throw system_error(make_error_code(errc::io_error), message); + return; + } + usleep(retry_delay * 1000); + } + + // failed after retry + auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); + SWSS_LOG_ERROR("%s", message.c_str()); + throw system_error(make_error_code(errc::io_error), message); } } diff --git a/common/zmqserver.h b/common/zmqserver.h index 8afe18d7..e24dcc01 100644 --- a/common/zmqserver.h +++ b/common/zmqserver.h @@ -31,6 +31,7 @@ class ZmqServer static constexpr int DEFAULT_POP_BATCH_SIZE = 128; ZmqServer(const std::string& endpoint); + ZmqServer(const std::string& endpoint, bool zmr_test); ZmqServer(const std::string& endpoint, const std::string& vrf); ~ZmqServer(); @@ -39,7 +40,13 @@ class ZmqServer const std::string tableName, ZmqMessageHandler* handler); + void sendMsg(const std::string& dbName, const std::string& tableName, + const std::vector& values); + private: + + void connect(); + void handleReceivedData(const char* buffer, const size_t size); void mqPollThread(); @@ -56,6 +63,12 @@ class ZmqServer std::string m_vrf; + void* m_context; + + void* m_socket; + + bool m_allowZmqPoll; + std::map> m_HandlerMap; }; diff --git a/pyext/swsscommon.i b/pyext/swsscommon.i index b3d015e0..7febb0a7 100644 --- a/pyext/swsscommon.i +++ b/pyext/swsscommon.i @@ -330,10 +330,14 @@ T castSelectableObj(swss::Selectable *temp) %apply std::string& OUTPUT {std::string &key}; %apply std::string& OUTPUT {std::string &op}; %apply std::vector>& OUTPUT {std::vector> &fvs}; +%apply std::string& OUTPUT {std::string &dbName}; +%apply std::string& OUTPUT {std::string &tableName}; %include "consumertablebase.h" %clear std::string &key; %clear std::string &op; %clear std::vector> &fvs; +%clear std::string &dbName; +%clear std::string &tableName; %include "consumertable.h" %include "consumerstatetable.h" diff --git a/tests/c_api_ut.cpp b/tests/c_api_ut.cpp index ed814607..b656b877 100644 --- a/tests/c_api_ut.cpp +++ b/tests/c_api_ut.cpp @@ -221,9 +221,7 @@ TEST(c_api, SubscriberStateTable) { TEST(c_api, ZmqConsumerProducerStateTable) { clearDB(); SWSSStringManager sm; - SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true); - SWSSZmqServer srv = SWSSZmqServer_new("tcp://127.0.0.1:42312"); SWSSZmqClient cli = SWSSZmqClient_new("tcp://127.0.0.1:42312"); EXPECT_TRUE(SWSSZmqClient_isConnected(cli)); @@ -241,9 +239,11 @@ TEST(c_api, ZmqConsumerProducerStateTable) { ASSERT_EQ(arr.len, 0); freeKeyOpFieldValuesArray(arr); +SWSS_LOG_DEBUG("print7"); // On flag = 0, we use the ZmqProducerStateTable // On flag = 1, we use the ZmqClient directly for (int flag = 0; flag < 2; flag++) { +SWSS_LOG_DEBUG("print7 for loop, flag set is : %d", flag); SWSSFieldValueTuple values_key1_data[2] = {{.field = "myfield1", .value = sm.makeString("myvalue1")}, {.field = "myfield2", .value = sm.makeString("myvalue2")}}; SWSSFieldValueArray values_key1 = { @@ -251,24 +251,58 @@ TEST(c_api, ZmqConsumerProducerStateTable) { .data = values_key1_data, }; +SWSS_LOG_DEBUG("print8"); SWSSFieldValueTuple values_key2_data[1] = {{.field = "myfield3", .value = sm.makeString("myvalue3")}}; SWSSFieldValueArray values_key2 = { .len = 1, .data = values_key2_data, }; +SWSS_LOG_DEBUG("print9"); SWSSKeyOpFieldValues arr_data[2] = { {.key = "mykey1", .operation = SWSSKeyOperation_SET, .fieldValues = values_key1}, {.key = "mykey2", .operation = SWSSKeyOperation_SET, .fieldValues = values_key2}}; arr = {.len = 2, .data = arr_data}; +SWSS_LOG_DEBUG("print10"); if (flag == 0) for (uint64_t i = 0; i < arr.len; i++) + { + SWSS_LOG_DEBUG("flag 0 case before calling SWSSZmqProducerStateTable_set, i: %ld", i); SWSSZmqProducerStateTable_set(pst, arr.data[i].key, arr.data[i].fieldValues); + } else + { + SWSS_LOG_DEBUG("print10 else loop, flag is: %d", flag); SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr); + } +SWSS_LOG_DEBUG("print11"); ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 1500, true), SWSSSelectResult_DATA); +/* int retry_cnt = 1; + vector kfvs; + while (true) + { + arr = SWSSZmqConsumerStateTable_pops(cst); + + SWSS_LOG_DEBUG("print12"); + kfvs = takeKeyOpFieldValuesArray(arr); + SWSS_LOG_DEBUG("1 : kfvs.size() is: %ld", kfvs.size()); + sortKfvs(kfvs); + + SWSS_LOG_DEBUG("2 : kfvs.size() is: %ld", kfvs.size()); + SWSS_LOG_DEBUG("print13"); + if(kfvs.size() == 2 || retry_cnt == 3) + break; + retry_cnt++; + SWSS_LOG_DEBUG("Retry count is: %d, Before sleep()", retry_cnt); + usleep(1 * 1000); + SWSS_LOG_DEBUG("Retry count is: %d, After sleep()", retry_cnt); + } */ + + SWSS_LOG_DEBUG("Before sleep(2)"); + sleep(2); + SWSS_LOG_DEBUG("After sleep(2)"); arr = SWSSZmqConsumerStateTable_pops(cst); vector kfvs = takeKeyOpFieldValuesArray(arr); @@ -285,6 +319,7 @@ TEST(c_api, ZmqConsumerProducerStateTable) { EXPECT_EQ(fieldValues0[1].first, "myfield2"); EXPECT_EQ(fieldValues0[1].second, "myvalue2"); +SWSS_LOG_DEBUG("print14"); EXPECT_EQ(kfvKey(kfvs[1]), "mykey2"); EXPECT_EQ(kfvOp(kfvs[1]), "SET"); vector> &fieldValues1 = kfvFieldsValues(kfvs[1]); @@ -306,13 +341,40 @@ TEST(c_api, ZmqConsumerProducerStateTable) { else SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr); +SWSS_LOG_DEBUG("print15"); ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500, true), SWSSSelectResult_DATA); + +/* retry_cnt = 1; + while (true) + { + arr = SWSSZmqConsumerStateTable_pops(cst); + + kfvs = takeKeyOpFieldValuesArray(arr); + SWSS_LOG_DEBUG("3 : kfvs.size() is: %ld", kfvs.size()); + sortKfvs(kfvs); + SWSS_LOG_DEBUG("4 : kfvs.size() is: %ld", kfvs.size()); + if(kfvs.size() == 2 || retry_cnt == 3) + break; + retry_cnt++; + SWSS_LOG_DEBUG("Retry count is: %d, Before sleep()", retry_cnt); + usleep(1 * 1000); + SWSS_LOG_DEBUG("Retry count is: %d, After sleep()", retry_cnt); + } */ + + SWSS_LOG_DEBUG("Before sleep(2)"); + sleep(2); + SWSS_LOG_DEBUG("After sleep(2)"); arr = SWSSZmqConsumerStateTable_pops(cst); +SWSS_LOG_DEBUG("5 : kfvs.size() is: %ld", kfvs.size()); +SWSS_LOG_DEBUG("print16"); kfvs = takeKeyOpFieldValuesArray(arr); +SWSS_LOG_DEBUG("6 : kfvs.size() is: %ld", kfvs.size()); sortKfvs(kfvs); freeKeyOpFieldValuesArray(arr); +SWSS_LOG_DEBUG("7 : kfvs.size() is: %ld", kfvs.size()); +SWSS_LOG_DEBUG("print17"); ASSERT_EQ(kfvs.size(), 2); EXPECT_EQ(kfvKey(kfvs[0]), "mykey3"); EXPECT_EQ(kfvOp(kfvs[0]), "DEL"); diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 56a8299f..bd056f48 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "gtest/gtest.h" #include "common/dbconnector.h" #include "common/notificationconsumer.h" @@ -14,6 +15,7 @@ #include "common/zmqclient.h" #include "common/zmqproducerstatetable.h" #include "common/zmqconsumerstatetable.h" +#include "common/binaryserializer.h" using namespace std; using namespace swss; @@ -56,8 +58,11 @@ static bool allDataReceived = false; static void producerWorker(string tableName, string endpoint, bool dbPersistence) { +SWSS_LOG_DEBUG("Inside producerWorker"); DBConnector db(TEST_DB, 0, true); +SWSS_LOG_DEBUG("producerWorker: After DBConnector"); ZmqClient client(endpoint); +SWSS_LOG_DEBUG("producerWorker: After zmqclient"); ZmqProducerStateTable p(&db, tableName, client, dbPersistence); cout << "Producer thread started: " << tableName << endl; @@ -257,6 +262,10 @@ static void consumerWorker(string tableName, string endpoint, bool dbPersistence } } + // Wait for some time to write into the DB. + + sleep(3); + allDataReceived = true; if (dbPersistence) @@ -288,6 +297,9 @@ static void testMethod(bool producerPersistence) // start consumer first, SHM can only have 1 consumer per table. thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence); + // Wait for the consumer to start. + sleep(1); + cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl; /* Starting the producer before the producer */ for (int i = 0; i < NUMBER_OF_THREADS; i++) @@ -351,6 +363,9 @@ static void testBatchMethod(bool producerPersistence) // start consumer first, SHM can only have 1 consumer per table. thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence); + // Wait for the consumer to start. + sleep(1); + cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl; /* Starting the producer before the producer */ for (int i = 0; i < NUMBER_OF_THREADS; i++) @@ -465,3 +480,115 @@ TEST(ZmqProducerStateTableDeleteAfterSend, test) table.getKeys(keys); EXPECT_EQ(keys.front(), testKey); } + +static bool zmq_done = false; + +static void zmqConsumerWorker(string tableName, string endpoint, bool dbPersistence) +{ +// std::string testTableName = "ZMQ_PROD_CONS_UT"; + std::string pushEndpoint = "tcp://localhost:1234"; + std::string pullEndpoint = "tcp://*:1234"; + + cout << "DIV:: Function zmqConsumerWorker 473" << endl; + cout << "Consumer thread started: " << tableName << endl; + DBConnector db(TEST_DB, 0, true); + cout << "DIV:: Function zmqConsumerWorker 476" << endl; + ZmqServer server(endpoint, true); + cout << "DIV:: Function zmqConsumerWorker 478" << endl; + ZmqConsumerStateTable c(&db, tableName, server, 128, 0, dbPersistence); + cout << "DIV:: Function zmqConsumerWorker 480" << endl; + //validate received data + std::vector values; + values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); + + while (!zmq_done) + { + sleep(10); + std::string rec_dbName, rec_tableName; + std::vector> rec_kcos_ptrs; + std::vector deserialized_kcos; + + BinarySerializer::deserializeBuffer(server.m_buffer.data(), server.m_buffer.size(), rec_dbName, rec_tableName, rec_kcos_ptrs); + + for (auto kco_ptr : rec_kcos_ptrs) + { + deserialized_kcos.push_back(*kco_ptr); + } + SWSS_LOG_DEBUG("dbname is: %s & tablename is : %s", rec_dbName.c_str(), rec_tableName.c_str()); + EXPECT_EQ(rec_dbName, TEST_DB); + EXPECT_EQ(rec_tableName, tableName); + EXPECT_EQ(deserialized_kcos, values); + + } + + allDataReceived = true; + if (dbPersistence) + { + cout << "DIV:: Function zmqConsumerWorker 509" << endl; + // wait all persist data write to redis + while (c.dbUpdaterQueueSize() > 0) + { + sleep(1); + } + } + + zmq_done = true; + cout << "Consumer thread ended: " << tableName << endl; +} + +static void ZmqWithResponse(bool producerPersistence) +{ + std::string testTableName = "ZMQ_PROD_CONS_UT"; + std::string pushEndpoint = "tcp://localhost:1234"; + std::string pullEndpoint = "tcp://*:1234"; + // start consumer first, SHM can only have 1 consumer per table. + thread *consumerThread = new thread(zmqConsumerWorker, testTableName, pullEndpoint, !producerPersistence); + + cout << "DIV:: Function ZmqWithResponse ut 1 529" << endl; + // Wait for the consumer to be ready. + sleep(1); + DBConnector db(TEST_DB, 0, true); + cout << "DIV:: Function ZmqWithResponse ut 1 533" << endl; + ZmqClient client(pushEndpoint, 3000); + cout << "DIV:: Function ZmqWithResponse ut 1 535" << endl; + ZmqProducerStateTable p(&db, testTableName, client, true); + cout << "DIV:: Function ZmqWithResponse ut 1 537" << endl; + std::vector kcos; + kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); + //std::vector> kcos_p; + cout << "DIV:: Function ZmqWithResponse ut 1 541" << endl; + //std::string dbName, tableName; + for (int i = 0; i < 5; ++i) + { + cout << "DIV:: Function ZmqWithResponse ut 1 545" << endl; + p.send(kcos); + } + + cout << "DIV:: Function ZmqWithResponse ut 1 558" << endl; + zmq_done = true; + sleep(10); + consumerThread->join(); + delete consumerThread; +} + +TEST(ZmqWithResponse, test) +{ + // test with persist by consumer + ZmqWithResponse(false); +} + +TEST(ZmqWithResponseClientError, test) +{ + std::string testTableName = "ZMQ_PROD_CONS_UT"; + std::string pushEndpoint = "tcp://localhost:1234"; + DBConnector db(TEST_DB, 0, true); + ZmqClient client(pushEndpoint, 3000); + ZmqProducerStateTable p(&db, testTableName, client, true); + std::vector kcos; + kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{}}); + std::vector> kcos_p; + std::string dbName, tableName; + p.send(kcos); + // Wait will timeout without server reply. + EXPECT_FALSE(p.wait(dbName, tableName, kcos_p)); +} diff --git a/ut_dump_file.txt b/ut_dump_file.txt new file mode 100644 index 00000000..68146e2d --- /dev/null +++ b/ut_dump_file.txt @@ -0,0 +1,19 @@ +[ +{ + "OP": "SET", + "UT_REDIS:test_key_1": { + "test_field_1": "test_value_1" + } +}, +{ + "OP": "SET", + "UT_REDIS:test_key_2": { + "test_field_1": "test_value_1", + "test_field_2": "test_value_2" + } +}, +{ + "OP": "DEL", + "UT_REDIS:test_key_1": {} +} +]