Skip to content

Commit

Permalink
Recreate notification consumer queue when empty
Browse files Browse the repository at this point in the history
Signed-off-by: Lawrence Lee <lawlee@microsoft.com>
  • Loading branch information
theasianpianist committed Dec 11, 2024
1 parent 7425c42 commit 6004ed9
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 11 deletions.
34 changes: 24 additions & 10 deletions common/notificationconsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ swss::NotificationConsumer::NotificationConsumer(swss::DBConnector *db, const st
{
SWSS_LOG_ENTER();

m_queue = std::make_shared<std::queue<std::string>>();
while (true)
{
try
Expand Down Expand Up @@ -105,12 +106,12 @@ uint64_t swss::NotificationConsumer::readData()

bool swss::NotificationConsumer::hasData()
{
return m_queue.size() > 0;
return m_queue->size() > 0;
}

bool swss::NotificationConsumer::hasCachedData()
{
return m_queue.size() > 1;
return m_queue->size() > 1;
}

void swss::NotificationConsumer::processReply(redisReply *reply)
Expand Down Expand Up @@ -138,21 +139,34 @@ void swss::NotificationConsumer::processReply(redisReply *reply)

SWSS_LOG_DEBUG("got message: %s", msg.c_str());

m_queue.push(msg);
m_queue->push(msg);
}

void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::vector<FieldValueTuple> &values)
{
SWSS_LOG_ENTER();

if (m_queue.empty())
if (m_queue->empty())
{
SWSS_LOG_ERROR("notification queue is empty, can't pop");
throw std::runtime_error("notification queue is empty, can't pop");
}

std::string msg = m_queue.front();
m_queue.pop();
std::string msg = m_queue->front();
m_queue->pop();

if (m_queue->empty())
{
/***
* If there is a burst of notifications that causes the queue to grow in size,
* memory allocated by the queue will not be released even after the all items
* have been popped.
*
* Force the memory to be released by destroying existing queue and creating a new one.
*/
m_queue = nullptr;
m_queue = std::make_shared<std::queue<std::string>>();
}

values.clear();
JSon::readJson(msg, values);
Expand All @@ -170,9 +184,9 @@ void swss::NotificationConsumer::pops(std::deque<KeyOpFieldsValuesTuple> &vkco)
SWSS_LOG_ENTER();

vkco.clear();
while(!m_queue.empty())
while(!m_queue->empty())
{
while(!m_queue.empty())
while(!m_queue->empty())
{
std::string op;
std::string data;
Expand All @@ -198,7 +212,7 @@ void swss::NotificationConsumer::pops(std::deque<KeyOpFieldsValuesTuple> &vkco)
int swss::NotificationConsumer::peek()
{
SWSS_LOG_ENTER();
if (m_queue.empty())
if (m_queue->empty())
{
// Peek for more data in redis socket
int rc = swss::peekRedisContext(m_subscribe->getContext());
Expand All @@ -209,5 +223,5 @@ int swss::NotificationConsumer::peek()
readData();
}

return m_queue.empty() ? 0 : 1;
return m_queue->empty() ? 0 : 1;
}
2 changes: 1 addition & 1 deletion common/notificationconsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class NotificationConsumer : public Selectable
swss::DBConnector *m_db;
swss::DBConnector *m_subscribe;
std::string m_channel;
std::queue<std::string> m_queue;
std::shared_ptr<std::queue<std::string>> m_queue;
};

}
Expand Down

0 comments on commit 6004ed9

Please sign in to comment.