-
Notifications
You must be signed in to change notification settings - Fork 273
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support async DB update for both ZMQ producer&consumer table. (#821)
* Suppory async DB update for both ZMQ producer/consumer table. * Fix UT * Improve UT and code * Improve code and UT * Fix PR comments * Fix PR comments
- Loading branch information
Showing
8 changed files
with
327 additions
and
132 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
#include <string> | ||
#include <deque> | ||
#include <limits> | ||
#include <hiredis/hiredis.h> | ||
#include <pthread.h> | ||
#include "asyncdbupdater.h" | ||
#include "dbconnector.h" | ||
#include "redisselect.h" | ||
#include "redisapi.h" | ||
#include "table.h" | ||
|
||
using namespace std; | ||
|
||
namespace swss { | ||
|
||
AsyncDBUpdater::AsyncDBUpdater(DBConnector *db, const std::string &tableName) | ||
: m_db(db) | ||
, m_tableName(tableName) | ||
{ | ||
m_runThread = true; | ||
m_dbUpdateThread = std::make_shared<std::thread>(&AsyncDBUpdater::dbUpdateThread, this); | ||
|
||
SWSS_LOG_DEBUG("AsyncDBUpdater ctor tableName: %s", tableName.c_str()); | ||
} | ||
|
||
AsyncDBUpdater::~AsyncDBUpdater() | ||
{ | ||
m_runThread = false; | ||
|
||
// notify db update thread exit | ||
m_dbUpdateDataNotifyCv.notify_all(); | ||
m_dbUpdateThread->join(); | ||
} | ||
|
||
void AsyncDBUpdater::update(std::shared_ptr<KeyOpFieldsValuesTuple> pkco) | ||
{ | ||
{ | ||
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex); | ||
m_dbUpdateDataQueue.push(pkco); | ||
} | ||
|
||
m_dbUpdateDataNotifyCv.notify_all(); | ||
} | ||
|
||
void AsyncDBUpdater::dbUpdateThread() | ||
{ | ||
SWSS_LOG_ENTER(); | ||
SWSS_LOG_NOTICE("dbUpdateThread begin"); | ||
|
||
// Different schedule policy has different min priority | ||
pthread_attr_t attr; | ||
int policy; | ||
pthread_attr_getschedpolicy(&attr, &policy); | ||
int min_priority = sched_get_priority_min(policy); | ||
// Use min priority will block poll thread | ||
pthread_setschedprio(pthread_self(), min_priority + 1); | ||
|
||
// Follow same logic in ConsumerStateTable: every received data will write to 'table'. | ||
DBConnector db(m_db->getDbName(), 0, true); | ||
Table table(&db, m_tableName); | ||
std::mutex cvMutex; | ||
std::unique_lock<std::mutex> cvLock(cvMutex); | ||
|
||
while (m_runThread) | ||
{ | ||
size_t count; | ||
count = queueSize(); | ||
if (count == 0) | ||
{ | ||
// when queue is empty, wait notification, when data come, continue to check queue size again | ||
m_dbUpdateDataNotifyCv.wait(cvLock); | ||
continue; | ||
} | ||
|
||
for (size_t ie = 0; ie < count; ie++) | ||
{ | ||
auto& kco = *(m_dbUpdateDataQueue.front()); | ||
|
||
if (kfvOp(kco) == SET_COMMAND) | ||
{ | ||
auto& values = kfvFieldsValues(kco); | ||
|
||
// Delete entry before Table::set(), because Table::set() does not remove the no longer existed fields from entry. | ||
table.del(kfvKey(kco)); | ||
table.set(kfvKey(kco), values); | ||
} | ||
else if (kfvOp(kco) == DEL_COMMAND) | ||
{ | ||
table.del(kfvKey(kco)); | ||
} | ||
else | ||
{ | ||
SWSS_LOG_ERROR("db: %s, table: %s receive unknown operation: %s", m_db->getDbName().c_str(), m_tableName.c_str(), kfvOp(kco).c_str()); | ||
} | ||
|
||
{ | ||
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex); | ||
m_dbUpdateDataQueue.pop(); | ||
} | ||
} | ||
} | ||
|
||
SWSS_LOG_DEBUG("AsyncDBUpdater dbUpdateThread end: %s", m_tableName.c_str()); | ||
} | ||
|
||
size_t AsyncDBUpdater::queueSize() | ||
{ | ||
// size() is not thread safe | ||
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex); | ||
|
||
return m_dbUpdateDataQueue.size(); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
#pragma once | ||
|
||
#include <string> | ||
#include <deque> | ||
#include <condition_variable> | ||
#include "dbconnector.h" | ||
#include "table.h" | ||
|
||
#define MQ_RESPONSE_MAX_COUNT (4*1024*1024) | ||
#define MQ_SIZE 100 | ||
#define MQ_MAX_RETRY 10 | ||
#define MQ_POLL_TIMEOUT (1000) | ||
|
||
namespace swss { | ||
|
||
class AsyncDBUpdater | ||
{ | ||
public: | ||
AsyncDBUpdater(DBConnector *db, const std::string &tableName); | ||
~AsyncDBUpdater(); | ||
|
||
void update(std::shared_ptr<KeyOpFieldsValuesTuple> pkco); | ||
|
||
size_t queueSize(); | ||
private: | ||
void dbUpdateThread(); | ||
|
||
volatile bool m_runThread; | ||
|
||
std::shared_ptr<std::thread> m_dbUpdateThread; | ||
|
||
std::mutex m_dbUpdateDataQueueMutex; | ||
|
||
std::condition_variable m_dbUpdateDataNotifyCv; | ||
|
||
std::queue<std::shared_ptr<KeyOpFieldsValuesTuple>> m_dbUpdateDataQueue; | ||
|
||
DBConnector *m_db; | ||
|
||
std::string m_tableName; | ||
}; | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.