diff --git a/common/Makefile.am b/common/Makefile.am index df41c3be..724805e6 100644 --- a/common/Makefile.am +++ b/common/Makefile.am @@ -69,6 +69,7 @@ common_libswsscommon_la_SOURCES = \ common/zmqserver.cpp \ common/asyncdbupdater.cpp \ common/redis_table_waiter.cpp \ + common/interface.h \ common/c-api/util.cpp \ common/c-api/dbconnector.cpp \ common/c-api/consumerstatetable.cpp \ diff --git a/common/asyncdbupdater.cpp b/common/asyncdbupdater.cpp index 4cf150d9..cf3d74c5 100644 --- a/common/asyncdbupdater.cpp +++ b/common/asyncdbupdater.cpp @@ -30,6 +30,7 @@ AsyncDBUpdater::~AsyncDBUpdater() // notify db update thread exit m_dbUpdateDataNotifyCv.notify_all(); m_dbUpdateThread->join(); + SWSS_LOG_DEBUG("AsyncDBUpdater dtor tableName: %s", m_tableName.c_str()); } void AsyncDBUpdater::update(std::shared_ptr pkco) @@ -61,16 +62,30 @@ void AsyncDBUpdater::dbUpdateThread() std::mutex cvMutex; std::unique_lock cvLock(cvMutex); - while (m_runThread) + while (true) { size_t count; count = queueSize(); if (count == 0) { + // Check if there still data in queue before exit + if (!m_runThread) + { + SWSS_LOG_NOTICE("dbUpdateThread for table: %s is exiting", m_tableName.c_str()); + break; + } + // when queue is empty, wait notification, when data come, continue to check queue size again m_dbUpdateDataNotifyCv.wait(cvLock); continue; } + else + { + if (!m_runThread) + { + SWSS_LOG_DEBUG("dbUpdateThread for table: %s still has %d records that need to be sent before exiting", m_tableName.c_str(), (int)count); + } + } for (size_t ie = 0; ie < count; ie++) { diff --git a/common/c-api/consumerstatetable.cpp b/common/c-api/consumerstatetable.cpp index c01ed822..9765ceec 100644 --- a/common/c-api/consumerstatetable.cpp +++ b/common/c-api/consumerstatetable.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -10,6 +11,7 @@ using namespace swss; using namespace std; +using boost::numeric_cast; SWSSConsumerStateTable SWSSConsumerStateTable_new(SWSSDBConnector db, const char *tableName, const int32_t *p_popBatchSize, @@ -32,3 +34,12 @@ SWSSKeyOpFieldValuesArray SWSSConsumerStateTable_pops(SWSSConsumerStateTable tbl return makeKeyOpFieldValuesArray(vkco); }); } + +uint32_t SWSSConsumerStateTable_getFd(SWSSConsumerStateTable tbl) { + SWSSTry(return numeric_cast(((ConsumerStateTable *)tbl)->getFd())); +} + +SWSSSelectResult SWSSConsumerStateTable_readData(SWSSConsumerStateTable tbl, uint32_t timeout_ms, + uint8_t interrupt_on_signal) { + SWSSTry(return selectOne((ConsumerStateTable *)tbl, timeout_ms, interrupt_on_signal)); +} diff --git a/common/c-api/consumerstatetable.h b/common/c-api/consumerstatetable.h index bd2fdaaf..468fb644 100644 --- a/common/c-api/consumerstatetable.h +++ b/common/c-api/consumerstatetable.h @@ -21,6 +21,17 @@ void SWSSConsumerStateTable_free(SWSSConsumerStateTable tbl); // Result array and all of its members must be freed using free() SWSSKeyOpFieldValuesArray SWSSConsumerStateTable_pops(SWSSConsumerStateTable tbl); +// Return the underlying fd for polling/selecting on. +// Callers must NOT read/write on fd, it may only be used for epoll or similar. +// After the fd becomes readable, SWSSConsumerStateTable_readData must be used to +// reset the fd and read data into internal data structures. +uint32_t SWSSConsumerStateTable_getFd(SWSSConsumerStateTable tbl); + +// Block until data is available to read or until a timeout elapses. +// A timeout of 0 means the call will return immediately. +SWSSSelectResult SWSSConsumerStateTable_readData(SWSSConsumerStateTable tbl, uint32_t timeout_ms, + uint8_t interrupt_on_signal); + #ifdef __cplusplus } #endif diff --git a/common/c-api/dbconnector.cpp b/common/c-api/dbconnector.cpp index bb32f42a..83f237cc 100644 --- a/common/c-api/dbconnector.cpp +++ b/common/c-api/dbconnector.cpp @@ -1,5 +1,6 @@ #include #include +#include #include "../dbconnector.h" #include "dbconnector.h" @@ -37,14 +38,14 @@ int8_t SWSSDBConnector_del(SWSSDBConnector db, const char *key) { SWSSTry(return ((DBConnector *)db)->del(string(key)) ? 1 : 0); } -void SWSSDBConnector_set(SWSSDBConnector db, const char *key, const char *value) { - SWSSTry(((DBConnector *)db)->set(string(key), string(value))); +void SWSSDBConnector_set(SWSSDBConnector db, const char *key, SWSSStrRef value) { + SWSSTry(((DBConnector *)db)->set(string(key), takeStrRef(value))); } -char *SWSSDBConnector_get(SWSSDBConnector db, const char *key) { +SWSSString SWSSDBConnector_get(SWSSDBConnector db, const char *key) { SWSSTry({ shared_ptr s = ((DBConnector *)db)->get(string(key)); - return s ? strdup(s->c_str()) : nullptr; + return s ? makeString(move(*s)) : nullptr; }); } @@ -57,21 +58,29 @@ int8_t SWSSDBConnector_hdel(SWSSDBConnector db, const char *key, const char *fie } void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field, - const char *value) { - SWSSTry(((DBConnector *)db)->hset(string(key), string(field), string(value))); + SWSSStrRef value) { + SWSSTry(((DBConnector *)db)->hset(string(key), string(field), takeStrRef(value))); } -char *SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field) { +SWSSString SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field) { SWSSTry({ shared_ptr s = ((DBConnector *)db)->hget(string(key), string(field)); - return s ? strdup(s->c_str()) : nullptr; + return s ? makeString(move(*s)) : nullptr; }); } SWSSFieldValueArray SWSSDBConnector_hgetall(SWSSDBConnector db, const char *key) { SWSSTry({ - auto map = ((DBConnector *)db)->hgetall(key); - return makeFieldValueArray(map); + auto map = ((DBConnector *)db)->hgetall(string(key)); + + // We can't move keys out of the map, we have to copy them, until C++17 map::extract so we + // copy them here into a vector to avoid needing an overload on makeFieldValueArray + vector> pairs; + pairs.reserve(map.size()); + for (auto &pair : map) + pairs.push_back(make_pair(pair.first, move(pair.second))); + + return makeFieldValueArray(std::move(pairs)); }); } diff --git a/common/c-api/dbconnector.h b/common/c-api/dbconnector.h index 8e6c51e0..fe4acdf4 100644 --- a/common/c-api/dbconnector.h +++ b/common/c-api/dbconnector.h @@ -29,11 +29,11 @@ void SWSSDBConnector_free(SWSSDBConnector db); // Returns 0 when key doesn't exist, 1 when key was deleted int8_t SWSSDBConnector_del(SWSSDBConnector db, const char *key); -void SWSSDBConnector_set(SWSSDBConnector db, const char *key, const char *value); +void SWSSDBConnector_set(SWSSDBConnector db, const char *key, SWSSStrRef value); -// Returns NULL if key doesn't exist. -// Result must be freed using free() -char *SWSSDBConnector_get(SWSSDBConnector db, const char *key); +// Returns NULL if key doesn't exist +// Result must be freed using SWSSString_free() +SWSSString SWSSDBConnector_get(SWSSDBConnector db, const char *key); // Returns 0 for false, 1 for true int8_t SWSSDBConnector_exists(SWSSDBConnector db, const char *key); @@ -41,59 +41,22 @@ int8_t SWSSDBConnector_exists(SWSSDBConnector db, const char *key); // Returns 0 when key or field doesn't exist, 1 when field was deleted int8_t SWSSDBConnector_hdel(SWSSDBConnector db, const char *key, const char *field); -void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field, - const char *value); +void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field, SWSSStrRef value); -// Returns NULL if key or field doesn't exist. -// Result must be freed using free() -char *SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field); +// Returns NULL if key or field doesn't exist +// Result must be freed using SWSSString_free() +SWSSString SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field); -// Returns an empty map when the key doesn't exist. -// Result array and all of its elements must be freed using free() +// Returns an empty map when the key doesn't exist +// Result array and all of its elements must be freed using appropriate free functions SWSSFieldValueArray SWSSDBConnector_hgetall(SWSSDBConnector db, const char *key); // Returns 0 when key or field doesn't exist, 1 when field exists int8_t SWSSDBConnector_hexists(SWSSDBConnector db, const char *key, const char *field); -// std::vector keys(const std::string &key); - -// std::pair> scan(int cursor = 0, const char -// *match = "", uint32_t count = 10); - -// template -// void hmset(const std::string &key, InputIterator start, InputIterator stop); - -// void hmset(const std::unordered_map>>& multiHash); - -// std::shared_ptr get(const std::string &key); - -// std::shared_ptr hget(const std::string &key, const std::string -// &field); - -// int64_t incr(const std::string &key); - -// int64_t decr(const std::string &key); - -// int64_t rpush(const std::string &list, const std::string &item); - -// std::shared_ptr blpop(const std::string &list, int timeout); - -// void subscribe(const std::string &pattern); - -// void psubscribe(const std::string &pattern); - -// void punsubscribe(const std::string &pattern); - -// int64_t publish(const std::string &channel, const std::string &message); - -// void config_set(const std::string &key, const std::string &value); - // Returns 1 on success, 0 on failure int8_t SWSSDBConnector_flushdb(SWSSDBConnector db); -// std::map>> getall(); #ifdef __cplusplus } #endif diff --git a/common/c-api/producerstatetable.cpp b/common/c-api/producerstatetable.cpp index 083536d7..276d7c68 100644 --- a/common/c-api/producerstatetable.cpp +++ b/common/c-api/producerstatetable.cpp @@ -25,7 +25,7 @@ void SWSSProducerStateTable_setBuffered(SWSSProducerStateTable tbl, uint8_t buff void SWSSProducerStateTable_set(SWSSProducerStateTable tbl, const char *key, SWSSFieldValueArray values) { - SWSSTry(((ProducerStateTable *)tbl)->set(string(key), takeFieldValueArray(values))); + SWSSTry(((ProducerStateTable *)tbl)->set(string(key), takeFieldValueArray(std::move(values)))); } void SWSSProducerStateTable_del(SWSSProducerStateTable tbl, const char *key) { diff --git a/common/c-api/producerstatetable.h b/common/c-api/producerstatetable.h index e8db2c65..1acb9af3 100644 --- a/common/c-api/producerstatetable.h +++ b/common/c-api/producerstatetable.h @@ -22,11 +22,6 @@ void SWSSProducerStateTable_set(SWSSProducerStateTable tbl, const char *key, SWS void SWSSProducerStateTable_del(SWSSProducerStateTable tbl, const char *key); -// Batched version of set() and del(). -// virtual void set(const std::vector& values); - -// virtual void del(const std::vector& keys); - void SWSSProducerStateTable_flush(SWSSProducerStateTable tbl); int64_t SWSSProducerStateTable_count(SWSSProducerStateTable tbl); diff --git a/common/c-api/subscriberstatetable.cpp b/common/c-api/subscriberstatetable.cpp index b6482911..4d3a0495 100644 --- a/common/c-api/subscriberstatetable.cpp +++ b/common/c-api/subscriberstatetable.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -11,6 +12,7 @@ using namespace swss; using namespace std; +using boost::numeric_cast; SWSSSubscriberStateTable SWSSSubscriberStateTable_new(SWSSDBConnector db, const char *tableName, const int32_t *p_popBatchSize, @@ -34,19 +36,12 @@ SWSSKeyOpFieldValuesArray SWSSSubscriberStateTable_pops(SWSSSubscriberStateTable }); } -uint8_t SWSSSubscriberStateTable_hasData(SWSSSubscriberStateTable tbl) { - SWSSTry(return ((SubscriberStateTable *)tbl)->hasData() ? 1 : 0); -} - -uint8_t SWSSSubscriberStateTable_hasCachedData(SWSSSubscriberStateTable tbl) { - SWSSTry(return ((SubscriberStateTable *)tbl)->hasCachedData() ? 1 : 0); -} - -uint8_t SWSSSubscriberStateTable_initializedWithData(SWSSSubscriberStateTable tbl) { - SWSSTry(return ((SubscriberStateTable *)tbl)->initializedWithData() ? 1 : 0); +uint32_t SWSSSubscriberStateTable_getFd(SWSSSubscriberStateTable tbl) { + SWSSTry(return numeric_cast(((SubscriberStateTable *)tbl)->getFd())); } SWSSSelectResult SWSSSubscriberStateTable_readData(SWSSSubscriberStateTable tbl, - uint32_t timeout_ms) { - SWSSTry(return selectOne((SubscriberStateTable *)tbl, timeout_ms)); + uint32_t timeout_ms, + uint8_t interrupt_on_signal) { + SWSSTry(return selectOne((SubscriberStateTable *)tbl, timeout_ms, interrupt_on_signal)); } diff --git a/common/c-api/subscriberstatetable.h b/common/c-api/subscriberstatetable.h index 4501a3af..ed0924c8 100644 --- a/common/c-api/subscriberstatetable.h +++ b/common/c-api/subscriberstatetable.h @@ -22,19 +22,17 @@ void SWSSSubscriberStateTable_free(SWSSSubscriberStateTable tbl); // Result array and all of its members must be freed using free() SWSSKeyOpFieldValuesArray SWSSSubscriberStateTable_pops(SWSSSubscriberStateTable tbl); -// Returns 0 for false, 1 for true -uint8_t SWSSSubscriberStateTable_hasData(SWSSSubscriberStateTable tbl); - -// Returns 0 for false, 1 for true -uint8_t SWSSSubscriberStateTable_hasCachedData(SWSSSubscriberStateTable tbl); - -// Returns 0 for false, 1 for true -uint8_t SWSSSubscriberStateTable_initializedWithData(SWSSSubscriberStateTable tbl); +// Return the underlying fd for polling/selecting on. +// Callers must NOT read/write on fd, it may only be used for epoll or similar. +// After the fd becomes readable, SWSSSubscriberStateTable_readData must be used to +// reset the fd and read data into internal data structures. +uint32_t SWSSSubscriberStateTable_getFd(SWSSSubscriberStateTable tbl); // Block until data is available to read or until a timeout elapses. // A timeout of 0 means the call will return immediately. SWSSSelectResult SWSSSubscriberStateTable_readData(SWSSSubscriberStateTable tbl, - uint32_t timeout_ms); + uint32_t timeout_ms, + uint8_t interrupt_on_sugnal); #ifdef __cplusplus } diff --git a/common/c-api/util.cpp b/common/c-api/util.cpp index fb983d5c..1dc6cd45 100644 --- a/common/c-api/util.cpp +++ b/common/c-api/util.cpp @@ -1,3 +1,33 @@ #include "util.h" +using namespace swss; + bool swss::cApiTestingDisableAbort = false; + +SWSSString SWSSString_new(const char *data, uint64_t length) { + SWSSTry(return makeString(std::string(data, numeric_cast(length)))); +} + +SWSSString SWSSString_new_c_str(const char *c_str) { + SWSSTry(return makeString(std::string(c_str))); +} + +const char *SWSSStrRef_c_str(SWSSStrRef s) { + SWSSTry(return ((std::string *)s)->c_str()); +} + +uint64_t SWSSStrRef_length(SWSSStrRef s) { + SWSSTry(return ((std::string *)s)->length()); +} + +void SWSSString_free(SWSSString s) { + SWSSTry(delete (std::string *)s); +} + +void SWSSFieldValueArray_free(SWSSFieldValueArray arr) { + SWSSTry(delete[] arr.data); +} + +void SWSSKeyOpFieldValuesArray_free(SWSSKeyOpFieldValuesArray kfvs) { + SWSSTry(delete[] kfvs.data); +} diff --git a/common/c-api/util.h b/common/c-api/util.h index 79eb93cf..06aeac15 100644 --- a/common/c-api/util.h +++ b/common/c-api/util.h @@ -8,48 +8,103 @@ extern "C" { #include +// FFI version of std::string&& +// This can be converted to an SWSSStrRef with a standard cast +typedef struct SWSSStringOpaque *SWSSString; + +// FFI version of std::string& +// This can be converted to an SWSSString with a standard cast +// Functions that take SWSSString will move data out of the underlying string, +// but functions that take SWSSStrRef will only view it. +typedef struct SWSSStrRefOpaque *SWSSStrRef; + +// FFI version of swss::FieldValueTuple typedef struct { const char *field; - const char *value; -} SWSSFieldValuePair; + SWSSString value; +} SWSSFieldValueTuple; +// FFI version of std::vector typedef struct { uint64_t len; - const SWSSFieldValuePair *data; + SWSSFieldValueTuple *data; } SWSSFieldValueArray; +typedef enum { + SWSSKeyOperation_SET, + SWSSKeyOperation_DEL, +} SWSSKeyOperation; + +// FFI version of swss::KeyOpFieldValuesTuple typedef struct { const char *key; - const char *operation; + SWSSKeyOperation operation; SWSSFieldValueArray fieldValues; } SWSSKeyOpFieldValues; +// FFI version of std::vector typedef struct { uint64_t len; - const SWSSKeyOpFieldValues *data; + SWSSKeyOpFieldValues *data; } SWSSKeyOpFieldValuesArray; +// FFI version of swss::Select::{OBJECT, TIMEOUT, SIGNALINT}. +// swss::Select::ERROR is left out because errors are handled separately typedef enum { + // Data is available in the object SWSSSelectResult_DATA = 0, + // Timed out waiting for data SWSSSelectResult_TIMEOUT = 1, + // Waiting was interrupted by a signal SWSSSelectResult_SIGNAL = 2, } SWSSSelectResult; +// data should not include a null terminator +SWSSString SWSSString_new(const char *data, uint64_t length); + +// c_str should include a null terminator +SWSSString SWSSString_new_c_str(const char *c_str); + +// It is safe to pass null to this function (not to any other SWSSString functions). This is +// useful to take SWSSStrings from other SWSS structs - you can replace the strs in the +// structs with null and still safely free the structs. Then, you can call this function with the +// populated SWSSString later. +void SWSSString_free(SWSSString s); + +const char *SWSSStrRef_c_str(SWSSStrRef s); + +// Returns the length of the string, not including the null terminator that is implicitly added by +// SWSSStrRef_c_str. +uint64_t SWSSStrRef_length(SWSSStrRef s); + +// arr.data may be null. This is not recursive - elements must be freed separately (for finer +// grained control of ownership). +void SWSSFieldValueArray_free(SWSSFieldValueArray arr); + +// kfvs.data may be null. This is not recursive - elements must be freed separately (for finer +// grained control of ownership). +void SWSSKeyOpFieldValuesArray_free(SWSSKeyOpFieldValuesArray kfvs); + #ifdef __cplusplus } #endif // Internal utilities (used to help define c-facing functions) #ifdef __cplusplus -#include -#include + +#include +#include #include #include +#include +#include #include #include +#include #include "../logger.h" -#include "../rediscommand.h" +#include "../redisapi.h" +#include "../schema.h" #include "../select.h" using boost::numeric_cast; @@ -62,7 +117,7 @@ extern bool cApiTestingDisableAbort; // undefined behavior. It was also decided that no exceptions in swss-common are recoverable, so // there is no reason to convert exceptions into a returnable type. #define SWSSTry(...) \ - if (cApiTestingDisableAbort) { \ + if (swss::cApiTestingDisableAbort) { \ { __VA_ARGS__; } \ } else { \ try { \ @@ -74,11 +129,12 @@ extern bool cApiTestingDisableAbort; } \ } -static inline SWSSSelectResult selectOne(swss::Selectable *s, uint32_t timeout_ms) { +static inline SWSSSelectResult selectOne(swss::Selectable *s, uint32_t timeout_ms, + uint8_t interrupt_on_signal) { Select select; Selectable *sOut; select.addSelectable(s); - int ret = select.select(&sOut, numeric_cast(timeout_ms)); + int ret = select.select(&sOut, numeric_cast(timeout_ms), interrupt_on_signal); switch (ret) { case Select::OBJECT: return SWSSSelectResult_DATA; @@ -93,22 +149,21 @@ static inline SWSSSelectResult selectOne(swss::Selectable *s, uint32_t timeout_m } } -// malloc() with safe numeric casting of the size parameter -template static inline void *mallocN(N size) { - return malloc(numeric_cast(size)); +static inline SWSSString makeString(std::string &&s) { + std::string *data_s = new std::string(std::move(s)); + return (struct SWSSStringOpaque *)data_s; } // T is anything that has a .size() method and which can be iterated over for pair -// eg unordered_map or vector> -template static inline SWSSFieldValueArray makeFieldValueArray(const T &in) { - SWSSFieldValuePair *data = - (SWSSFieldValuePair *)mallocN(in.size() * sizeof(SWSSFieldValuePair)); +// eg vector> +template static inline SWSSFieldValueArray makeFieldValueArray(T &&in) { + SWSSFieldValueTuple *data = new SWSSFieldValueTuple[in.size()]; size_t i = 0; - for (const auto &pair : in) { - SWSSFieldValuePair entry; + for (auto &pair : in) { + SWSSFieldValueTuple entry; entry.field = strdup(pair.first.c_str()); - entry.value = strdup(pair.second.c_str()); + entry.value = makeString(std::move(pair.second)); data[i++] = entry; } @@ -118,48 +173,40 @@ template static inline SWSSFieldValueArray makeFieldValueArray(const T return out; } -static inline std::vector -takeFieldValueArray(const SWSSFieldValueArray &in) { - std::vector out; - for (uint64_t i = 0; i < in.len; i++) { - auto field = std::string(in.data[i].field); - auto value = std::string(in.data[i].value); - out.push_back(std::make_pair(field, value)); +static inline SWSSKeyOperation makeKeyOperation(std::string &op) { + if (strcmp(op.c_str(), SET_COMMAND) == 0) { + return SWSSKeyOperation_SET; + } else if (strcmp(op.c_str(), DEL_COMMAND) == 0) { + return SWSSKeyOperation_DEL; + } else { + SWSS_LOG_THROW("Invalid key operation %s", op.c_str()); } - return out; } -static inline SWSSKeyOpFieldValues makeKeyOpFieldValues(const swss::KeyOpFieldsValuesTuple &in) { +static inline SWSSKeyOpFieldValues makeKeyOpFieldValues(swss::KeyOpFieldsValuesTuple &&in) { SWSSKeyOpFieldValues out; out.key = strdup(kfvKey(in).c_str()); - out.operation = strdup(kfvOp(in).c_str()); + out.operation = makeKeyOperation(kfvOp(in)); out.fieldValues = makeFieldValueArray(kfvFieldsValues(in)); return out; } -static inline swss::KeyOpFieldsValuesTuple takeKeyOpFieldValues(const SWSSKeyOpFieldValues &in) { - std::string key(in.key), op(in.operation); - auto fieldValues = takeFieldValueArray(in.fieldValues); - return std::make_tuple(key, op, fieldValues); -} - -template static inline const T &getReference(const T &t) { +template static inline T &getReference(T &t) { return t; } -template static inline const T &getReference(const std::shared_ptr &t) { +template static inline T &getReference(std::shared_ptr &t) { return *t; } // T is anything that has a .size() method and which can be iterated over for -// swss::KeyOpFieldValuesTuple -template static inline SWSSKeyOpFieldValuesArray makeKeyOpFieldValuesArray(const T &in) { - SWSSKeyOpFieldValues *data = - (SWSSKeyOpFieldValues *)mallocN(in.size() * sizeof(SWSSKeyOpFieldValues)); +// swss::KeyOpFieldValuesTuple, eg vector or deque +template static inline SWSSKeyOpFieldValuesArray makeKeyOpFieldValuesArray(T &&in) { + SWSSKeyOpFieldValues *data = new SWSSKeyOpFieldValues[in.size()]; size_t i = 0; - for (const auto &kfv : in) - data[i++] = makeKeyOpFieldValues(getReference(kfv)); + for (auto &kfv : in) + data[i++] = makeKeyOpFieldValues(std::move(getReference(kfv))); SWSSKeyOpFieldValuesArray out; out.len = (uint64_t)in.size(); @@ -167,11 +214,50 @@ template static inline SWSSKeyOpFieldValuesArray makeKeyOpFieldValuesA return out; } +static inline std::string takeString(SWSSString s) { + return std::string(std::move(*((std::string *)s))); +} + +static inline std::string &takeStrRef(SWSSStrRef s) { + return *((std::string *)s); +} + +static inline std::vector takeFieldValueArray(SWSSFieldValueArray in) { + std::vector out; + for (uint64_t i = 0; i < in.len; i++) { + const char *field = in.data[i].field; + SWSSString value = in.data[i].value; + auto pair = std::make_pair(std::string(field), takeString(std::move(value))); + out.push_back(pair); + } + return out; +} + +static inline std::string takeKeyOperation(SWSSKeyOperation op) { + switch (op) { + case SWSSKeyOperation_SET: + return SET_COMMAND; + case SWSSKeyOperation_DEL: + return DEL_COMMAND; + default: + SWSS_LOG_THROW("Impossible SWSSKeyOperation"); + } +} + +static inline swss::KeyOpFieldsValuesTuple takeKeyOpFieldValues(SWSSKeyOpFieldValues in) { + std::string key = in.key; + std::string op = takeKeyOperation(in.operation); + auto fieldValues = takeFieldValueArray(in.fieldValues); + return std::make_tuple(key, op, fieldValues); +} + static inline std::vector -takeKeyOpFieldValuesArray(const SWSSKeyOpFieldValuesArray &in) { +takeKeyOpFieldValuesArray(SWSSKeyOpFieldValuesArray in) { std::vector out; - for (uint64_t i = 0; i < in.len; i++) - out.push_back(takeKeyOpFieldValues(in.data[i])); + for (uint64_t i = 0; i < in.len; i++) { + SWSSKeyOpFieldValues kfv = in.data[i]; + out.push_back(takeKeyOpFieldValues(std::move(kfv))); + } return out; } diff --git a/common/c-api/zmqclient.cpp b/common/c-api/zmqclient.cpp index 7e4a58f8..49a9e05f 100644 --- a/common/c-api/zmqclient.cpp +++ b/common/c-api/zmqclient.cpp @@ -24,9 +24,9 @@ void SWSSZmqClient_connect(SWSSZmqClient zmqc) { } void SWSSZmqClient_sendMsg(SWSSZmqClient zmqc, const char *dbName, const char *tableName, - const SWSSKeyOpFieldValuesArray *arr) { + SWSSKeyOpFieldValuesArray arr) { SWSSTry({ - vector kcos = takeKeyOpFieldValuesArray(*arr); + vector kcos = takeKeyOpFieldValuesArray(arr); size_t bufSize = BinarySerializer::serializedSize(dbName, tableName, kcos); vector v(bufSize); ((ZmqClient *)zmqc) diff --git a/common/c-api/zmqclient.h b/common/c-api/zmqclient.h index 47cd1efb..da832ab3 100644 --- a/common/c-api/zmqclient.h +++ b/common/c-api/zmqclient.h @@ -21,7 +21,7 @@ int8_t SWSSZmqClient_isConnected(SWSSZmqClient zmqc); void SWSSZmqClient_connect(SWSSZmqClient zmqc); void SWSSZmqClient_sendMsg(SWSSZmqClient zmqc, const char *dbName, const char *tableName, - const SWSSKeyOpFieldValuesArray *kcos); + SWSSKeyOpFieldValuesArray kcos); #ifdef __cplusplus } diff --git a/common/c-api/zmqconsumerstatetable.cpp b/common/c-api/zmqconsumerstatetable.cpp index 38cd87f9..ed416488 100644 --- a/common/c-api/zmqconsumerstatetable.cpp +++ b/common/c-api/zmqconsumerstatetable.cpp @@ -1,11 +1,14 @@ +#include #include "../zmqconsumerstatetable.h" #include "../table.h" #include "util.h" #include "zmqconsumerstatetable.h" #include "zmqserver.h" +#include using namespace swss; using namespace std; +using boost::numeric_cast; // Pass NULL for popBatchSize and/or pri to use the default values SWSSZmqConsumerStateTable SWSSZmqConsumerStateTable_new(SWSSDBConnector db, const char *tableName, @@ -32,24 +35,14 @@ SWSSKeyOpFieldValuesArray SWSSZmqConsumerStateTable_pops(SWSSZmqConsumerStateTab }); } -SWSSSelectResult SWSSZmqConsumerStateTable_readData(SWSSZmqConsumerStateTable tbl, - uint32_t timeout_ms) { - SWSSTry(return selectOne((ZmqConsumerStateTable *)tbl, timeout_ms)); -} - -// Returns 0 for false, 1 for true -uint8_t SWSSZmqConsumerStateTable_hasData(SWSSZmqConsumerStateTable tbl) { - SWSSTry(return ((ZmqConsumerStateTable *)tbl)->hasData() ? 1 : 0); +uint32_t SWSSZmqConsumerStateTable_getFd(SWSSZmqConsumerStateTable tbl) { + SWSSTry(return numeric_cast(((ZmqConsumerStateTable *)tbl)->getFd())); } -// Returns 0 for false, 1 for true -uint8_t SWSSZmqConsumerStateTable_hasCachedData(SWSSZmqConsumerStateTable tbl) { - SWSSTry(return ((ZmqConsumerStateTable *)tbl)->hasCachedData() ? 1 : 0); -} - -// Returns 0 for false, 1 for true -uint8_t SWSSZmqConsumerStateTable_initializedWithData(SWSSZmqConsumerStateTable tbl) { - SWSSTry(return ((ZmqConsumerStateTable *)tbl)->initializedWithData() ? 1 : 0); +SWSSSelectResult SWSSZmqConsumerStateTable_readData(SWSSZmqConsumerStateTable tbl, + uint32_t timeout_ms, + uint8_t interrupt_on_signal) { + SWSSTry(return selectOne((ZmqConsumerStateTable *)tbl, timeout_ms, interrupt_on_signal)); } const struct SWSSDBConnectorOpaque * diff --git a/common/c-api/zmqconsumerstatetable.h b/common/c-api/zmqconsumerstatetable.h index 4810c3ef..f5b93425 100644 --- a/common/c-api/zmqconsumerstatetable.h +++ b/common/c-api/zmqconsumerstatetable.h @@ -24,19 +24,17 @@ void SWSSZmqConsumerStateTable_free(SWSSZmqConsumerStateTable tbl); // Result array and all of its members must be freed using free() SWSSKeyOpFieldValuesArray SWSSZmqConsumerStateTable_pops(SWSSZmqConsumerStateTable tbl); +// Return the underlying fd for polling/selecting on. +// Callers must NOT read/write on fd, it may only be used for epoll or similar. +// After the fd becomes readable, SWSSZmqConsumerStateTable_readData must be used to +// reset the fd and read data into internal data structures. +uint32_t SWSSZmqConsumerStateTable_getFd(SWSSZmqConsumerStateTable tbl); + // Block until data is available to read or until a timeout elapses. // A timeout of 0 means the call will return immediately. SWSSSelectResult SWSSZmqConsumerStateTable_readData(SWSSZmqConsumerStateTable tbl, - uint32_t timeout_ms); - -// Returns 0 for false, 1 for true -uint8_t SWSSZmqConsumerStateTable_hasData(SWSSZmqConsumerStateTable tbl); - -// Returns 0 for false, 1 for true -uint8_t SWSSZmqConsumerStateTable_hasCachedData(SWSSZmqConsumerStateTable tbl); - -// Returns 0 for false, 1 for true -uint8_t SWSSZmqConsumerStateTable_initializedWithData(SWSSZmqConsumerStateTable tbl); + uint32_t timeout_ms, + uint8_t interrupt_on_signal); const struct SWSSDBConnectorOpaque * SWSSZmqConsumerStateTable_getDbConnector(SWSSZmqConsumerStateTable tbl); diff --git a/common/c-api/zmqproducerstatetable.cpp b/common/c-api/zmqproducerstatetable.cpp index 3e50916e..e1c18680 100644 --- a/common/c-api/zmqproducerstatetable.cpp +++ b/common/c-api/zmqproducerstatetable.cpp @@ -1,8 +1,11 @@ +#include + #include "zmqproducerstatetable.h" #include "../zmqproducerstatetable.h" using namespace std; using namespace swss; +using boost::numeric_cast; SWSSZmqProducerStateTable SWSSZmqProducerStateTable_new(SWSSDBConnector db, const char *tableName, SWSSZmqClient zmqc, uint8_t dbPersistence) { diff --git a/common/dbconnector.cpp b/common/dbconnector.cpp index 96334780..47fe80d3 100755 --- a/common/dbconnector.cpp +++ b/common/dbconnector.cpp @@ -562,7 +562,7 @@ void RedisContext::initContext(const char *host, int port, const timeval *tv) if (m_conn->err) throw system_error(make_error_code(errc::address_not_available), - "Unable to connect to redis"); + "Unable to connect to redis - " + std::string(m_conn->errstr) + "(" + std::to_string(m_conn->err) + ")"); } void RedisContext::initContext(const char *path, const timeval *tv) @@ -578,7 +578,7 @@ void RedisContext::initContext(const char *path, const timeval *tv) if (m_conn->err) throw system_error(make_error_code(errc::address_not_available), - "Unable to connect to redis (unix-socket)"); + "Unable to connect to redis (unix-socket) - " + std::string(m_conn->errstr) + "(" + std::to_string(m_conn->err) + ")"); } redisContext *RedisContext::getContext() const diff --git a/common/interface.h b/common/interface.h new file mode 100644 index 00000000..320ac883 --- /dev/null +++ b/common/interface.h @@ -0,0 +1,19 @@ +#ifndef __INTERFACE__ +#define __INTERFACE__ + +#include +#include + +namespace swss +{ + +const size_t IFACE_NAME_MAX_LEN = IFNAMSIZ - 1; + +bool isInterfaceNameValid(const std::string &ifaceName) +{ + return !ifaceName.empty() && (ifaceName.length() < IFNAMSIZ); +} + +} + +#endif diff --git a/common/producerstatetable.cpp b/common/producerstatetable.cpp index d0db5e2a..c7a35475 100644 --- a/common/producerstatetable.cpp +++ b/common/producerstatetable.cpp @@ -14,39 +14,71 @@ using namespace std; namespace swss { ProducerStateTable::ProducerStateTable(DBConnector *db, const string &tableName) - : ProducerStateTable(new RedisPipeline(db, 1), tableName, false) + : ProducerStateTable(new RedisPipeline(db, 1), tableName, false, false) { m_pipeowned = true; } ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered) + : ProducerStateTable(pipeline, tableName, buffered, false) {} + +ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered, bool flushPub) : TableBase(tableName, SonicDBConfig::getSeparator(pipeline->getDBConnector())) , TableName_KeySet(tableName) + , m_flushPub(flushPub) , m_buffered(buffered) , m_pipeowned(false) , m_tempViewActive(false) , m_pipe(pipeline) { + reloadRedisScript(); + + string luaClear = + "redis.call('DEL', KEYS[1])\n" + "local keys = redis.call('KEYS', KEYS[2] .. '*')\n" + "for i,k in pairs(keys) do\n" + " redis.call('DEL', k)\n" + "end\n" + "redis.call('DEL', KEYS[3])\n"; + m_shaClear = m_pipe->loadRedisScript(luaClear); + + string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua"); + m_shaApplyView = m_pipe->loadRedisScript(luaApplyView); +} + +ProducerStateTable::~ProducerStateTable() +{ + if (m_pipeowned) + { + delete m_pipe; + } +} + +void ProducerStateTable::reloadRedisScript() +{ + // Set m_flushPub to remove publish from a single lua string and let pipeline do publish once per flush + + // However, if m_buffered is false, follow the original one publish per lua design + // Hence we need to check both m_buffered and m_flushPub, and reload the redis script once setBuffered() changes m_buffered + + /* 1. Inform the pipeline of what channel to publish, when flushPub feature is enabled */ + if (m_buffered && m_flushPub) + m_pipe->addChannel(getChannelName(m_pipe->getDbId())); + + /* 2. Setup lua strings: determine whether to attach luaPub after each lua string */ + // num in luaSet and luaDel means number of elements that were added to the key set, // not including all the elements already present into the set. string luaSet = "local added = redis.call('SADD', KEYS[2], ARGV[2])\n" "for i = 0, #KEYS - 3 do\n" " redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n" - "end\n" - " if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" "end\n"; - m_shaSet = m_pipe->loadRedisScript(luaSet); string luaDel = "local added = redis.call('SADD', KEYS[2], ARGV[2])\n" "redis.call('SADD', KEYS[4], ARGV[2])\n" - "redis.call('DEL', KEYS[3])\n" - "if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" - "end\n"; - m_shaDel = m_pipe->loadRedisScript(luaDel); + "redis.call('DEL', KEYS[3])\n"; string luaBatchedSet = "local added = 0\n" @@ -59,11 +91,7 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta " redis.call('HSET', KEYS[3] .. KEYS[4 + i], attr, val)\n" " end\n" " idx = idx + tonumber(ARGV[idx]) * 2 + 1\n" - "end\n" - "if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" "end\n"; - m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet); string luaBatchedDel = "local added = 0\n" @@ -71,36 +99,31 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta " added = added + redis.call('SADD', KEYS[2], KEYS[5 + i])\n" " redis.call('SADD', KEYS[3], KEYS[5 + i])\n" " redis.call('DEL', KEYS[4] .. KEYS[5 + i])\n" - "end\n" - "if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" "end\n"; - m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel); - string luaClear = - "redis.call('DEL', KEYS[1])\n" - "local keys = redis.call('KEYS', KEYS[2] .. '*')\n" - "for i,k in pairs(keys) do\n" - " redis.call('DEL', k)\n" - "end\n" - "redis.call('DEL', KEYS[3])\n"; - m_shaClear = m_pipe->loadRedisScript(luaClear); - - string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua"); - m_shaApplyView = m_pipe->loadRedisScript(luaApplyView); -} - -ProducerStateTable::~ProducerStateTable() -{ - if (m_pipeowned) + if (!m_flushPub || !m_buffered) { - delete m_pipe; + string luaPub = + "if added > 0 then \n" + " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" + "end\n"; + luaSet += luaPub; + luaDel += luaPub; + luaBatchedSet += luaPub; + luaBatchedDel += luaPub; } + + /* 3. load redis script based on the lua string */ + m_shaSet = m_pipe->loadRedisScript(luaSet); + m_shaDel = m_pipe->loadRedisScript(luaDel); + m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet); + m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel); } void ProducerStateTable::setBuffered(bool buffered) { m_buffered = buffered; + reloadRedisScript(); } void ProducerStateTable::set(const string &key, const vector &values, diff --git a/common/producerstatetable.h b/common/producerstatetable.h index b6fa7868..b00453a5 100644 --- a/common/producerstatetable.h +++ b/common/producerstatetable.h @@ -12,6 +12,7 @@ class ProducerStateTable : public TableBase, public TableName_KeySet public: ProducerStateTable(DBConnector *db, const std::string &tableName); ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false); + ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered, bool flushPub); virtual ~ProducerStateTable(); void setBuffered(bool buffered); @@ -51,6 +52,7 @@ class ProducerStateTable : public TableBase, public TableName_KeySet void apply_temp_view(); private: + bool m_flushPub; // publish per piepeline flush intead of per redis script bool m_buffered; bool m_pipeowned; bool m_tempViewActive; @@ -62,6 +64,8 @@ class ProducerStateTable : public TableBase, public TableName_KeySet std::string m_shaClear; std::string m_shaApplyView; TableDump m_tempViewState; + + void reloadRedisScript(); // redis script may change if m_buffered changes }; } diff --git a/common/redispipeline.h b/common/redispipeline.h index b8efa384..be7561b6 100644 --- a/common/redispipeline.h +++ b/common/redispipeline.h @@ -2,7 +2,10 @@ #include #include +#include #include +#include +#include #include "redisreply.h" #include "rediscommand.h" #include "dbconnector.h" @@ -22,9 +25,11 @@ class RedisPipeline { RedisPipeline(const DBConnector *db, size_t sz = 128) : COMMAND_MAX(sz) , m_remaining(0) + , m_shaPub("") { m_db = db->newConnector(NEWCONNECTOR_TIMEOUT); initializeOwnerTid(); + lastHeartBeat = std::chrono::steady_clock::now(); } ~RedisPipeline() { @@ -113,11 +118,19 @@ class RedisPipeline { void flush() { + lastHeartBeat = std::chrono::steady_clock::now(); + + if (m_remaining == 0) { + return; + } + while(m_remaining) { // Construct an object to use its dtor, so that resource is released RedisReply r(pop()); } + + publish(); } size_t size() @@ -145,12 +158,43 @@ class RedisPipeline { m_ownerTid = gettid(); } + void addChannel(std::string channel) + { + if (m_channels.find(channel) != m_channels.end()) + return; + + m_channels.insert(channel); + m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G');"; + m_shaPub = loadRedisScript(m_luaPub); + } + + int getIdleTime(std::chrono::time_point tcurrent=std::chrono::steady_clock::now()) + { + return static_cast(std::chrono::duration_cast(tcurrent - lastHeartBeat).count()); + } + + void publish() { + if (m_shaPub.empty()) { + return; + } + RedisCommand cmd; + cmd.format( + "EVALSHA %s 0", + m_shaPub.c_str()); + RedisReply r(m_db, cmd); + } + private: DBConnector *m_db; std::queue m_expectedTypes; size_t m_remaining; long int m_ownerTid; + std::string m_luaPub; + std::string m_shaPub; + std::chrono::time_point lastHeartBeat; // marks the timestamp of latest pipeline flush being invoked + std::unordered_set m_channels; + void mayflush() { if (m_remaining >= COMMAND_MAX) diff --git a/common/saiaclschema.cpp b/common/saiaclschema.cpp index 6fd32214..88c6f517 100644 --- a/common/saiaclschema.cpp +++ b/common/saiaclschema.cpp @@ -328,5 +328,32 @@ const ActionSchema &ActionSchemaByName(const std::string &action_name) return lookup->second; } +const ActionSchema& ActionSchemaByNameAndObjectType( + const std::string& action_name, const std::string& object_type) { + static const auto* const kRedirectObjectTypes = + new std::unordered_map({ + {"SAI_OBJECT_TYPE_IPMC_GROUP", + {.format = Format::kHexString, .bitwidth = 16}}, + {"SAI_OBJECT_TYPE_L2MC_GROUP", + {.format = Format::kHexString, .bitwidth = 16}}, + // SAI_OBJECT_TYPE_BRIDGE_PORT + // SAI_OBJECT_TYPE_LAG + // SAI_OBJECT_TYPE_NEXT_HOP + // SAI_OBJECT_TYPE_NEXT_HOP_GROUP + // SAI_OBJECT_TYPE_PORT + // SAI_OBJECT_TYPE_SYSTEM_PORT + }); + + if (action_name == "SAI_ACL_ENTRY_ATTR_ACTION_REDIRECT") { + auto lookup = kRedirectObjectTypes->find(object_type); + if (lookup != kRedirectObjectTypes->end()) { + return lookup->second; + } + } + // If we haven't defined the object type, fall through to the default + // SAI_ACL_ENTRY_ATTR_ACTION_REDIRECT format. + return ActionSchemaByName(action_name); +} + } // namespace acl } // namespace swss diff --git a/common/saiaclschema.h b/common/saiaclschema.h index 156148b1..88e66423 100644 --- a/common/saiaclschema.h +++ b/common/saiaclschema.h @@ -83,6 +83,10 @@ const MatchFieldSchema &MatchFieldSchemaByName(const std::string &match_field_na // Throws std::invalid_argument for unknown actions and actions without schemas. const ActionSchema &ActionSchemaByName(const std::string &action_name); +// Allow further format differentiation based on a SAI object type. +const ActionSchema& ActionSchemaByNameAndObjectType( + const std::string& action_name, const std::string& object_type); + } // namespace acl } // namespace swss diff --git a/common/schema.h b/common/schema.h index 2bc8ec39..108fbc8d 100644 --- a/common/schema.h +++ b/common/schema.h @@ -177,6 +177,8 @@ namespace swss { #define APP_DASH_ROUTE_GROUP_TABLE_NAME "DASH_ROUTE_GROUP_TABLE" #define APP_DASH_TUNNEL_TABLE_NAME "DASH_TUNNEL_TABLE" #define APP_DASH_PA_VALIDATION_TABLE_NAME "DASH_PA_VALIDATION_TABLE" +#define APP_DASH_METER_POLICY_TABLE_NAME "DASH_METER_POLICY_TABLE" +#define APP_DASH_METER_RULE_TABLE_NAME "DASH_METER_RULE_TABLE" #define APP_DASH_ROUTING_APPLIANCE_TABLE_NAME "DASH_ROUTING_APPLIANCE_TABLE" #define APP_PAC_PORT_TABLE_NAME "PAC_PORT_TABLE" @@ -261,6 +263,7 @@ namespace swss { #define QUEUE_ATTR_ID_LIST "QUEUE_ATTR_ID_LIST" #define BUFFER_POOL_COUNTER_ID_LIST "BUFFER_POOL_COUNTER_ID_LIST" #define ENI_COUNTER_ID_LIST "ENI_COUNTER_ID_LIST" +#define DASH_METER_COUNTER_ID_LIST "DASH_METER_COUNTER_ID_LIST" #define PFC_WD_STATE_TABLE "PFC_WD_STATE_TABLE" #define PFC_WD_PORT_COUNTER_ID_LIST "PORT_COUNTER_ID_LIST" #define PFC_WD_QUEUE_COUNTER_ID_LIST "QUEUE_COUNTER_ID_LIST" @@ -460,6 +463,7 @@ namespace swss { #define CFG_TWAMP_SESSION_TABLE_NAME "TWAMP_SESSION" #define CFG_BANNER_MESSAGE_TABLE_NAME "BANNER_MESSAGE" +#define CFG_LOGGING_TABLE_NAME "LOGGING" #define CFG_DHCP_TABLE "DHCP_RELAY" diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index 02ef377a..2f510577 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -99,18 +99,18 @@ void ZmqServer::mqPollThread() // Producer/Consumer state table are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket void* context = zmq_ctx_new();; - void* socket = zmq_socket(context, ZMQ_PULL); + void* m_socket = zmq_socket(context, ZMQ_PULL); // Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt int high_watermark = MQ_WATERMARK; - zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); + zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); if (!m_vrf.empty()) { - zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); + zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); } - int rc = zmq_bind(socket, m_endpoint.c_str()); + int rc = zmq_bind(m_socket, m_endpoint.c_str()); if (rc != 0) { SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s", diff --git a/pyext/swsscommon.i b/pyext/swsscommon.i index 2bf953b1..b3d015e0 100644 --- a/pyext/swsscommon.i +++ b/pyext/swsscommon.i @@ -58,6 +58,7 @@ #include "zmqproducerstatetable.h" #include #include +#include "interface.h" %} %include @@ -282,6 +283,7 @@ T castSelectableObj(swss::Selectable *temp) %include "zmqserver.h" %include "zmqclient.h" %include "zmqconsumerstatetable.h" +%include "interface.h" %extend swss::DBConnector { %template(hgetall) hgetall>; @@ -296,7 +298,7 @@ T castSelectableObj(swss::Selectable *temp) %include "table.h" #ifdef ENABLE_YANG_MODULES %include "decoratortable.h" -#endif +#endif %clear std::vector &keys; %clear std::vector &ops; %clear std::vector>> &fvss; diff --git a/tests/c_api_ut.cpp b/tests/c_api_ut.cpp index d16dac7c..ed814607 100644 --- a/tests/c_api_ut.cpp +++ b/tests/c_api_ut.cpp @@ -1,4 +1,3 @@ -#include #include #include @@ -39,44 +38,63 @@ static void sortKfvs(vector &kfvs) { } } -template static void free(const T *ptr) { - std::free(const_cast(reinterpret_cast(ptr))); -} +#define free(x) std::free(const_cast(reinterpret_cast(x))); static void freeKeyOpFieldValuesArray(SWSSKeyOpFieldValuesArray arr) { for (uint64_t i = 0; i < arr.len; i++) { free(arr.data[i].key); - free(arr.data[i].operation); for (uint64_t j = 0; j < arr.data[i].fieldValues.len; j++) { free(arr.data[i].fieldValues.data[j].field); - free(arr.data[i].fieldValues.data[j].value); + SWSSString_free(arr.data[i].fieldValues.data[j].value); } - free(arr.data[i].fieldValues.data); + SWSSFieldValueArray_free(arr.data[i].fieldValues); } - free(arr.data); + SWSSKeyOpFieldValuesArray_free(arr); } +struct SWSSStringManager { + vector m_strings; + + SWSSString makeString(const char *c_str) { + SWSSString s = SWSSString_new_c_str(c_str); + m_strings.push_back(s); + return s; + } + + SWSSStrRef makeStrRef(const char *c_str) { + return (SWSSStrRef)makeString(c_str); + } + + ~SWSSStringManager() { + for (SWSSString s : m_strings) + SWSSString_free(s); + } +}; + TEST(c_api, DBConnector) { clearDB(); + SWSSStringManager sm; EXPECT_THROW(SWSSDBConnector_new_named("does not exist", 0, true), out_of_range); SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true); - EXPECT_FALSE(SWSSDBConnector_get(db, "mykey")); + EXPECT_EQ(SWSSDBConnector_get(db, "mykey"), nullptr); EXPECT_FALSE(SWSSDBConnector_exists(db, "mykey")); - SWSSDBConnector_set(db, "mykey", "myval"); - const char *val = SWSSDBConnector_get(db, "mykey"); - EXPECT_STREQ(val, "myval"); - free(val); + + SWSSDBConnector_set(db, "mykey", sm.makeStrRef("myval")); + SWSSString val = SWSSDBConnector_get(db, "mykey"); + EXPECT_STREQ(SWSSStrRef_c_str((SWSSStrRef)val), "myval"); + SWSSString_free(val); EXPECT_TRUE(SWSSDBConnector_exists(db, "mykey")); EXPECT_TRUE(SWSSDBConnector_del(db, "mykey")); EXPECT_FALSE(SWSSDBConnector_del(db, "mykey")); EXPECT_FALSE(SWSSDBConnector_hget(db, "mykey", "myfield")); EXPECT_FALSE(SWSSDBConnector_hexists(db, "mykey", "myfield")); - SWSSDBConnector_hset(db, "mykey", "myfield", "myval"); + SWSSDBConnector_hset(db, "mykey", "myfield", sm.makeStrRef("myval")); val = SWSSDBConnector_hget(db, "mykey", "myfield"); - EXPECT_STREQ(val, "myval"); - free(val); + EXPECT_STREQ(SWSSStrRef_c_str((SWSSStrRef)val), "myval"); + SWSSString_free(val); + EXPECT_TRUE(SWSSDBConnector_hexists(db, "mykey", "myfield")); EXPECT_FALSE(SWSSDBConnector_hget(db, "mykey", "notmyfield")); EXPECT_FALSE(SWSSDBConnector_hexists(db, "mykey", "notmyfield")); @@ -89,27 +107,32 @@ TEST(c_api, DBConnector) { TEST(c_api, ConsumerProducerStateTables) { clearDB(); + SWSSStringManager sm; SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true); SWSSProducerStateTable pst = SWSSProducerStateTable_new(db, "mytable"); SWSSConsumerStateTable cst = SWSSConsumerStateTable_new(db, "mytable", nullptr, nullptr); + SWSSConsumerStateTable_getFd(cst); + SWSSKeyOpFieldValuesArray arr = SWSSConsumerStateTable_pops(cst); ASSERT_EQ(arr.len, 0); freeKeyOpFieldValuesArray(arr); - SWSSFieldValuePair data[2] = {{.field = "myfield1", .value = "myvalue1"}, - {.field = "myfield2", .value = "myvalue2"}}; + SWSSFieldValueTuple data[2] = { + {.field = "myfield1", .value = sm.makeString("myvalue1")}, + {.field = "myfield2", .value = sm.makeString("myvalue2")}}; SWSSFieldValueArray values = { .len = 2, .data = data, }; SWSSProducerStateTable_set(pst, "mykey1", values); - data[0] = {.field = "myfield3", .value = "myvalue3"}; + data[0] = {.field = "myfield3", .value = sm.makeString("myvalue3")}; values.len = 1; SWSSProducerStateTable_set(pst, "mykey2", values); + ASSERT_EQ(SWSSConsumerStateTable_readData(cst, 300, true), SWSSSelectResult_DATA); arr = SWSSConsumerStateTable_pops(cst); vector kfvs = takeKeyOpFieldValuesArray(arr); sortKfvs(kfvs); @@ -131,7 +154,7 @@ TEST(c_api, ConsumerProducerStateTables) { ASSERT_EQ(fieldValues1.size(), 1); EXPECT_EQ(fieldValues1[0].first, "myfield3"); EXPECT_EQ(fieldValues1[0].second, "myvalue3"); - + arr = SWSSConsumerStateTable_pops(cst); EXPECT_EQ(arr.len, 0); freeKeyOpFieldValuesArray(arr); @@ -164,27 +187,25 @@ TEST(c_api, ConsumerProducerStateTables) { TEST(c_api, SubscriberStateTable) { clearDB(); + SWSSStringManager sm; SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true); SWSSSubscriberStateTable sst = SWSSSubscriberStateTable_new(db, "mytable", nullptr, nullptr); - EXPECT_EQ(SWSSSubscriberStateTable_readData(sst, 300), SWSSSelectResult_TIMEOUT); - EXPECT_FALSE(SWSSSubscriberStateTable_hasData(sst)); + SWSSSubscriberStateTable_getFd(sst); + + EXPECT_EQ(SWSSSubscriberStateTable_readData(sst, 300, true), SWSSSelectResult_TIMEOUT); SWSSKeyOpFieldValuesArray arr = SWSSSubscriberStateTable_pops(sst); EXPECT_EQ(arr.len, 0); freeKeyOpFieldValuesArray(arr); - SWSSDBConnector_hset(db, "mytable:mykey", "myfield", "myvalue"); - EXPECT_EQ(SWSSSubscriberStateTable_readData(sst, 300), SWSSSelectResult_DATA); - EXPECT_EQ(SWSSSubscriberStateTable_readData(sst, 300), SWSSSelectResult_TIMEOUT); - EXPECT_TRUE(SWSSSubscriberStateTable_hasData(sst)); - + SWSSDBConnector_hset(db, "mytable:mykey", "myfield", sm.makeStrRef("myvalue")); + EXPECT_EQ(SWSSSubscriberStateTable_readData(sst, 300, true), SWSSSelectResult_DATA); arr = SWSSSubscriberStateTable_pops(sst); vector kfvs = takeKeyOpFieldValuesArray(arr); sortKfvs(kfvs); freeKeyOpFieldValuesArray(arr); - EXPECT_FALSE(SWSSSubscriberStateTable_hasData(sst)); ASSERT_EQ(kfvs.size(), 1); EXPECT_EQ(kfvKey(kfvs[0]), "mykey"); EXPECT_EQ(kfvOp(kfvs[0]), "SET"); @@ -199,6 +220,7 @@ TEST(c_api, SubscriberStateTable) { TEST(c_api, ZmqConsumerProducerStateTable) { clearDB(); + SWSSStringManager sm; SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true); @@ -211,11 +233,10 @@ TEST(c_api, ZmqConsumerProducerStateTable) { SWSSZmqConsumerStateTable cst = SWSSZmqConsumerStateTable_new(db, "mytable", srv, nullptr, nullptr); + SWSSZmqConsumerStateTable_getFd(cst); + ASSERT_EQ(SWSSZmqConsumerStateTable_getDbConnector(cst), db); - EXPECT_FALSE(SWSSZmqConsumerStateTable_hasData(cst)); - EXPECT_FALSE(SWSSZmqConsumerStateTable_hasCachedData(cst)); - EXPECT_FALSE(SWSSZmqConsumerStateTable_initializedWithData(cst)); SWSSKeyOpFieldValuesArray arr = SWSSZmqConsumerStateTable_pops(cst); ASSERT_EQ(arr.len, 0); freeKeyOpFieldValuesArray(arr); @@ -223,37 +244,32 @@ TEST(c_api, ZmqConsumerProducerStateTable) { // On flag = 0, we use the ZmqProducerStateTable // On flag = 1, we use the ZmqClient directly for (int flag = 0; flag < 2; flag++) { - SWSSFieldValuePair values_key1_data[2] = {{.field = "myfield1", .value = "myvalue1"}, - {.field = "myfield2", .value = "myvalue2"}}; + SWSSFieldValueTuple values_key1_data[2] = {{.field = "myfield1", .value = sm.makeString("myvalue1")}, + {.field = "myfield2", .value = sm.makeString("myvalue2")}}; SWSSFieldValueArray values_key1 = { .len = 2, .data = values_key1_data, }; - SWSSFieldValuePair values_key2_data[1] = {{.field = "myfield3", .value = "myvalue3"}}; + SWSSFieldValueTuple values_key2_data[1] = {{.field = "myfield3", .value = sm.makeString("myvalue3")}}; SWSSFieldValueArray values_key2 = { .len = 1, .data = values_key2_data, }; SWSSKeyOpFieldValues arr_data[2] = { - {.key = "mykey1", .operation = "SET", .fieldValues = values_key1}, - {.key = "mykey2", .operation = "SET", .fieldValues = values_key2}}; + {.key = "mykey1", .operation = SWSSKeyOperation_SET, .fieldValues = values_key1}, + {.key = "mykey2", .operation = SWSSKeyOperation_SET, .fieldValues = values_key2}}; arr = {.len = 2, .data = arr_data}; if (flag == 0) for (uint64_t i = 0; i < arr.len; i++) SWSSZmqProducerStateTable_set(pst, arr.data[i].key, arr.data[i].fieldValues); else - SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", &arr); + SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr); - ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500), SWSSSelectResult_DATA); - EXPECT_TRUE(SWSSZmqConsumerStateTable_hasData(cst)); - EXPECT_TRUE(SWSSZmqConsumerStateTable_hasCachedData(cst)); + ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 1500, true), SWSSSelectResult_DATA); arr = SWSSZmqConsumerStateTable_pops(cst); - EXPECT_FALSE(SWSSZmqConsumerStateTable_hasData(cst)); - EXPECT_FALSE(SWSSZmqConsumerStateTable_hasCachedData(cst)); - EXPECT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500), SWSSSelectResult_TIMEOUT); vector kfvs = takeKeyOpFieldValuesArray(arr); sortKfvs(kfvs); @@ -276,28 +292,22 @@ TEST(c_api, ZmqConsumerProducerStateTable) { EXPECT_EQ(fieldValues1[0].first, "myfield3"); EXPECT_EQ(fieldValues1[0].second, "myvalue3"); - EXPECT_FALSE(SWSSZmqConsumerStateTable_hasData(cst)); arr = SWSSZmqConsumerStateTable_pops(cst); ASSERT_EQ(arr.len, 0); freeKeyOpFieldValuesArray(arr); - arr_data[0] = {.key = "mykey3", .operation = "DEL", .fieldValues = {}}; - arr_data[1] = {.key = "mykey4", .operation = "DEL", .fieldValues = {}}; - arr = { .len = 2, .data = arr_data }; + arr_data[0] = {.key = "mykey3", .operation = SWSSKeyOperation_DEL, .fieldValues = {}}; + arr_data[1] = {.key = "mykey4", .operation = SWSSKeyOperation_DEL, .fieldValues = {}}; + arr = {.len = 2, .data = arr_data}; if (flag == 0) for (uint64_t i = 0; i < arr.len; i++) SWSSZmqProducerStateTable_del(pst, arr.data[i].key); else - SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", &arr); + SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", arr); - ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500), SWSSSelectResult_DATA); - EXPECT_TRUE(SWSSZmqConsumerStateTable_hasData(cst)); - EXPECT_TRUE(SWSSZmqConsumerStateTable_hasCachedData(cst)); + ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500, true), SWSSSelectResult_DATA); arr = SWSSZmqConsumerStateTable_pops(cst); - EXPECT_FALSE(SWSSZmqConsumerStateTable_hasData(cst)); - EXPECT_FALSE(SWSSZmqConsumerStateTable_hasCachedData(cst)); - EXPECT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500), SWSSSelectResult_TIMEOUT); kfvs = takeKeyOpFieldValuesArray(arr); sortKfvs(kfvs); diff --git a/tests/redis_piped_state_ut.cpp b/tests/redis_piped_state_ut.cpp index ca329190..f3173876 100644 --- a/tests/redis_piped_state_ut.cpp +++ b/tests/redis_piped_state_ut.cpp @@ -730,3 +730,59 @@ TEST(ConsumerStateTable, async_multitable) cout << endl << "Done." << endl; } + +TEST(ConsumerStateTable, flushPub) +{ + clearDB(); + + /* Prepare producer */ + int index = 0; + string tableName = "UT_REDIS_THREAD_" + to_string(index); + DBConnector db(TEST_DB, 0, true); + RedisPipeline pipeline(&db); + ProducerStateTable p(&pipeline, tableName, false, true); + p.setBuffered(true); + + string key = "TheKey"; + int maxNumOfFields = 2; + + /* Set operation */ + { + vector fields; + for (int j = 0; j < maxNumOfFields; j++) + { + FieldValueTuple t(field(j), value(j)); + fields.push_back(t); + } + p.set(key, fields); + } + + /* Del operation */ + p.del(key); + p.flush(); + + /* Prepare consumer */ + ConsumerStateTable c(&db, tableName); + Select cs; + Selectable *selectcs; + cs.addSelectable(&c); + + /* First pop operation */ + { + int ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + KeyOpFieldsValuesTuple kco; + c.pop(kco); + EXPECT_EQ(kfvKey(kco), key); + EXPECT_EQ(kfvOp(kco), "DEL"); + + auto fvs = kfvFieldsValues(kco); + EXPECT_EQ(fvs.size(), 0U); + } + + /* Second select operation */ + { + int ret = cs.select(&selectcs, 1000); + EXPECT_EQ(ret, Select::TIMEOUT); + } +} \ No newline at end of file diff --git a/tests/redis_ut.cpp b/tests/redis_ut.cpp index 4f691e88..f53f891d 100644 --- a/tests/redis_ut.cpp +++ b/tests/redis_ut.cpp @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include "gtest/gtest.h" #include "common/dbconnector.h" #include "common/producertable.h" @@ -20,6 +22,7 @@ using namespace std; using namespace swss; +using namespace testing; #define NUMBER_OF_THREADS (64) // Spawning more than 256 threads causes libc++ to except #define NUMBER_OF_OPS (1000) @@ -1139,3 +1142,32 @@ TEST(Connector, hmset) // test empty multi hash db.hmset({}); } + +TEST(Connector, connectFail) +{ + // connect to an ip which is not a redis server + EXPECT_THROW({ + try + { + DBConnector db(0, "1.1.1.1", 6379, 1); + } + catch(const std::system_error& e) + { + EXPECT_THAT(e.what(), HasSubstr("Unable to connect to redis - ")); + throw; + } + }, std::system_error); + + // connect to an invalid unix socket address + EXPECT_THROW({ + try + { + DBConnector db(0, "/tmp/invalid", 1); + } + catch(const std::system_error& e) + { + EXPECT_THAT(e.what(), HasSubstr("Unable to connect to redis (unix-socket) - ")); + throw; + } + }, std::system_error); +} diff --git a/tests/saiaclschema_ut.cpp b/tests/saiaclschema_ut.cpp index fff9158d..1f828f77 100644 --- a/tests/saiaclschema_ut.cpp +++ b/tests/saiaclschema_ut.cpp @@ -60,6 +60,37 @@ TEST(SaiAclSchemaTest, ActionSchemaByNameSucceeds) AllOf(Field(&ActionSchema::format, Format::kHexString), Field(&ActionSchema::bitwidth, 12))); } +TEST(SaiAclSchemaTest, ActionSchemaByNameAndObjectTypeSucceeds) { + EXPECT_THAT( + ActionSchemaByNameAndObjectType("SAI_ACL_ENTRY_ATTR_ACTION_REDIRECT", + "SAI_OBJECT_TYPE_IPMC_GROUP"), + AllOf(Field(&ActionSchema::format, Format::kHexString), + Field(&ActionSchema::bitwidth, 16))); + EXPECT_THAT( + ActionSchemaByNameAndObjectType("SAI_ACL_ENTRY_ATTR_ACTION_REDIRECT", + "SAI_OBJECT_TYPE_L2MC_GROUP"), + AllOf(Field(&ActionSchema::format, Format::kHexString), + Field(&ActionSchema::bitwidth, 16))); + EXPECT_THAT( + ActionSchemaByNameAndObjectType("SAI_ACL_ENTRY_ATTR_ACTION_REDIRECT", + "SAI_OBJECT_TYPE_NEXT_HOP"), + AllOf(Field(&ActionSchema::format, Format::kString), + Field(&ActionSchema::bitwidth, 0))); + EXPECT_THAT(ActionSchemaByNameAndObjectType( + "SAI_ACL_ENTRY_ATTR_ACTION_REDIRECT", "SAI_OBJECT_TYPE_PORT"), + AllOf(Field(&ActionSchema::format, Format::kString), + Field(&ActionSchema::bitwidth, 0))); +} + +TEST(SaiAclSchemaTest, + ActionSchemaByNameAndObjectTypeWithNonRedirectActionSucceeds) { + EXPECT_THAT( + ActionSchemaByNameAndObjectType("SAI_ACL_ENTRY_ATTR_ACTION_DECREMENT_TTL", + "SAI_OBJECT_TYPE_UNKNOWN"), + AllOf(Field(&ActionSchema::format, Format::kHexString), + Field(&ActionSchema::bitwidth, 1))); +} + // Invalid Lookup Tests TEST(SaiAclSchemaTest, InvalidFormatNameThrowsException) @@ -82,6 +113,11 @@ TEST(SaiAclSchemaTest, InvalidActionNameThrowsException) EXPECT_THROW(ActionSchemaByName("Foo"), std::invalid_argument); } +TEST(SaiAclSchemaTest, InvalidActionNameAndObjectTypeThrowsException) { + EXPECT_THROW(ActionSchemaByNameAndObjectType("Foo", "unknown"), + std::invalid_argument); +} + } // namespace } // namespace acl } // namespace swss diff --git a/tests/test_interface.py b/tests/test_interface.py new file mode 100644 index 00000000..25c809ce --- /dev/null +++ b/tests/test_interface.py @@ -0,0 +1,8 @@ +from swsscommon import swsscommon + +def test_is_interface_name_valid(): + invalid_interface_name = "TooLongInterfaceName" + assert not swsscommon.isInterfaceNameValid(invalid_interface_name) + + validInterfaceName = "OkInterfaceName" + assert swsscommon.isInterfaceNameValid(validInterfaceName) diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 4818b7fd..56a8299f 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -438,3 +438,30 @@ TEST(ZmqConsumerStateTableBatchBufferOverflow, test) } EXPECT_ANY_THROW(p.send(kcos)); } + +TEST(ZmqProducerStateTableDeleteAfterSend, test) +{ + std::string testTableName = "ZMQ_PROD_DELETE_UT"; + std::string pushEndpoint = "tcp://localhost:1234"; + std::string pullEndpoint = "tcp://*:1234"; + std::string testKey = "testKey"; + + ZmqServer server(pullEndpoint); + + DBConnector db(TEST_DB, 0, true); + ZmqClient client(pushEndpoint); + + auto *p = new ZmqProducerStateTable(&db, testTableName, client, true); + std::vector values; + FieldValueTuple t("test", "test"); + values.push_back(t); + p->set(testKey,values); + delete p; + + sleep(1); + + Table table(&db, testTableName); + std::vector keys; + table.getKeys(keys); + EXPECT_EQ(keys.front(), testKey); +}