Skip to content

Commit

Permalink
Suppory async DB update for both ZMQ producer/consumer table.
Browse files Browse the repository at this point in the history
  • Loading branch information
liuh-80 committed Sep 27, 2023
1 parent b0f148e commit 47c1625
Show file tree
Hide file tree
Showing 8 changed files with 276 additions and 126 deletions.
1 change: 1 addition & 0 deletions common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ common_libswsscommon_la_SOURCES = \
common/profileprovider.cpp \
common/zmqclient.cpp \
common/zmqserver.cpp \
common/asyncdbupdater.cpp \
common/redis_table_waiter.cpp

common_libswsscommon_la_CXXFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CFLAGS) $(CODE_COVERAGE_CXXFLAGS)
Expand Down
111 changes: 111 additions & 0 deletions common/asyncdbupdater.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#include <string>
#include <deque>
#include <limits>
#include <hiredis/hiredis.h>
#include <pthread.h>
#include "asyncdbupdater.h"
#include "dbconnector.h"
#include "redisselect.h"
#include "redisapi.h"
#include "table.h"

using namespace std;

namespace swss {

AsyncDBUpdater::AsyncDBUpdater(DBConnector *db, const std::string &tableName)
: m_db(db)
, m_tableName(tableName)
{
m_runThread = true;
m_dbUpdateThread = std::make_shared<std::thread>(&AsyncDBUpdater::dbUpdateThread, this);

SWSS_LOG_DEBUG("AsyncDBUpdater ctor tableName: %s", tableName.c_str());
}

AsyncDBUpdater::~AsyncDBUpdater()
{
m_runThread = false;

// notify db update thread exit
m_dbUpdateDataNotifyCv.notify_all();
m_dbUpdateThread->join();
}

void AsyncDBUpdater::update(std::shared_ptr<KeyOpFieldsValuesTuple> pkco)
{
{
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);
m_dbUpdateDataQueue.push(pkco);
}

m_dbUpdateDataNotifyCv.notify_all();
}

void AsyncDBUpdater::dbUpdateThread()
{
SWSS_LOG_ENTER();
SWSS_LOG_NOTICE("dbUpdateThread begin");

// Different schedule policy has different min priority
pthread_attr_t attr;
int policy;
pthread_attr_getschedpolicy(&attr, &policy);
int min_priority = sched_get_priority_min(policy);
// Use min priority will block poll thread
pthread_setschedprio(pthread_self(), min_priority + 1);

// Follow same logic in ConsumerStateTable: every received data will write to 'table'.
DBConnector db(m_db->getDbName(), 0, true);
Table table(&db, m_tableName);
std::mutex cvMutex;
std::unique_lock<std::mutex> cvLock(cvMutex);

while (m_runThread)
{
m_dbUpdateDataNotifyCv.wait(cvLock);

size_t count;
{
// size() is not thread safe
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);

// For new data append to m_dataQueue during pops, will not be include in result.
count = m_dbUpdateDataQueue.size();
if (!count)
{
continue;
}

}

for (size_t ie = 0; ie < count; ie++)
{
auto& kco = *(m_dbUpdateDataQueue.front());

if (kfvOp(kco) == SET_COMMAND)
{
auto& values = kfvFieldsValues(kco);

// Delete entry before Table::set(), because Table::set() does not remove the no longer existed fields from entry.
table.del(kfvKey(kco));
table.set(kfvKey(kco), values);
}
else if (kfvOp(kco) == DEL_COMMAND)
{
table.del(kfvKey(kco));
}
else
{
SWSS_LOG_ERROR("db: %s, table: %s receive unknown operation: %s", m_db->getDbName().c_str(), m_tableName.c_str(), kfvOp(kco).c_str());
}

{
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);
m_dbUpdateDataQueue.pop();
}
}
}
}

}
42 changes: 42 additions & 0 deletions common/asyncdbupdater.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma once

#include <string>
#include <deque>
#include <condition_variable>
#include "dbconnector.h"
#include "table.h"

#define MQ_RESPONSE_MAX_COUNT (4*1024*1024)
#define MQ_SIZE 100
#define MQ_MAX_RETRY 10
#define MQ_POLL_TIMEOUT (1000)

namespace swss {

class AsyncDBUpdater
{
public:
AsyncDBUpdater(DBConnector *db, const std::string &tableName);
~AsyncDBUpdater();

void update(std::shared_ptr<KeyOpFieldsValuesTuple> pkco);

private:
void dbUpdateThread();

volatile bool m_runThread;

std::shared_ptr<std::thread> m_dbUpdateThread;

std::mutex m_dbUpdateDataQueueMutex;

std::condition_variable m_dbUpdateDataNotifyCv;

std::queue<std::shared_ptr<KeyOpFieldsValuesTuple>> m_dbUpdateDataQueue;

DBConnector *m_db;

std::string m_tableName;
};

}
94 changes: 5 additions & 89 deletions common/zmqconsumerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,23 @@ ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string
if (dbPersistence)
{
SWSS_LOG_DEBUG("Database persistence enabled, tableName: %s", tableName.c_str());
m_runThread = true;
m_dbUpdateThread = std::make_shared<std::thread>(&ZmqConsumerStateTable::dbUpdateThread, this);
m_asyncDBUpdater = std::make_unique<AsyncDBUpdater>(db, tableName);
}
else
{
SWSS_LOG_DEBUG("Database persistence disabled, tableName: %s", tableName.c_str());
m_dbUpdateThread = nullptr;
m_asyncDBUpdater = nullptr;
}

m_zmqServer.registerMessageHandler(m_db->getDbName(), tableName, this);

SWSS_LOG_DEBUG("ZmqConsumerStateTable ctor tableName: %s", tableName.c_str());
}

ZmqConsumerStateTable::~ZmqConsumerStateTable()
{
if (m_dbUpdateThread != nullptr)
{
m_runThread = false;

// notify db update thread exit
m_dbUpdateDataNotifyCv.notify_all();
m_dbUpdateThread->join();
}
}

void ZmqConsumerStateTable::handleReceivedData(std::shared_ptr<KeyOpFieldsValuesTuple> pkco)
{
std::shared_ptr<KeyOpFieldsValuesTuple> clone = nullptr;
if (m_dbUpdateThread != nullptr)
if (m_asyncDBUpdater != nullptr)
{
// clone before put to received queue, because received data may change by consumer.
clone = std::make_shared<KeyOpFieldsValuesTuple>(*pkco);
Expand All @@ -68,80 +55,9 @@ void ZmqConsumerStateTable::handleReceivedData(std::shared_ptr<KeyOpFieldsValues

m_selectableEvent.notify(); // will release epoll

if (m_dbUpdateThread != nullptr)
if (m_asyncDBUpdater != nullptr)
{
{
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);
m_dbUpdateDataQueue.push(clone);
}

m_dbUpdateDataNotifyCv.notify_all();
}
}

void ZmqConsumerStateTable::dbUpdateThread()
{
SWSS_LOG_ENTER();
SWSS_LOG_NOTICE("dbUpdateThread begin");

// Different schedule policy has different min priority
pthread_attr_t attr;
int policy;
pthread_attr_getschedpolicy(&attr, &policy);
int min_priority = sched_get_priority_min(policy);
// Use min priority will block poll thread
pthread_setschedprio(pthread_self(), min_priority + 1);

// Follow same logic in ConsumerStateTable: every received data will write to 'table'.
DBConnector db(m_db->getDbName(), 0, true);
Table table(&db, getTableName());
std::mutex cvMutex;
std::unique_lock<std::mutex> cvLock(cvMutex);

while (m_runThread)
{
m_dbUpdateDataNotifyCv.wait(cvLock);

size_t count;
{
// size() is not thread safe
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);

// For new data append to m_dataQueue during pops, will not be include in result.
count = m_dbUpdateDataQueue.size();
if (!count)
{
continue;
}

}

for (size_t ie = 0; ie < count; ie++)
{
auto& kco = *(m_dbUpdateDataQueue.front());

if (kfvOp(kco) == SET_COMMAND)
{
auto& values = kfvFieldsValues(kco);

// Delete entry before Table::set(), because Table::set() does not remove the no longer existed fields from entry.
table.del(kfvKey(kco));
table.set(kfvKey(kco), values);
}
else if (kfvOp(kco) == DEL_COMMAND)
{
table.del(kfvKey(kco));
}
else
{
SWSS_LOG_ERROR("zmq consumer table: %s, receive unknown operation: %s", getTableName().c_str(), kfvOp(kco).c_str());
}

{
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);
m_dbUpdateDataQueue.pop();
}
}
m_asyncDBUpdater->update(clone);
}
}

Expand Down
22 changes: 6 additions & 16 deletions common/zmqconsumerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
#include <string>
#include <deque>
#include <condition_variable>
#include "dbconnector.h"
#include "table.h"
#include "asyncdbupdater.h"
#include "consumertablebase.h"
#include "dbconnector.h"
#include "selectableevent.h"
#include "table.h"
#include "zmqserver.h"

#define MQ_RESPONSE_MAX_COUNT (4*1024*1024)
Expand All @@ -22,8 +23,7 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes
/* The default value of pop batch size is 128 */
static constexpr int DEFAULT_POP_BATCH_SIZE = 128;

ZmqConsumerStateTable(DBConnector *db, const std::string &tableName, ZmqServer &zmqServer, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0, bool dbPersistence = true);
~ZmqConsumerStateTable();
ZmqConsumerStateTable(DBConnector *db, const std::string &tableName, ZmqServer &zmqServer, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0, bool dbPersistence = false);

/* Get multiple pop elements */
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);
Expand Down Expand Up @@ -75,27 +75,17 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes
private:
void handleReceivedData(std::shared_ptr<KeyOpFieldsValuesTuple> pkco);

void dbUpdateThread();

volatile bool m_runThread;

std::mutex m_receivedQueueMutex;

std::queue<std::shared_ptr<KeyOpFieldsValuesTuple>> m_receivedOperationQueue;

swss::SelectableEvent m_selectableEvent;

std::shared_ptr<std::thread> m_dbUpdateThread;

std::mutex m_dbUpdateDataQueueMutex;

std::condition_variable m_dbUpdateDataNotifyCv;

std::queue<std::shared_ptr<KeyOpFieldsValuesTuple>> m_dbUpdateDataQueue;

DBConnector *m_db;

ZmqServer& m_zmqServer;

std::unique_ptr<AsyncDBUpdater> m_asyncDBUpdater;
};

}
Loading

0 comments on commit 47c1625

Please sign in to comment.