diff --git a/common/c-api/zmqclient.cpp b/common/c-api/zmqclient.cpp index fa1d59ca..e2c7636e 100644 --- a/common/c-api/zmqclient.cpp +++ b/common/c-api/zmqclient.cpp @@ -25,6 +25,7 @@ void SWSSZmqClient_connect(SWSSZmqClient zmqc) { void SWSSZmqClient_sendMsg(SWSSZmqClient zmqc, const char *dbName, const char *tableName, SWSSKeyOpFieldValuesArray arr) { + SWSS_LOG_DEBUG("Inside SWSSZmqClient_sendMsg"); SWSSTry({ vector kcos = takeKeyOpFieldValuesArray(arr); ((ZmqClient *)zmqc) diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index b146b288..d13246f0 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -18,7 +18,7 @@ namespace swss { ZmqClient::ZmqClient(const std::string& endpoint) :ZmqClient(endpoint, "") { -// initialize(endpoint); + initialize(endpoint); } ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf) @@ -29,7 +29,6 @@ ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf) ZmqClient::ZmqClient(const std::string& endpoint, uint32_t waitTimeMs) : m_waitTimeMs(waitTimeMs) { -// m_waitTimeMs = waitTimeMs; initialize(endpoint); } @@ -226,68 +225,7 @@ bool ZmqClient::wait(std::string& dbName, SWSS_LOG_ERROR("DIV:: Inside function wait"); SWSS_LOG_ENTER(); -// return false; - - zmq_pollitem_t items [1] = { }; - items[0].socket = m_socket; - items[0].events = ZMQ_POLLIN; - -/* zmq_pollitem_t poll_item; - poll_item.fd = 0; - poll_item.socket = m_socket; - poll_item.events = ZMQ_POLLIN; - poll_item.revents = 0;*/ - - int rc; - for (int i = 0; true; ++i) - { -// rc = zmq_poll(&poll_item, 1, 1000); - rc = zmq_poll(items, 1, (int)m_waitTimeMs); - SWSS_LOG_DEBUG("cli: rc value is : %d", rc); - if (rc == 0) - { - SWSS_LOG_ERROR("zmq_poll timed out: zmqclient wait"); - return false; -// continue; - } - if (rc > 0) - { - break; - } - if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY) - { - SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq poll"); - continue; - } - SWSS_LOG_ERROR("zmqclient wait : zmq_poll failed, zmqerrno: %d", zmq_errno()); - } - - for (int i = 0; true; ++i) - { - rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), ZMQ_DONTWAIT); - if (rc < 0) - { - if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY) - { - SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq receive"); - continue; - } - SWSS_LOG_ERROR("zmqclient wait : zmq_recv failed, zmqerrno: %d", zmq_errno()); - return false; - } - if (rc >= (int)m_sendbuffer.size()) - { - SWSS_LOG_ERROR( - "zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED", - (int)m_sendbuffer.size(), rc); -// return false; - } - break; - } - m_sendbuffer.at(rc) = 0; // make sure that we end string with zero before parse kcos.clear(); - BinarySerializer::deserializeBuffer(m_sendbuffer.data(), m_sendbuffer.size(), dbName, tableName, kcos); - return true; + return false; } - } diff --git a/common/zmqproducerstatetable.cpp b/common/zmqproducerstatetable.cpp index 8904f92b..07346c08 100644 --- a/common/zmqproducerstatetable.cpp +++ b/common/zmqproducerstatetable.cpp @@ -166,15 +166,11 @@ SWSS_LOG_DEBUG("DIV:: Inside function ZmqProducerStateTable::send"); } bool ZmqProducerStateTable::wait(std::string& dbName, - std::string& tableName, - std::vector>& kcos) - { SWSS_LOG_DEBUG("DIV:: Inside function ZmqProducerStateTable::wait"); return m_zmqClient.wait(dbName, tableName, kcos); - } size_t ZmqProducerStateTable::dbUpdaterQueueSize() diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index d46a85c4..4a745f65 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -18,7 +18,6 @@ ZmqServer::ZmqServer(const std::string& endpoint) } ZmqServer::ZmqServer(const std::string& endpoint, bool zmr_test) -// : ZmqServer(endpoint, "") : m_endpoint(endpoint), m_allowZmqPoll(true) { @@ -139,35 +138,6 @@ SWSS_LOG_ERROR("DIV:: Inside function mqPollThread"); SWSS_LOG_ENTER(); SWSS_LOG_NOTICE("mqPollThread begin"); -/* // Producer/Consumer state table are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket - void* context = zmq_ctx_new(); - void* socket = zmq_socket(context, ZMQ_PULL); - -//divya - - int ret_code = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); - SWSS_LOG_DEBUG("mqPollThread:: ret_code value is : %d", ret_code); -//divya - - // Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt - int high_watermark = MQ_WATERMARK; - zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); - - if (!m_vrf.empty()) - { - zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); - } - - int rc = zmq_bind(socket, m_endpoint.c_str()); - SWSS_LOG_DEBUG("115: mqPollThread:: rc value is : %d", rc); - if (rc != 0) - { - SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s", - m_endpoint.c_str(), - zmq_errno(), - strerror(zmq_errno())); - }*/ - // zmq_poll will use less CPU zmq_pollitem_t poll_item; poll_item.fd = 0; @@ -224,9 +194,6 @@ SWSS_LOG_ERROR("DIV:: Inside function mqPollThread"); handleReceivedData(m_buffer.data(), rc); } -// zmq_close(socket); -// zmq_ctx_destroy(context); - SWSS_LOG_NOTICE("mqPollThread end"); } @@ -293,8 +260,7 @@ SWSS_LOG_ERROR("DIV:: Inside function server sendMsg"); auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); SWSS_LOG_ERROR("%s", message.c_str()); SWSS_LOG_DEBUG("3. m_socket in server sendmsg() is: %p\n", m_socket); -// throw system_error(make_error_code(errc::io_error), message); -// SWSS_LOG_THROW("Else case message is: %s", message.c_str()); + throw system_error(make_error_code(errc::io_error), message); return; } usleep(retry_delay * 1000); @@ -303,9 +269,7 @@ SWSS_LOG_ERROR("DIV:: Inside function server sendMsg"); // failed after retry auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); SWSS_LOG_ERROR("%s", message.c_str()); -// throw system_error(make_error_code(errc::io_error), message); -// SWSS_LOG_THROW("Last Error message is %s", message.c_str()); -// return; + throw system_error(make_error_code(errc::io_error), message); } } diff --git a/common/zmqserver.h b/common/zmqserver.h index 169a8c48..e24dcc01 100644 --- a/common/zmqserver.h +++ b/common/zmqserver.h @@ -61,16 +61,14 @@ class ZmqServer std::string m_endpoint; - void* m_context; - std::string m_vrf; + void* m_context; + void* m_socket; bool m_allowZmqPoll; -// std::vector m_sendbuffer; - std::map> m_HandlerMap; }; diff --git a/core b/core deleted file mode 100644 index 6600072f..00000000 Binary files a/core and /dev/null differ diff --git a/tests/c_api_ut.cpp b/tests/c_api_ut.cpp index ed814607..d9064813 100644 --- a/tests/c_api_ut.cpp +++ b/tests/c_api_ut.cpp @@ -221,9 +221,7 @@ TEST(c_api, SubscriberStateTable) { TEST(c_api, ZmqConsumerProducerStateTable) { clearDB(); SWSSStringManager sm; - SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true); - SWSSZmqServer srv = SWSSZmqServer_new("tcp://127.0.0.1:42312"); SWSSZmqClient cli = SWSSZmqClient_new("tcp://127.0.0.1:42312"); EXPECT_TRUE(SWSSZmqClient_isConnected(cli)); @@ -241,9 +239,11 @@ TEST(c_api, ZmqConsumerProducerStateTable) { ASSERT_EQ(arr.len, 0); freeKeyOpFieldValuesArray(arr); +SWSS_LOG_DEBUG("print7"); // On flag = 0, we use the ZmqProducerStateTable // On flag = 1, we use the ZmqClient directly for (int flag = 0; flag < 2; flag++) { +SWSS_LOG_DEBUG("print7 for loop, flag set is : %d", flag); SWSSFieldValueTuple values_key1_data[2] = {{.field = "myfield1", .value = sm.makeString("myvalue1")}, {.field = "myfield2", .value = sm.makeString("myvalue2")}}; SWSSFieldValueArray values_key1 = { @@ -251,30 +251,39 @@ TEST(c_api, ZmqConsumerProducerStateTable) { .data = values_key1_data, }; +SWSS_LOG_DEBUG("print8"); SWSSFieldValueTuple values_key2_data[1] = {{.field = "myfield3", .value = sm.makeString("myvalue3")}}; SWSSFieldValueArray values_key2 = { .len = 1, .data = values_key2_data, }; +SWSS_LOG_DEBUG("print9"); SWSSKeyOpFieldValues arr_data[2] = { {.key = "mykey1", .operation = SWSSKeyOperation_SET, .fieldValues = values_key1}, {.key = "mykey2", .operation = SWSSKeyOperation_SET, .fieldValues = values_key2}}; arr = {.len = 2, .data = arr_data}; +SWSS_LOG_DEBUG("print10"); if (flag == 0) for (uint64_t i = 0; i < arr.len; i++) SWSSZmqProducerStateTable_set(pst, arr.data[i].key, arr.data[i].fieldValues); else + { + SWSS_LOG_DEBUG("print10 else loop, flag is: %d", flag); SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr); + } +SWSS_LOG_DEBUG("print11"); ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 1500, true), SWSSSelectResult_DATA); arr = SWSSZmqConsumerStateTable_pops(cst); +SWSS_LOG_DEBUG("print12"); vector kfvs = takeKeyOpFieldValuesArray(arr); sortKfvs(kfvs); freeKeyOpFieldValuesArray(arr); +SWSS_LOG_DEBUG("print13"); ASSERT_EQ(kfvs.size(), 2); EXPECT_EQ(kfvKey(kfvs[0]), "mykey1"); EXPECT_EQ(kfvOp(kfvs[0]), "SET"); @@ -285,6 +294,7 @@ TEST(c_api, ZmqConsumerProducerStateTable) { EXPECT_EQ(fieldValues0[1].first, "myfield2"); EXPECT_EQ(fieldValues0[1].second, "myvalue2"); +SWSS_LOG_DEBUG("print14"); EXPECT_EQ(kfvKey(kfvs[1]), "mykey2"); EXPECT_EQ(kfvOp(kfvs[1]), "SET"); vector> &fieldValues1 = kfvFieldsValues(kfvs[1]); @@ -306,13 +316,16 @@ TEST(c_api, ZmqConsumerProducerStateTable) { else SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr); +SWSS_LOG_DEBUG("print15"); ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500, true), SWSSSelectResult_DATA); arr = SWSSZmqConsumerStateTable_pops(cst); +SWSS_LOG_DEBUG("print16"); kfvs = takeKeyOpFieldValuesArray(arr); sortKfvs(kfvs); freeKeyOpFieldValuesArray(arr); +SWSS_LOG_DEBUG("print17"); ASSERT_EQ(kfvs.size(), 2); EXPECT_EQ(kfvKey(kfvs[0]), "mykey3"); EXPECT_EQ(kfvOp(kfvs[0]), "DEL"); diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 3af09364..eb1e2b21 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "gtest/gtest.h" #include "common/dbconnector.h" #include "common/notificationconsumer.h" @@ -14,6 +15,7 @@ #include "common/zmqclient.h" #include "common/zmqproducerstatetable.h" #include "common/zmqconsumerstatetable.h" +#include "common/binaryserializer.h" using namespace std; using namespace swss; @@ -483,6 +485,10 @@ static bool zmq_done = false; static void zmqConsumerWorker(string tableName, string endpoint, bool dbPersistence) { +// std::string testTableName = "ZMQ_PROD_CONS_UT"; + std::string pushEndpoint = "tcp://localhost:1234"; + std::string pullEndpoint = "tcp://*:1234"; + cout << "DIV:: Function zmqConsumerWorker 473" << endl; cout << "Consumer thread started: " << tableName << endl; DBConnector db(TEST_DB, 0, true); @@ -491,30 +497,28 @@ static void zmqConsumerWorker(string tableName, string endpoint, bool dbPersiste cout << "DIV:: Function zmqConsumerWorker 478" << endl; ZmqConsumerStateTable c(&db, tableName, server, 128, 0, dbPersistence); cout << "DIV:: Function zmqConsumerWorker 480" << endl; - Select cs; - cs.addSelectable(&c); //validate received data - Selectable *selectcs; - std::deque vkco; - cout << "DIV:: Function zmqConsumerWorker 486" << endl; - int ret = 0; + std::vector values; + values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); + while (!zmq_done) { - cout << "DIV:: Function zmqConsumerWorker 490" << endl; - ret = cs.select(&selectcs, 10, true); - cout << "DIV:: Function zmqConsumerWorker 492" << endl; - if (ret == Select::OBJECT) - { - cout << "DIV:: Function zmqConsumerWorker 494" << endl; - c.pops(vkco); - cout << "DIV:: Function zmqConsumerWorker 496" << endl; - std::vector values; - cout << "DIV:: Function zmqConsumerWorker 498" << endl; - values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); - cout << "DIV:: Function zmqConsumerWorker 500" << endl; - server.sendMsg(TEST_DB, tableName, values); - cout << "DIV:: Function zmqConsumerWorker 502" << endl; - } + sleep(10); + std::string rec_dbName, rec_tableName; + std::vector> rec_kcos_ptrs; + std::vector deserialized_kcos; + + BinarySerializer::deserializeBuffer(server.m_buffer.data(), server.m_buffer.size(), rec_dbName, rec_tableName, rec_kcos_ptrs); + + for (auto kco_ptr : rec_kcos_ptrs) + { + deserialized_kcos.push_back(*kco_ptr); + } + SWSS_LOG_DEBUG("dbname is: %s & tablename is : %s", rec_dbName.c_str(), rec_tableName.c_str()); + EXPECT_EQ(rec_dbName, TEST_DB); + EXPECT_EQ(rec_tableName, tableName); + EXPECT_EQ(deserialized_kcos, values); + } allDataReceived = true; @@ -528,13 +532,13 @@ static void zmqConsumerWorker(string tableName, string endpoint, bool dbPersiste } } + zmq_done = true; cout << "Consumer thread ended: " << tableName << endl; } static void ZmqWithResponse(bool producerPersistence) { std::string testTableName = "ZMQ_PROD_CONS_UT"; -// std::string db_Name = "TEST_DB"; std::string pushEndpoint = "tcp://localhost:1234"; std::string pullEndpoint = "tcp://*:1234"; // start consumer first, SHM can only have 1 consumer per table. @@ -554,23 +558,15 @@ static void ZmqWithResponse(bool producerPersistence) std::vector> kcos_p; cout << "DIV:: Function ZmqWithResponse ut 1 541" << endl; std::string dbName, tableName; - for (int i = 0; i < 3; ++i) - { + //for (int i = 0; i < 5; ++i) + // { cout << "DIV:: Function ZmqWithResponse ut 1 545" << endl; p.send(kcos); - ASSERT_TRUE(p.wait(dbName, tableName, kcos_p)); - cout << "DIV:: Function ZmqWithResponse ut 1 548" << endl; - EXPECT_EQ(dbName, TEST_DB); - EXPECT_EQ(tableName, testTableName); - ASSERT_EQ(kcos_p.size(), 1); - EXPECT_EQ(kfvKey(*kcos_p[0]), "k"); - EXPECT_EQ(kfvOp(*kcos_p[0]), SET_COMMAND); - std::vector cos = std::vector{FieldValueTuple{"f", "v"}}; - EXPECT_EQ(kfvFieldsValues(*kcos_p[0]), cos); - } + //} cout << "DIV:: Function ZmqWithResponse ut 1 558" << endl; zmq_done = true; + sleep(10); consumerThread->join(); delete consumerThread; } @@ -578,31 +574,21 @@ static void ZmqWithResponse(bool producerPersistence) TEST(ZmqWithResponse, test) { // test with persist by consumer - ZmqWithResponse(false); + ZmqWithResponse(true); } TEST(ZmqWithResponseClientError, test) { std::string testTableName = "ZMQ_PROD_CONS_UT"; std::string pushEndpoint = "tcp://localhost:1234"; -// std::string new_dbName = "TEST_DB"; - cout << "DIV:: Function ZmqWithResponse ut 2 575" << endl; DBConnector db(TEST_DB, 0, true); - cout << "DIV:: Function ZmqWithResponse ut 2 577" << endl; ZmqClient client(pushEndpoint, 3000); -// ZmqClient client(pushEndpoint); - cout << "DIV:: Function ZmqWithResponse ut 2 580" << endl; ZmqProducerStateTable p(&db, testTableName, client, true); - cout << "DIV:: Function ZmqWithResponse ut 2 582" << endl; std::vector kcos; kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{}}); std::vector> kcos_p; std::string dbName, tableName; - cout << "DIV:: Function ZmqWithResponse ut 2 587" << endl; p.send(kcos); // Wait will timeout without server reply. EXPECT_FALSE(p.wait(dbName, tableName, kcos_p)); - cout << "DIV:: Function ZmqWithResponse ut 2 591" << endl; -// EXPECT_FALSE(p.wait(new_dbName, testTableName, kcos_p)); } -