Skip to content

Commit

Permalink
ut pass case
Browse files Browse the repository at this point in the history
  • Loading branch information
divyagayathri-hcl committed Dec 16, 2024
1 parent 16d70bc commit dbb078a
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 152 deletions.
1 change: 1 addition & 0 deletions common/c-api/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ void SWSSZmqClient_connect(SWSSZmqClient zmqc) {

void SWSSZmqClient_sendMsg(SWSSZmqClient zmqc, const char *dbName, const char *tableName,
SWSSKeyOpFieldValuesArray arr) {
SWSS_LOG_DEBUG("Inside SWSSZmqClient_sendMsg");
SWSSTry({
vector<KeyOpFieldsValuesTuple> kcos = takeKeyOpFieldValuesArray(arr);
((ZmqClient *)zmqc)
Expand Down
66 changes: 2 additions & 64 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace swss {
ZmqClient::ZmqClient(const std::string& endpoint)
:ZmqClient(endpoint, "")
{
// initialize(endpoint);
initialize(endpoint);
}

ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf)
Expand All @@ -29,7 +29,6 @@ ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf)
ZmqClient::ZmqClient(const std::string& endpoint, uint32_t waitTimeMs) :
m_waitTimeMs(waitTimeMs)
{
// m_waitTimeMs = waitTimeMs;
initialize(endpoint);
}

Expand Down Expand Up @@ -226,68 +225,7 @@ bool ZmqClient::wait(std::string& dbName,
SWSS_LOG_ERROR("DIV:: Inside function wait");
SWSS_LOG_ENTER();

// return false;

zmq_pollitem_t items [1] = { };
items[0].socket = m_socket;
items[0].events = ZMQ_POLLIN;

/* zmq_pollitem_t poll_item;
poll_item.fd = 0;
poll_item.socket = m_socket;
poll_item.events = ZMQ_POLLIN;
poll_item.revents = 0;*/

int rc;
for (int i = 0; true; ++i)
{
// rc = zmq_poll(&poll_item, 1, 1000);
rc = zmq_poll(items, 1, (int)m_waitTimeMs);
SWSS_LOG_DEBUG("cli: rc value is : %d", rc);
if (rc == 0)
{
SWSS_LOG_ERROR("zmq_poll timed out: zmqclient wait");
return false;
// continue;
}
if (rc > 0)
{
break;
}
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
{
SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq poll");
continue;
}
SWSS_LOG_ERROR("zmqclient wait : zmq_poll failed, zmqerrno: %d", zmq_errno());
}

for (int i = 0; true; ++i)
{
rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), ZMQ_DONTWAIT);
if (rc < 0)
{
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
{
SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq receive");
continue;
}
SWSS_LOG_ERROR("zmqclient wait : zmq_recv failed, zmqerrno: %d", zmq_errno());
return false;
}
if (rc >= (int)m_sendbuffer.size())
{
SWSS_LOG_ERROR(
"zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
(int)m_sendbuffer.size(), rc);
// return false;
}
break;
}
m_sendbuffer.at(rc) = 0; // make sure that we end string with zero before parse
kcos.clear();
BinarySerializer::deserializeBuffer(m_sendbuffer.data(), m_sendbuffer.size(), dbName, tableName, kcos);
return true;
return false;
}

}
40 changes: 2 additions & 38 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ ZmqServer::ZmqServer(const std::string& endpoint)
}

ZmqServer::ZmqServer(const std::string& endpoint, bool zmr_test)
// : ZmqServer(endpoint, "")
: m_endpoint(endpoint),
m_allowZmqPoll(true)
{
Expand Down Expand Up @@ -139,35 +138,6 @@ SWSS_LOG_ERROR("DIV:: Inside function mqPollThread");
SWSS_LOG_ENTER();
SWSS_LOG_NOTICE("mqPollThread begin");

/* // Producer/Consumer state table are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket
void* context = zmq_ctx_new();
void* socket = zmq_socket(context, ZMQ_PULL);
//divya
int ret_code = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
SWSS_LOG_DEBUG("mqPollThread:: ret_code value is : %d", ret_code);
//divya
// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));
if (!m_vrf.empty())
{
zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}
int rc = zmq_bind(socket, m_endpoint.c_str());
SWSS_LOG_DEBUG("115: mqPollThread:: rc value is : %d", rc);
if (rc != 0)
{
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s",
m_endpoint.c_str(),
zmq_errno(),
strerror(zmq_errno()));
}*/

// zmq_poll will use less CPU
zmq_pollitem_t poll_item;
poll_item.fd = 0;
Expand Down Expand Up @@ -224,9 +194,6 @@ SWSS_LOG_ERROR("DIV:: Inside function mqPollThread");
handleReceivedData(m_buffer.data(), rc);
}

// zmq_close(socket);
// zmq_ctx_destroy(context);

SWSS_LOG_NOTICE("mqPollThread end");
}

Expand Down Expand Up @@ -293,8 +260,7 @@ SWSS_LOG_ERROR("DIV:: Inside function server sendMsg");
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
SWSS_LOG_DEBUG("3. m_socket in server sendmsg() is: %p\n", m_socket);
// throw system_error(make_error_code(errc::io_error), message);
// SWSS_LOG_THROW("Else case message is: %s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
return;
}
usleep(retry_delay * 1000);
Expand All @@ -303,9 +269,7 @@ SWSS_LOG_ERROR("DIV:: Inside function server sendMsg");
// failed after retry
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
SWSS_LOG_ERROR("%s", message.c_str());
// throw system_error(make_error_code(errc::io_error), message);
// SWSS_LOG_THROW("Last Error message is %s", message.c_str());
// return;
throw system_error(make_error_code(errc::io_error), message);
}

}
6 changes: 2 additions & 4 deletions common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,14 @@ class ZmqServer

std::string m_endpoint;

void* m_context;

std::string m_vrf;

void* m_context;

void* m_socket;

bool m_allowZmqPoll;

// std::vector<char> m_sendbuffer;

std::map<std::string, std::map<std::string, ZmqMessageHandler*>> m_HandlerMap;
};

Expand Down
Binary file removed core
Binary file not shown.
17 changes: 15 additions & 2 deletions tests/c_api_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,7 @@ TEST(c_api, SubscriberStateTable) {
TEST(c_api, ZmqConsumerProducerStateTable) {
clearDB();
SWSSStringManager sm;

SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true);

SWSSZmqServer srv = SWSSZmqServer_new("tcp://127.0.0.1:42312");
SWSSZmqClient cli = SWSSZmqClient_new("tcp://127.0.0.1:42312");
EXPECT_TRUE(SWSSZmqClient_isConnected(cli));
Expand All @@ -241,40 +239,51 @@ TEST(c_api, ZmqConsumerProducerStateTable) {
ASSERT_EQ(arr.len, 0);
freeKeyOpFieldValuesArray(arr);

SWSS_LOG_DEBUG("print7");
// On flag = 0, we use the ZmqProducerStateTable
// On flag = 1, we use the ZmqClient directly
for (int flag = 0; flag < 2; flag++) {
SWSS_LOG_DEBUG("print7 for loop, flag set is : %d", flag);
SWSSFieldValueTuple values_key1_data[2] = {{.field = "myfield1", .value = sm.makeString("myvalue1")},
{.field = "myfield2", .value = sm.makeString("myvalue2")}};
SWSSFieldValueArray values_key1 = {
.len = 2,
.data = values_key1_data,
};

SWSS_LOG_DEBUG("print8");
SWSSFieldValueTuple values_key2_data[1] = {{.field = "myfield3", .value = sm.makeString("myvalue3")}};
SWSSFieldValueArray values_key2 = {
.len = 1,
.data = values_key2_data,
};

SWSS_LOG_DEBUG("print9");
SWSSKeyOpFieldValues arr_data[2] = {
{.key = "mykey1", .operation = SWSSKeyOperation_SET, .fieldValues = values_key1},
{.key = "mykey2", .operation = SWSSKeyOperation_SET, .fieldValues = values_key2}};
arr = {.len = 2, .data = arr_data};

SWSS_LOG_DEBUG("print10");
if (flag == 0)
for (uint64_t i = 0; i < arr.len; i++)
SWSSZmqProducerStateTable_set(pst, arr.data[i].key, arr.data[i].fieldValues);
else
{
SWSS_LOG_DEBUG("print10 else loop, flag is: %d", flag);
SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr);
}

SWSS_LOG_DEBUG("print11");
ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 1500, true), SWSSSelectResult_DATA);
arr = SWSSZmqConsumerStateTable_pops(cst);

SWSS_LOG_DEBUG("print12");
vector<KeyOpFieldsValuesTuple> kfvs = takeKeyOpFieldValuesArray(arr);
sortKfvs(kfvs);
freeKeyOpFieldValuesArray(arr);

SWSS_LOG_DEBUG("print13");
ASSERT_EQ(kfvs.size(), 2);
EXPECT_EQ(kfvKey(kfvs[0]), "mykey1");
EXPECT_EQ(kfvOp(kfvs[0]), "SET");
Expand All @@ -285,6 +294,7 @@ TEST(c_api, ZmqConsumerProducerStateTable) {
EXPECT_EQ(fieldValues0[1].first, "myfield2");
EXPECT_EQ(fieldValues0[1].second, "myvalue2");

SWSS_LOG_DEBUG("print14");
EXPECT_EQ(kfvKey(kfvs[1]), "mykey2");
EXPECT_EQ(kfvOp(kfvs[1]), "SET");
vector<pair<string, string>> &fieldValues1 = kfvFieldsValues(kfvs[1]);
Expand All @@ -306,13 +316,16 @@ TEST(c_api, ZmqConsumerProducerStateTable) {
else
SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr);

SWSS_LOG_DEBUG("print15");
ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500, true), SWSSSelectResult_DATA);
arr = SWSSZmqConsumerStateTable_pops(cst);

SWSS_LOG_DEBUG("print16");
kfvs = takeKeyOpFieldValuesArray(arr);
sortKfvs(kfvs);
freeKeyOpFieldValuesArray(arr);

SWSS_LOG_DEBUG("print17");
ASSERT_EQ(kfvs.size(), 2);
EXPECT_EQ(kfvKey(kfvs[0]), "mykey3");
EXPECT_EQ(kfvOp(kfvs[0]), "DEL");
Expand Down
Loading

0 comments on commit dbb078a

Please sign in to comment.