diff --git a/C/common/config_category.cpp b/C/common/config_category.cpp old mode 100644 new mode 100755 index d2db0482e3..1dbeb64b1b --- a/C/common/config_category.cpp +++ b/C/common/config_category.cpp @@ -476,6 +476,8 @@ string ConfigCategory::getItemAttribute(const string& itemName, return m_items[i]->m_deprecated; case RULE_ATTR: return m_items[i]->m_rule; + case BUCKET_PROPERTIES_ATTR: + return m_items[i]->m_bucketProperties; default: throw new ConfigItemAttributeNotFound(); } @@ -541,6 +543,9 @@ bool ConfigCategory::setItemAttribute(const string& itemName, case RULE_ATTR: m_items[i]->m_rule = value; return true; + case BUCKET_PROPERTIES_ATTR: + m_items[i]->m_bucketProperties = value; + return true; default: return false; } @@ -1038,6 +1043,10 @@ ConfigCategory::CategoryItem::CategoryItem(const string& name, { m_itemType = CodeItem; } + if (m_type.compare("bucket") == 0) + { + m_itemType = BucketItem; + } if (item.HasMember("deprecated")) { @@ -1083,6 +1092,33 @@ ConfigCategory::CategoryItem::CategoryItem(const string& name, m_rule = ""; } + if (item.HasMember("properties")) + { + Logger::getLogger()->debug("item['properties'].IsString()=%s, item['properties'].IsObject()=%s", + item["properties"].IsString()?"true":"false", + item["properties"].IsObject()?"true":"false"); + + rapidjson::StringBuffer strbuf; + rapidjson::Writer writer(strbuf); + item["properties"].Accept(writer); + m_bucketProperties = item["properties"].IsObject() ? + // use current string + strbuf.GetString() : + // Unescape the string + JSONunescape(strbuf.GetString()); + + Logger::getLogger()->debug("m_bucketProperties=%s", m_bucketProperties.c_str()); + } + else + { + m_bucketProperties = ""; + } + + if (m_itemType == BucketItem && m_bucketProperties.empty()) + { + throw new runtime_error("Bucket configuration item is missing the \"properties\" attribute"); + } + if (item.HasMember("options")) { const Value& options = item["options"]; @@ -1095,7 +1131,7 @@ ConfigCategory::CategoryItem::CategoryItem(const string& name, } } - std:string m_typeUpperCase = m_type; + std::string m_typeUpperCase = m_type; for (auto & c: m_typeUpperCase) c = toupper(c); // Item "value" can be an escaped JSON string, so check m_type JSON as well @@ -1377,6 +1413,7 @@ ConfigCategory::CategoryItem::CategoryItem(const CategoryItem& rhs) m_validity = rhs.m_validity; m_group = rhs.m_group; m_rule = rhs.m_rule; + m_bucketProperties = rhs.m_bucketProperties; } /** @@ -1467,6 +1504,11 @@ ostringstream convert; convert << ", \"rule\" : \"" << JSONescape(m_rule) << "\""; } + if (!m_bucketProperties.empty()) + { + convert << ", \"properties\" : " << m_bucketProperties; + } + if (!m_group.empty()) { convert << ", \"group\" : \"" << m_group << "\""; @@ -1538,6 +1580,11 @@ ostringstream convert; convert << ", \"rule\" : \"" << JSONescape(m_rule) << "\""; } + if (!m_bucketProperties.empty()) + { + convert << ", \"properties\" : " << m_bucketProperties; + } + if (!m_group.empty()) { convert << ", \"group\" : \"" << m_group << "\""; @@ -1587,6 +1634,31 @@ ostringstream convert; return convert.str(); } +/** + * Parse BucketItem value in JSON dict format and return the key value pairs within that + * + * @param json JSON string representing the BucketItem value + * @return Vector with pairs of found key/value string pairs in BucketItem value + */ +vector>* ConfigCategory::parseBucketItemValue(const string & json) +{ + Document document; + if (document.Parse(json.c_str()).HasParseError()) + { + Logger::getLogger()->error("parseBucketItemValue(): The provided 
JSON string has a parse error: %s", + GetParseError_En(document.GetParseError())); + return NULL; + } + + vector> *vec = new vector>; + + for (const auto & m : document.GetObject()) + vec->emplace_back(make_pair(m.name.GetString(), m.value.GetString())); + + return vec; +} + + // DefaultConfigCategory constructor DefaultConfigCategory::DefaultConfigCategory(const string& name, const string& json) : ConfigCategory::ConfigCategory(name, json) diff --git a/C/common/include/config_category.h b/C/common/include/config_category.h old mode 100644 new mode 100755 index bc87d943d0..b95220e653 --- a/C/common/include/config_category.h +++ b/C/common/include/config_category.h @@ -64,7 +64,8 @@ class ConfigCategory { DoubleItem, ScriptItem, CategoryType, - CodeItem + CodeItem, + BucketItem }; ConfigCategory(const std::string& name, const std::string& json); @@ -129,13 +130,17 @@ class ConfigCategory { GROUP_ATTR, DISPLAY_NAME_ATTR, DEPRECATED_ATTR, - RULE_ATTR}; + RULE_ATTR, + BUCKET_PROPERTIES_ATTR + }; std::string getItemAttribute(const std::string& itemName, ItemAttribute itemAttribute) const; bool setItemAttribute(const std::string& itemName, ItemAttribute itemAttribute, const std::string& value); + std::vector>* parseBucketItemValue(const std::string &); + protected: class CategoryItem { public: @@ -174,6 +179,7 @@ class ConfigCategory { std::string m_validity; std::string m_group; std::string m_rule; + std::string m_bucketProperties; }; std::vector m_items; std::string m_name; diff --git a/C/common/include/service_record.h b/C/common/include/service_record.h index 3e36e1d8a8..d823f2bfc2 100644 --- a/C/common/include/service_record.h +++ b/C/common/include/service_record.h @@ -45,6 +45,10 @@ class ServiceRecord : public JSONProvider { { m_protocol = protocol; } + const std::string& getProtocol() const + { + return m_protocol; + } void setManagementPort(const unsigned short managementPort) { m_managementPort = managementPort; diff --git a/C/common/logger.cpp b/C/common/logger.cpp index 1470dd38be..ead4f4c0be 100755 --- a/C/common/logger.cpp +++ b/C/common/logger.cpp @@ -46,6 +46,7 @@ static char ident[80]; } openlog(ident, LOG_PID|LOG_CONS, LOG_USER); instance = this; + m_level = LOG_WARNING; } Logger::~Logger() diff --git a/C/common/reading_set.cpp b/C/common/reading_set.cpp index 382d321bc4..df01437359 100755 --- a/C/common/reading_set.cpp +++ b/C/common/reading_set.cpp @@ -383,7 +383,7 @@ JSONReading::JSONReading(const Value& json) { if (json.HasMember("id")) { - m_id = json["id"].GetUint(); + m_id = json["id"].GetUint64(); m_has_id = true; } else diff --git a/C/common/result_set.cpp b/C/common/result_set.cpp index 953d15dd68..c79482d969 100644 --- a/C/common/result_set.cpp +++ b/C/common/result_set.cpp @@ -92,7 +92,14 @@ ResultSet::ResultSet(const std::string& json) switch (m_columns[colNo]->getType()) { case STRING_COLUMN: - rowValue->append(new ColumnValue(string(item->value.GetString()))); + if (item->value.IsBool()) + { + rowValue->append(new ColumnValue(item->value.IsTrue() ? 
"true" : "false")); + } + else + { + rowValue->append(new ColumnValue(string(item->value.GetString()))); + } break; case INT_COLUMN: rowValue->append(new ColumnValue((long)(item->value.GetInt64()))); @@ -104,8 +111,10 @@ ResultSet::ResultSet(const std::string& json) rowValue->append(new ColumnValue(item->value)); break; case BOOL_COLUMN: - // TODO Add support - rowValue->append(new ColumnValue(string("TODO"))); + if (item->value.IsString()) + rowValue->append(new ColumnValue(string(item->value.GetString()))); + else + rowValue->append(new ColumnValue(item->value.IsTrue() ? "true" : "false")); break; } colNo++; diff --git a/C/plugins/common/libcurl_https.cpp b/C/plugins/common/libcurl_https.cpp index f1cc6eb582..3bb6eac64e 100644 --- a/C/plugins/common/libcurl_https.cpp +++ b/C/plugins/common/libcurl_https.cpp @@ -30,6 +30,21 @@ using namespace std; +/** + * Creates a UTC time string for the current time + * + * @return Current UTC time + */ +static std::string CurrentTimeString() +{ + time_t now = time(NULL); + struct tm timeinfo; + gmtime_r(&now, &timeinfo); + char timeString[20]; + strftime(timeString, sizeof(timeString), "%F %T", &timeinfo); + return std::string(timeString); +} + /** * Constructor: host:port, connect_timeout, request_timeout, * retry_sleep_Time, max_retry @@ -312,6 +327,7 @@ int LibcurlHttps::sendRequest( do { + std::chrono::high_resolution_clock::time_point tStart; try { exceptionRaised = none; @@ -334,6 +350,7 @@ int LibcurlHttps::sendRequest( } m_ofs << "Payload:" << endl; m_ofs << payload << endl; + tStart = std::chrono::high_resolution_clock::now(); } // Execute the HTTP method @@ -346,8 +363,10 @@ int LibcurlHttps::sendRequest( httpResponseText = httpHeaderBuffer; if (m_log) { + std::chrono::high_resolution_clock::time_point tEnd = std::chrono::high_resolution_clock::now(); m_ofs << "Response:" << endl; m_ofs << " Code: " << httpCode << endl; + m_ofs << " Time: " << ((double)std::chrono::duration_cast(tEnd - tStart).count()) / 1.0E6 << " sec " << CurrentTimeString() << endl; m_ofs << " Content: " << httpResponseText << endl << endl; } StringStripCRLF(httpResponseText); @@ -409,6 +428,13 @@ int LibcurlHttps::sendRequest( } #endif + if (m_log && !errorMessage.empty()) + { + std::chrono::high_resolution_clock::time_point tEnd = std::chrono::high_resolution_clock::now(); + m_ofs << " Time: " << ((double)std::chrono::duration_cast(tEnd - tStart).count()) / 1.0E6 << " sec " << CurrentTimeString() << endl; + m_ofs << " Exception: " << errorMessage << endl; + } + if (retryCount < m_max_retry) { this_thread::sleep_for(chrono::seconds(sleepTime)); diff --git a/C/plugins/common/simple_http.cpp b/C/plugins/common/simple_http.cpp index 4c76a0c28c..020cc3e448 100644 --- a/C/plugins/common/simple_http.cpp +++ b/C/plugins/common/simple_http.cpp @@ -17,6 +17,21 @@ using namespace std; +/** + * Creates a UTC time string for the current time + * + * @return Current UTC time + */ +static std::string CurrentTimeString() +{ + time_t now = time(NULL); + struct tm timeinfo; + gmtime_r(&now, &timeinfo); + char timeString[20]; + strftime(timeString, sizeof(timeString), "%F %T", &timeinfo); + return std::string(timeString); +} + // Using https://github.com/eidheim/Simple-Web-Server using HttpClient = SimpleWeb::Client; @@ -126,6 +141,7 @@ int SimpleHttp::sendRequest( do { + std::chrono::high_resolution_clock::time_point tStart; try { exception_raised = none; @@ -141,6 +157,7 @@ int SimpleHttp::sendRequest( } m_ofs << "Payload:" << endl; m_ofs << payload << endl; + tStart = 
std::chrono::high_resolution_clock::now(); } // Call HTTPS method @@ -151,8 +168,10 @@ int SimpleHttp::sendRequest( if (m_log) { + std::chrono::high_resolution_clock::time_point tEnd = std::chrono::high_resolution_clock::now(); m_ofs << "Response:" << endl; m_ofs << " Code: " << res->status_code << endl; + m_ofs << " Time: " << ((double)std::chrono::duration_cast(tEnd - tStart).count()) / 1.0E6 << " sec " << CurrentTimeString() << endl; m_ofs << " Content: " << res->content.string() << endl << endl; } @@ -213,6 +232,13 @@ int SimpleHttp::sendRequest( } #endif + if (m_log && !exception_message.empty()) + { + std::chrono::high_resolution_clock::time_point tEnd = std::chrono::high_resolution_clock::now(); + m_ofs << " Time: " << ((double)std::chrono::duration_cast(tEnd - tStart).count()) / 1.0E6 << " sec " << CurrentTimeString() << endl; + m_ofs << " Exception: " << exception_message << endl; + } + if (retry_count < m_max_retry) { this_thread::sleep_for(chrono::seconds(sleep_time)); diff --git a/C/plugins/common/simple_https.cpp b/C/plugins/common/simple_https.cpp index 35c2584c49..18a138a010 100644 --- a/C/plugins/common/simple_https.cpp +++ b/C/plugins/common/simple_https.cpp @@ -19,6 +19,21 @@ using namespace std; +/** + * Creates a UTC time string for the current time + * + * @return Current UTC time + */ +static std::string CurrentTimeString() +{ + time_t now = time(NULL); + struct tm timeinfo; + gmtime_r(&now, &timeinfo); + char timeString[20]; + strftime(timeString, sizeof(timeString), "%F %T", &timeinfo); + return std::string(timeString); +} + // Using https://github.com/eidheim/Simple-Web-Server using HttpsClient = SimpleWeb::Client; @@ -137,6 +152,7 @@ int SimpleHttps::sendRequest( do { + std::chrono::high_resolution_clock::time_point tStart; try { exception_raised = none; @@ -152,6 +168,7 @@ int SimpleHttps::sendRequest( } m_ofs << "Payload:" << endl; m_ofs << payload << endl; + tStart = std::chrono::high_resolution_clock::now(); } // Call HTTPS method @@ -163,8 +180,10 @@ int SimpleHttps::sendRequest( if (m_log) { + std::chrono::high_resolution_clock::time_point tEnd = std::chrono::high_resolution_clock::now(); m_ofs << "Response:" << endl; m_ofs << " Code: " << res->status_code << endl; + m_ofs << " Time: " << ((double)std::chrono::duration_cast(tEnd - tStart).count()) / 1.0E6 << " sec " << CurrentTimeString() << endl; m_ofs << " Content: " << res->content.string() << endl << endl; } @@ -179,7 +198,6 @@ int SimpleHttps::sendRequest( { exception_raised = typeBadRequest; exception_message = ex.what(); - } catch (exception &ex) { @@ -222,6 +240,13 @@ int SimpleHttps::sendRequest( } #endif + if (m_log && !exception_message.empty()) + { + std::chrono::high_resolution_clock::time_point tEnd = std::chrono::high_resolution_clock::now(); + m_ofs << " Time: " << ((double)std::chrono::duration_cast(tEnd - tStart).count()) / 1.0E6 << " sec " << CurrentTimeString() << endl; + m_ofs << " Exception: " << exception_message << endl; + } + if (retry_count < m_max_retry) { this_thread::sleep_for(chrono::seconds(sleep_time)); diff --git a/C/plugins/north/OMF/include/linkedlookup.h b/C/plugins/north/OMF/include/linkedlookup.h new file mode 100644 index 0000000000..560b3da251 --- /dev/null +++ b/C/plugins/north/OMF/include/linkedlookup.h @@ -0,0 +1,80 @@ +#ifndef _LINKEDLOOKUP_H +#define _LINKEDLOOKUP_H +typedef enum { + OMFBT_UNKNOWN, OMFBT_DOUBLE64, OMFBT_DOUBLE32, OMFBT_INTEGER16, + OMFBT_INTEGER32, OMFBT_INTEGER64, OMFBT_UINTEGER16, OMFBT_UINTEGER32, + OMFBT_UINTEGER64, OMFBT_STRING, 
OMFBT_FLEDGEASSET +} OMFBaseType; + +/** + * Lookup status bit + */ +#define LAL_ASSET_SENT 0x01 // We have sent the asset +#define LAL_LINK_SENT 0x02 // We have sent the link to the base type +#define LAL_CONTAINER_SENT 0x04 // We have sent the container +#define LAL_AFLINK_SENT 0x08 // We have sent the link to the AF location + +/** + * Linked Asset Information class + * + * This is the data stored for each asset and asset datapoint pair that + * is being sent to PI using the linked container mechanism. We use the class + * so we can combine all the information we need in a single lookup table, + * this not only saves space but allows to build and retain the table + * before we start building the payloads. This hopefully will help prevent + * to much memory fragmentation, which was an issue with the old, separate + * lookup mechanism we had. + */ +class LALookup { + public: + LALookup() { m_sentState = 0; m_baseType = OMFBT_UNKNOWN; }; + bool assetState(const std::string& tagName) + { + return ((m_sentState & LAL_ASSET_SENT) != 0) + && (m_tagName.compare(tagName) == 0); + }; + bool linkState(const std::string& tagName) + { + return ((m_sentState & LAL_LINK_SENT) != 0) + && (m_tagName.compare(tagName) == 0); + }; + bool containerState(const std::string& tagName) + { + return ((m_sentState & LAL_CONTAINER_SENT) != 0) + && (m_tagName.compare(tagName) == 0); + }; + bool afLinkState() { return (m_sentState & LAL_AFLINK_SENT) != 0; }; + void setBaseType(const std::string& baseType); + OMFBaseType getBaseType() { return m_baseType; }; + std::string getBaseTypeString(); + void assetSent(const std::string& tagName) + { + if (m_tagName.compare(tagName)) + { + m_sentState = LAL_ASSET_SENT; + m_tagName = tagName; + } + else + { + m_sentState |= LAL_ASSET_SENT; + } + }; + void linkSent(const std::string& tagName) + { + if (m_tagName.compare(tagName)) + { + // Force the container to resend if the tagName changes + m_tagName = tagName; + m_sentState &= ~LAL_CONTAINER_SENT; + } + m_sentState |= LAL_LINK_SENT; + }; + void afLinkSent() { m_sentState |= LAL_AFLINK_SENT; }; + void containerSent(const std::string& tagName, const std::string& baseType); + void containerSent(const std::string& tagName, OMFBaseType baseType); + private: + uint8_t m_sentState; + OMFBaseType m_baseType; + std::string m_tagName; +}; +#endif diff --git a/C/plugins/north/OMF/include/omf.h b/C/plugins/north/OMF/include/omf.h index f3081d0bb6..68a382bf37 100644 --- a/C/plugins/north/OMF/include/omf.h +++ b/C/plugins/north/OMF/include/omf.h @@ -17,6 +17,7 @@ #include #include #include +#include #define OMF_HINT "OMFHint" @@ -485,25 +486,13 @@ class OMF bool m_linkedProperties; /** - * The container for this asset and data point has been sent in - * this session. + * The state of the linked assets, the key is + * either an asset name with an underscore appended + * or an asset name, followed by an underscore and a + * data point name */ - std::unordered_map - m_containerSent; - - /** - * The data message for this asset and data point has been sent in - * this session. - */ - std::unordered_map - m_assetSent; - - /** - * The link for this asset and data point has been sent in - * this session. 
- */ - std::unordered_map - m_linkSent; + std::unordered_map + m_linkedAssetState; /** * Force the data to be sent using the legacy, complex OMF types diff --git a/C/plugins/north/OMF/include/omfinfo.h b/C/plugins/north/OMF/include/omfinfo.h index d2d2aeae45..0e38ad09f0 100644 --- a/C/plugins/north/OMF/include/omfinfo.h +++ b/C/plugins/north/OMF/include/omfinfo.h @@ -30,6 +30,7 @@ #include "utils.h" #include "string_utils.h" #include +#include #include "crypto.hpp" diff --git a/C/plugins/north/OMF/include/omflinkeddata.h b/C/plugins/north/OMF/include/omflinkeddata.h index bf12ac4e6b..c25d1ba38a 100644 --- a/C/plugins/north/OMF/include/omflinkeddata.h +++ b/C/plugins/north/OMF/include/omflinkeddata.h @@ -13,6 +13,7 @@ #include #include #include +#include /** * The OMFLinkedData class. @@ -34,13 +35,9 @@ class OMFLinkedData { public: - OMFLinkedData( std::unordered_map *containerSent, - std::unordered_map *assetSent, - std::unordered_map *linkSent, + OMFLinkedData( std::unordered_map *linkedAssetState, const OMF_ENDPOINT PIServerEndpoint = ENDPOINT_CR) : - m_containerSent(containerSent), - m_assetSent(assetSent), - m_linkSent(linkSent), + m_linkedAssetState(linkedAssetState), m_endpoint(PIServerEndpoint), m_doubleFormat("float64"), m_integerFormat("int64") @@ -48,6 +45,8 @@ class OMFLinkedData std::string processReading(const Reading& reading, const std::string& DefaultAFLocation = std::string(), OMFHints *hints = NULL); + void buildLookup(const std::vector& reading); + void setSendFullStructure(const bool sendFullStructure) {m_sendFullStructure = sendFullStructure;}; bool flushContainers(HttpSender& sender, const std::string& path, std::vector >& header); void setFormats(const std::string& doubleFormat, const std::string& integerFormat) { @@ -71,26 +70,15 @@ class OMFLinkedData }; private: + bool m_sendFullStructure; + /** * The container for this asset and data point has been sent in * this session. The key is the asset followed by the datapoint name * with a '.' delimiter between. The value is the base type used, a * container will be sent if the base type changes. */ - std::unordered_map *m_containerSent; - - /** - * The data message for this asset has been sent in - * this session. The key is the asset name. The value is always true. - */ - std::unordered_map *m_assetSent; - - /** - * The link for this asset and data point has been sent in - * this session. key is the asset followed by the datapoint name - * with a '.' delimiter between. The value is always true. 
- */ - std::unordered_map *m_linkSent; + std::unordered_map *m_linkedAssetState; /** * The endpoint to which we are sending data diff --git a/C/plugins/north/OMF/linkdata.cpp b/C/plugins/north/OMF/linkdata.cpp index edc4771389..5e1a83cc85 100644 --- a/C/plugins/north/OMF/linkdata.cpp +++ b/C/plugins/north/OMF/linkdata.cpp @@ -76,6 +76,8 @@ string OMFLinkedData::processReading(const Reading& reading, const string& AFHi string assetName = reading.getAssetName(); + string originalAssetName = OMF::ApplyPIServerNamingRulesObj(assetName, NULL); + // Apply any TagName hints to modify the containerid if (hints) { @@ -109,7 +111,14 @@ string OMFLinkedData::processReading(const Reading& reading, const string& AFHi assetName = OMF::ApplyPIServerNamingRulesObj(assetName, NULL); bool needDelim = false; - if (m_assetSent->count(assetName) == 0) + auto assetLookup = m_linkedAssetState->find(originalAssetName + "."); + if (assetLookup == m_linkedAssetState->end()) + { + // Panic Asset lookup not created + Logger::getLogger()->error("Internal error: No asset lookup item for %s.", assetName.c_str()); + return ""; + } + if (m_sendFullStructure && assetLookup->second.assetState(assetName) == false) { // Send the data message to create the asset instance outData.append("{ \"typeid\":\"FledgeAsset\", \"values\":[ { \"AssetId\":\""); @@ -117,7 +126,7 @@ string OMFLinkedData::processReading(const Reading& reading, const string& AFHi outData.append(assetName + "\""); outData.append("} ] }"); needDelim = true; - m_assetSent->insert(pair(assetName, true)); + assetLookup->second.assetSent(assetName); } /** @@ -176,28 +185,35 @@ string OMFLinkedData::processReading(const Reading& reading, const string& AFHi // Create the link for the asset if not already created string link = assetName + "." + dpName; + string dpLookupName = originalAssetName + "." 
+ dpName; + auto dpLookup = m_linkedAssetState->find(dpLookupName); + string baseType = getBaseType(dp, format); - auto container = m_containerSent->find(link); - if (container == m_containerSent->end()) + if (dpLookup == m_linkedAssetState->end()) + { + Logger::getLogger()->error("Trying to send a link for a datapoint for which we have not created a base type"); + } + else if (dpLookup->second.containerState(assetName) == false) { sendContainer(link, dp, hints, baseType); - m_containerSent->insert(pair(link, baseType)); + dpLookup->second.containerSent(assetName, baseType); } - else if (baseType.compare(container->second) != 0) + else if (baseType.compare(dpLookup->second.getBaseTypeString()) != 0) { - if (container->second.compare(0, 6, "Double") == 0 && + string bt = dpLookup->second.getBaseTypeString(); + if (bt.compare(0, 6, "Double") == 0 && (baseType.compare(0, 7, "Integer") == 0 || baseType.compare(0, 8, "UInteger") == 0)) { string msg = "Asset " + assetName + " data point " + dpName + " conversion from floating point to integer is being ignored"; OMF::reportAsset(assetName, "warn", msg); - baseType = container->second; + baseType = bt; } else { sendContainer(link, dp, hints, baseType); - (*m_containerSent)[link] = baseType; + dpLookup->second.containerSent(assetName, baseType); } } if (baseType.empty()) @@ -206,7 +222,7 @@ string OMFLinkedData::processReading(const Reading& reading, const string& AFHi skippedDatapoints.push_back(dpName); continue; } - if (m_linkSent->find(link) == m_linkSent->end()) + if (m_sendFullStructure && dpLookup->second.linkState(assetName) == false) { outData.append("{ \"typeid\":\"__Link\","); outData.append("\"values\":[ { \"source\" : {"); @@ -216,8 +232,7 @@ string OMFLinkedData::processReading(const Reading& reading, const string& AFHi outData.append("\"containerid\" : \""); outData.append(link); outData.append("\" } } ] },"); - - m_linkSent->insert(pair(link, true)); + dpLookup->second.linkSent(assetName); } // Convert reading data into the OMF JSON string @@ -256,6 +271,54 @@ string OMFLinkedData::processReading(const Reading& reading, const string& AFHi return outData; } +/** + * If the entries are needed in the lookup table for this bblock of readings then create them + * + * @param readings A block of readings to process + */ +void OMFLinkedData::buildLookup(const vector& readings) +{ + + for (const Reading *reading : readings) + { + string assetName = reading->getAssetName(); + assetName = OMF::ApplyPIServerNamingRulesObj(assetName, NULL); + + // Apply any TagName hints to modify the containerid + LALookup empty; + + string assetKey = assetName + "."; + if (m_linkedAssetState->count(assetKey) == 0) + m_linkedAssetState->insert(pair(assetKey, empty)); + + // Get reading data + const vector data = reading->getReadingData(); + + /** + * This loop creates the data values for each of the datapoints in the + * reading. + */ + for (vector::const_iterator it = data.begin(); it != data.end(); ++it) + { + Datapoint *dp = *it; + string dpName = dp->getName(); + if (dpName.compare(OMF_HINT) == 0) + { + // Don't send the OMF Hint to the PI Server + continue; + } + dpName = OMF::ApplyPIServerNamingRulesObj(dpName, NULL); + if (!isTypeSupported(dp->getData())) + { + continue; + } + string link = assetName + "." 
+ dpName; + if (m_linkedAssetState->count(link) == 0) + m_linkedAssetState->insert(pair(link, empty)); + } + } +} + /** * Calculate the base type we need to link the container * @@ -374,7 +437,8 @@ void OMFLinkedData::sendContainer(string& linkName, Datapoint *dp, OMFHints * hi container += "\", \"typeid\" : \""; container += baseType; container += "\", \"name\" : \""; - container += dp->getName(); + string dpName = OMF::ApplyPIServerNamingRulesObj(dp->getName(), NULL); + container += dpName; container += "\", \"datasource\" : \"" + dataSource + "\""; if (propertyOverrides) @@ -502,3 +566,100 @@ bool OMFLinkedData::flushContainers(HttpSender& sender, const string& path, vect } return true; } + +/** + * Set the base type by passing the string of the base type + */ +void LALookup::setBaseType(const string& baseType) +{ + if (baseType.compare("Double64") == 0) + m_baseType = OMFBT_DOUBLE64; + else if (baseType.compare("Double32") == 0) + m_baseType = OMFBT_DOUBLE32; + else if (baseType.compare("Integer16") == 0) + m_baseType = OMFBT_INTEGER16; + else if (baseType.compare("Integer32") == 0) + m_baseType = OMFBT_INTEGER32; + else if (baseType.compare("Integer64") == 0) + m_baseType = OMFBT_INTEGER64; + else if (baseType.compare("UInteger16") == 0) + m_baseType = OMFBT_UINTEGER16; + else if (baseType.compare("UInteger32") == 0) + m_baseType = OMFBT_UINTEGER32; + else if (baseType.compare("UInteger64") == 0) + m_baseType = OMFBT_UINTEGER64; + else if (baseType.compare("String") == 0) + m_baseType = OMFBT_STRING; + else if (baseType.compare("FledgeAsset") == 0) + m_baseType = OMFBT_FLEDGEASSET; + else + Logger::getLogger()->fatal("Unable to map base type '%s'", baseType.c_str()); +} + +/** + * The container has been sent with the specific base type + * + * @param tagName The name of the tag we are using + * @param baseType The baseType we resolve to + */ +void LALookup::containerSent(const std::string& tagName, OMFBaseType baseType) +{ + if (m_tagName.compare(tagName)) + { + // Force a new Link and AF Link to be sent for the new tag name + m_sentState &= ~(LAL_LINK_SENT | LAL_AFLINK_SENT); + } + m_baseType = baseType; + m_tagName = tagName; + m_sentState |= LAL_CONTAINER_SENT; +} + +/** + * The container has been sent with the specific base type + * + * @param tagName The name of the tag we are using + * @param baseType The baseType we resolve to + */ +void LALookup::containerSent(const std::string& tagName, const std::string& baseType) +{ + setBaseType(baseType); + if (m_tagName.compare(tagName)) + { + // Force a new Link and AF Link to be sent for the new tag name + m_sentState &= ~(LAL_LINK_SENT | LAL_AFLINK_SENT); + } + m_tagName = tagName; + m_sentState |= LAL_CONTAINER_SENT; +} + +/** + * Get a string representation of the base type that was sent + */ +string LALookup::getBaseTypeString() +{ + switch (m_baseType) + { + case OMFBT_UNKNOWN: + return "Unknown"; + case OMFBT_DOUBLE64: + return "Double64"; + case OMFBT_DOUBLE32: + return "Double32"; + case OMFBT_INTEGER16: + return "Integer16"; + case OMFBT_INTEGER32: + return "Integer32"; + case OMFBT_INTEGER64: + return "Integer64"; + case OMFBT_UINTEGER16: + return "UInteger16"; + case OMFBT_UINTEGER32: + return "UInteger32"; + case OMFBT_UINTEGER64: + return "UInteger64"; + case OMFBT_STRING: + return "String"; + default: + return "Unknown"; + } +} diff --git a/C/plugins/north/OMF/omf.cpp b/C/plugins/north/OMF/omf.cpp old mode 100644 new mode 100755 index c4f0e4cf06..f7ddc040be --- a/C/plugins/north/OMF/omf.cpp +++ 
b/C/plugins/north/OMF/omf.cpp @@ -244,6 +244,7 @@ OMF::OMF(const string& name, m_changeTypeId = false; m_OMFDataTypes = NULL; m_OMFVersion = "1.0"; + m_connected = false; } /** @@ -271,6 +272,7 @@ OMF::OMF(const string& name, m_lastError = false; m_changeTypeId = false; + m_connected = false; } // Destructor @@ -1116,9 +1118,8 @@ uint32_t OMF::sendToServer(const vector& readings, m_baseTypesSent = true; } } - // TODO We do not need the superset stuff if we are using linked data types, - // this would save us interating over the dat aan extra time and reduce our + // this would save us iterating over the data an extra time and reduce our // memory footprint // // Create a superset of all the datapoints for each assetName @@ -1161,9 +1162,13 @@ uint32_t OMF::sendToServer(const vector& readings, bool legacyType = m_legacy; // Create the class that deals with the linked data generation - OMFLinkedData linkedData(&m_containerSent, &m_assetSent, &m_linkSent, m_PIServerEndpoint); + OMFLinkedData linkedData(&m_linkedAssetState, m_PIServerEndpoint); + linkedData.setSendFullStructure(m_sendFullStructure); linkedData.setFormats(getFormatType(OMF_TYPE_FLOAT), getFormatType(OMF_TYPE_INTEGER)); + // Create the lookup data for this block of readings + linkedData.buildLookup(readings); + bool pendingSeparator = false; ostringstream jsonData; jsonData << "["; @@ -1399,10 +1404,10 @@ uint32_t OMF::sendToServer(const vector& readings, { // We do this before the send so we know if it was sent for the first time // in the processReading call - auto asset_sent = m_assetSent.find(m_assetName); + auto lookup = m_linkedAssetState.find(m_assetName + "."); // Send data for this reading using the new mechanism outData = linkedData.processReading(*reading, AFHierarchyPrefix, hints); - if (m_sendFullStructure && asset_sent == m_assetSent.end()) + if (m_sendFullStructure && lookup->second.afLinkState() == false) { // If the hierarchy has not already been sent then send it if (! 
AFHierarchySent) @@ -1421,6 +1426,7 @@ uint32_t OMF::sendToServer(const vector& readings, outData.append(","); outData.append(af); } + lookup->second.afLinkSent(); } } if (!outData.empty()) @@ -2296,6 +2302,7 @@ std::string OMF::createLinkData(const Reading& reading, std::string& AFHierarch long typeId = getAssetTypeId(assetName); + string lData = "{\"typeid\": \"__Link\", \"values\": ["; // Handles the structure for the Connector Relay @@ -2348,8 +2355,31 @@ std::string OMF::createLinkData(const Reading& reading, std::string& AFHierarch } else { + // Get the new asset name after hints are applied for the linked data messages + string newAssetName = assetName; + if (hints) + { + const std::vector omfHints = hints->getHints(); + for (auto it = omfHints.cbegin(); it != omfHints.cend(); it++) + { + if (typeid(**it) == typeid(OMFTagNameHint)) + { + string hintValue = (*it)->getHint(); + Logger::getLogger()->info("Using OMF TagName hint: %s for asset %s", + hintValue.c_str(), assetName.c_str()); + newAssetName = hintValue; + } + if (typeid(**it) == typeid(OMFTagHint)) + { + string hintValue = (*it)->getHint(); + Logger::getLogger()->info("Using OMF Tag hint: %s for asset %s", + hintValue.c_str(), assetName.c_str()); + newAssetName = hintValue; + } + } + } StringReplace(tmpStr, "_placeholder_tgt_type_", "FledgeAsset"); - StringReplace(tmpStr, "_placeholder_tgt_idx_", assetName); + StringReplace(tmpStr, "_placeholder_tgt_idx_", newAssetName); } lData.append(tmpStr); @@ -3031,7 +3061,15 @@ bool OMF::evaluateAFHierarchyRules(const string& assetName, const Reading& readi generateAFHierarchyPrefixLevel(m_DefaultAFLocation, prefix, AFHierarchyLevel); auto item = make_pair(m_DefaultAFLocation, prefix); - m_AssetNamePrefix[assetName].push_back(item); + auto & curr_vec = m_AssetNamePrefix[assetName]; + + // Insert new item into m_AssetNamePrefix[assetName] vector, if it doesn't exists already + if (std::find(curr_vec.begin(), curr_vec.end(), item) == curr_vec.end()) + { + m_AssetNamePrefix[assetName].push_back(item); + Logger::getLogger()->debug("m_AssetNamePrefix.size()=%d; m_AssetNamePrefix[assetName].size()=%d, added m_AssetNamePrefix[%s]=(%s,%s)", + m_AssetNamePrefix.size(), m_AssetNamePrefix[assetName].size(), assetName.c_str(), m_DefaultAFLocation.c_str(), prefix.c_str()); + } } return success; @@ -4595,7 +4633,10 @@ std::string OMF::ApplyPIServerNamingRulesObj(const std::string &objName, bool *c nameFixed = StringTrim(objName); - Logger::getLogger()->debug("%s - original :%s: trimmed :%s:", __FUNCTION__, objName.c_str(), nameFixed.c_str()); + if (objName.compare(nameFixed) != 0) + { + Logger::getLogger()->debug("%s - original :%s: trimmed :%s:", __FUNCTION__, objName.c_str(), nameFixed.c_str()); + } if (nameFixed.empty ()) { diff --git a/C/plugins/north/OMF/omfinfo.cpp b/C/plugins/north/OMF/omfinfo.cpp index f9efe8b01f..ed3c5b7b98 100644 --- a/C/plugins/north/OMF/omfinfo.cpp +++ b/C/plugins/north/OMF/omfinfo.cpp @@ -159,10 +159,10 @@ OMFInformation::OMFInformation(ConfigCategory *config) : m_sender(NULL), m_omf(N m_AFMap = AFMap; // OCS configurations - OCSNamespace = OCSNamespace; - OCSTenantId = OCSTenantId; - OCSClientId = OCSClientId; - OCSClientSecret = OCSClientSecret; + m_OCSNamespace = OCSNamespace; + m_OCSTenantId = OCSTenantId; + m_OCSClientId = OCSClientId; + m_OCSClientSecret = OCSClientSecret; // PI Web API end-point - evaluates the authentication method requested if (m_PIServerEndpoint == ENDPOINT_PIWEB_API) @@ -461,6 +461,7 @@ uint32_t OMFInformation::send(const vector& readings) { 
// Created a new sender after a connection failure m_omf->setSender(*m_sender); + m_omf->setConnected(false); } } @@ -536,6 +537,7 @@ uint32_t OMFInformation::send(const vector& readings) Logger::getLogger()->warn("Connection to PI Web API at %s has been lost", m_hostAndPort.c_str()); } m_connected = updatedConnected; + #if INSTRUMENT Logger::getLogger()->debug("plugin_send elapsed time: %6.3f seconds, NumValues: %u", GetElapsedTime(&startTime), ret); diff --git a/C/plugins/north/OMF/plugin.cpp b/C/plugins/north/OMF/plugin.cpp index 6d2d543c71..3708476ebe 100755 --- a/C/plugins/north/OMF/plugin.cpp +++ b/C/plugins/north/OMF/plugin.cpp @@ -107,11 +107,11 @@ const char *PLUGIN_DEFAULT_CONFIG_INFO = QUOTE( "validity" : "PIServerEndpoint == \"AVEVA Data Hub\" || PIServerEndpoint == \"OSIsoft Cloud Services\"" }, "SendFullStructure": { - "description": "It sends the minimum OMF structural messages to load data into Data Archive if disabled", + "description": "If true, create an AF structure to organize the data. If false, create PI Points only.", "type": "boolean", "default": "true", "order": "3", - "displayName": "Send full structure", + "displayName": "Create AF structure", "validity" : "PIServerEndpoint == \"PI Web API\"" }, "NamingScheme": { diff --git a/C/plugins/storage/postgres/connection.cpp b/C/plugins/storage/postgres/connection.cpp index 837fe36b6d..357cb5a008 100644 --- a/C/plugins/storage/postgres/connection.cpp +++ b/C/plugins/storage/postgres/connection.cpp @@ -247,7 +247,9 @@ bool Connection::aggregateQuery(const Value& payload, string& resultSet) // Add where condition sql.append("WHERE "); - if (!jsonWhereClause(payload["where"], sql)) + + vector asset_codes; + if (!jsonWhereClause(payload["where"], sql, asset_codes)) { raiseError("retrieve", "aggregateQuery: failure while building WHERE clause"); return false; @@ -375,11 +377,15 @@ Connection::~Connection() * Perform a query against a common table * */ -bool Connection::retrieve(const string& table, const string& condition, string& resultSet) +bool Connection::retrieve(const string& schema, + const string& table, + const string& condition, + string& resultSet) { Document document; // Default template parameter uses UTF8 and MemoryPoolAllocator. 
SQLBuffer sql; SQLBuffer jsonConstraints; // Extra constraints to add to where clause +vector asset_codes; try { if (condition.empty()) @@ -408,6 +414,11 @@ SQLBuffer jsonConstraints; // Extra constraints to add to where clause } sql.append(" FROM "); } + else if (document.HasMember("join")) + { + sql.append("SELECT "); + selectColumns(document, sql, 0); + } else if (document.HasMember("return")) { int col = 0; @@ -512,14 +523,70 @@ SQLBuffer jsonConstraints; // Extra constraints to add to where clause } sql.append(" * FROM "); } - sql.append(table); + + if (document.HasMember("join")) + { + sql.append(" FROM "); + sql.append(table); + sql.append(" t0"); + appendTables(schema, document, sql, 1); + } + else + { + sql.append(table); + } if (document.HasMember("where")) { sql.append(" WHERE "); - - if (document.HasMember("where")) + + if (document.HasMember("join")) + { + if (!jsonWhereClause(document["where"], sql, asset_codes, false, "t0.")) + { + return false; + } + + // Now and the join condition itself + string col0, col1; + const Value& join = document["join"]; + if (join.HasMember("on") && join["on"].IsString()) + { + col0 = join["on"].GetString(); + } + else + { + + raiseError("rerieve", "Missing on item"); + return false; + } + if (join.HasMember("table")) + { + const Value& table = join["table"]; + if (table.HasMember("column") && table["column"].IsString()) + { + col1 = table["column"].GetString(); + } + else + { + raiseError("QueryTable", "Missing column in join table"); + return false; + } + } + sql.append(" AND t0."); + sql.append(col0); + sql.append(" = t1."); + sql.append(col1); + sql.append(" "); + if (join.HasMember("query") && join["query"].IsObject()) + { + sql.append("AND "); + const Value& query = join["query"]; + processJoinQueryWhereClause(query, sql, asset_codes, 1); + } + } + else if (document.HasMember("where")) { - if (!jsonWhereClause(document["where"], sql)) + if (!jsonWhereClause(document["where"], sql, asset_codes)) { return false; } @@ -783,7 +850,8 @@ bool Connection::retrieveReadings(const string& condition, string& resultSet) if (document.HasMember("where")) { - if (!jsonWhereClause(document["where"], sql)) + vector asset_codes; + if (!jsonWhereClause(document["where"], sql, asset_codes)) { return false; } @@ -1279,15 +1347,17 @@ SQLBuffer sql; if ((*iter).HasMember("condition")) { sql.append(" WHERE "); - if (!jsonWhereClause((*iter)["condition"], sql)) + vector asset_codes; + if (!jsonWhereClause((*iter)["condition"], sql, asset_codes)) { return false; } } else if ((*iter).HasMember("where")) { + vector asset_codes; sql.append(" WHERE "); - if (!jsonWhereClause((*iter)["where"], sql)) + if (!jsonWhereClause((*iter)["where"], sql, asset_codes)) { return false; } @@ -1354,7 +1424,8 @@ SQLBuffer sql; { if (document.HasMember("where")) { - if (!jsonWhereClause(document["where"], sql)) + vector asset_codes; + if (!jsonWhereClause(document["where"], sql, asset_codes)) { return -1; } @@ -2843,8 +2914,11 @@ bool Connection::jsonModifiers(const Value& payload, SQLBuffer& sql) */ bool Connection::jsonWhereClause(const Value& whereClause, SQLBuffer& sql, - const string& prefix) + vector &asset_codes, + bool convertLocaltime, // not in use + const string prefix) { + if (!whereClause.IsObject()) { raiseError("where clause", "The \"where\" property must be a JSON object"); @@ -2867,17 +2941,31 @@ bool Connection::jsonWhereClause(const Value& whereClause, double converted = strtod(whereColumnName.c_str(), &p); if (*p) { - // Quote column name - sql.append("\""); + // 
Double quote column name + if (prefix.empty()) + { + sql.append("\""); + } + + // Add prefix if (!prefix.empty()) + { sql.append(prefix); - sql.append(whereClause["column"].GetString()); - sql.append("\""); + + } + + sql.append(whereColumnName); + + // Double quote column name + if (prefix.empty()) + { + sql.append("\""); + } } else { - // Use converted numeric value - sql.append(whereClause["column"].GetString()); + // Use numeric value + sql.append(whereColumnName); } sql.append(' '); @@ -2990,8 +3078,18 @@ bool Connection::jsonWhereClause(const Value& whereClause, } else if (whereClause["value"].IsString()) { sql.append('\''); - sql.append(escape(whereClause["value"].GetString())); + string value = whereClause["value"].GetString(); + sql.append(escape(value)); sql.append('\''); + + // Identify a specific operation to restrinct the tables involved + if (whereColumnName.compare("asset_code") == 0) + { + if ( cond.compare("=") == 0) + { + asset_codes.push_back(value); + } + } } } } @@ -2999,15 +3097,17 @@ bool Connection::jsonWhereClause(const Value& whereClause, if (whereClause.HasMember("and")) { sql.append(" AND "); - if (!jsonWhereClause(whereClause["and"], sql)) + vector asset_codes; + if (!jsonWhereClause(whereClause["and"], sql, asset_codes, false, prefix)) { return false; } } if (whereClause.HasMember("or")) { + vector asset_codes; sql.append(" OR "); - if (!jsonWhereClause(whereClause["or"], sql)) + if (!jsonWhereClause(whereClause["or"], sql, asset_codes, false, prefix)) { return false; } @@ -3412,21 +3512,7 @@ SQLBuffer sql; */ bool Connection::isFunction(const char *str) const { -const char *p; - - p = str + strlen(str) - 1; - // A function would have a closing bracket followed pnly by white space at the end - while (p > str && isspace(*p)) - p--; - if (*p != ')') - return false; - - // We found the closing bracket now check for the opening bracket - while (p > str && *p != '(') - p--; - if (*p == '(') - return true; - return false; + return strcmp(str, "now()") == 0; } /** @@ -3566,7 +3652,10 @@ SQLBuffer jsonConstraints; * @param sql The SQLBuffer we are writing * @param level The table number we are processing */ -bool Connection::appendTables(const Value& document, SQLBuffer& sql, int level) +bool Connection::appendTables(const string& schema, + const Value& document, + SQLBuffer& sql, + int level) { string tag = "t" + to_string(level); if (document.HasMember("join")) @@ -3586,14 +3675,17 @@ bool Connection::appendTables(const Value& document, SQLBuffer& sql, int level) raiseError("commonRetrieve", "Joining table name is not a string"); return false; } - sql.append(", fledge."); - sql.append(name.GetString()); - sql.append(" "); - sql.append(tag); + + sql.append(", "); + sql.append(schema); + sql.append('.'); + sql.append(name.GetString()); + sql.append(" "); + sql.append(tag); if (join.HasMember("query")) { const Value& query = join["query"]; - appendTables(query, sql, ++level); + appendTables(schema, query, sql, ++level); } else { @@ -3616,12 +3708,16 @@ bool Connection::appendTables(const Value& document, SQLBuffer& sql, int level) * * @param query The JSON query * @param sql The SQLBuffer we are writing the data to - * @param level The nestign level of the joined table + * @param asset_codes The asset codes + * @param level The nesting level of the joined table */ -bool Connection::processJoinQueryWhereClause(const Value& query, SQLBuffer& sql, int level) +bool Connection::processJoinQueryWhereClause(const Value& query, + SQLBuffer& sql, + std::vector &asset_codes, + 
int level) { string tag = "t" + to_string(level) + "."; - if (!jsonWhereClause(query["where"], sql, tag)) + if (!jsonWhereClause(query["where"], sql, asset_codes, false, tag)) { return false; } @@ -3664,7 +3760,7 @@ bool Connection::processJoinQueryWhereClause(const Value& query, SQLBuffer& sql, { sql.append(" AND "); const Value& query = join["query"]; - processJoinQueryWhereClause(query, sql, level + 1); + processJoinQueryWhereClause(query, sql, asset_codes, level + 1); } } return true; @@ -4383,9 +4479,18 @@ int Connection::create_schema(const std::string &payload) { auto itr = dbCol->find(v); //Check if the column matches exactly with that present in db , if not same , the reject the request - if ( itr->type != v.type || itr->sz != v.sz || itr->key != v.key ) + // We ignore size for integer columns + if (v.type.compare("integer") == 0) + { + if (itr->type != v.type || itr->key != v.key) + { + raiseError("create_schema", "%s:%d Schema:%s, Service:%s, tableName:%s, altering an existing column %s is not allowed", __FUNCTION__, __LINE__, schema.c_str(), service.c_str(), name.c_str(), v.column.c_str() ); + return -1; + } + } + else if (itr->type != v.type || itr->sz != v.sz || itr->key != v.key) { - raiseError("create_schema", "%s:%d Schema:%s, Service:%s, tableName:%s, altering an existing column %s is not allowed", __FUNCTION__, __LINE__, schema.c_str(), service.c_str(), name.c_str(), v.column.c_str() ); + raiseError("create_schema", "%s:%d Schema:%s, Service:%s, tableName:%s, altering an existing column %s is not allowed", __FUNCTION__, __LINE__, schema.c_str(), service.c_str(), name.c_str(), v.column.c_str() ); return -1; } } diff --git a/C/plugins/storage/postgres/include/connection.h b/C/plugins/storage/postgres/include/connection.h index 1a99ed9b2c..d9ad109258 100644 --- a/C/plugins/storage/postgres/include/connection.h +++ b/C/plugins/storage/postgres/include/connection.h @@ -33,7 +33,8 @@ class Connection { public: Connection(); ~Connection(); - bool retrieve(const std::string& table, const std::string& condition, + bool retrieve(const std::string& schema, + const std::string& table, const std::string& condition, std::string& resultSet); bool retrieveReadings(const std::string& condition, std::string& resultSet); int insert(const std::string& table, const std::string& data); @@ -69,9 +70,15 @@ class Connection { void raiseError(const char *operation, const char *reason,...); PGconn *dbConnection; void mapResultSet(PGresult *res, std::string& resultSet); - bool jsonWhereClause(const rapidjson::Value& whereClause, SQLBuffer&, const std::string& prefix = ""); bool jsonModifiers(const rapidjson::Value&, SQLBuffer&); - bool jsonAggregates(const rapidjson::Value&, const rapidjson::Value&, SQLBuffer&, SQLBuffer&, bool isTableReading = false); + bool jsonAggregates(const rapidjson::Value&, + const rapidjson::Value&, + SQLBuffer&, SQLBuffer&, + bool isTableReading = false); + bool jsonWhereClause(const rapidjson::Value& whereClause, + SQLBuffer&, std::vector &asset_codes, + bool convertLocaltime = false, + std::string prefix = ""); bool returnJson(const rapidjson::Value&, SQLBuffer&, SQLBuffer&); char *trim(char *str); const std::string escape_double_quotes(const std::string&); @@ -80,8 +87,14 @@ class Connection { void logSQL(const char *, const char *); bool isFunction(const char *) const; bool selectColumns(const rapidjson::Value& document, SQLBuffer& sql, int level); - bool appendTables(const rapidjson::Value& document, SQLBuffer& sql, int level); - bool 
processJoinQueryWhereClause(const rapidjson::Value& query, SQLBuffer& sql, int level); + bool appendTables(const std::string &schema, + const rapidjson::Value& document, + SQLBuffer& sql, + int level); + bool processJoinQueryWhereClause(const rapidjson::Value& query, + SQLBuffer& sql, + std::vector &asset_codes, + int level); std::string getIndexName(std::string s); bool checkValidDataType(const std::string &s); diff --git a/C/plugins/storage/postgres/plugin.cpp b/C/plugins/storage/postgres/plugin.cpp index bdd0902b43..3892a7b44f 100644 --- a/C/plugins/storage/postgres/plugin.cpp +++ b/C/plugins/storage/postgres/plugin.cpp @@ -122,7 +122,7 @@ ConnectionManager *manager = (ConnectionManager *)handle; Connection *connection = manager->allocate(); std::string results; - bool rval = connection->retrieve(std::string(OR_DEFAULT_SCHEMA(schema)) + "." + std::string(table), std::string(query), results); + bool rval = connection->retrieve(schema, std::string(OR_DEFAULT_SCHEMA(schema)) + "." + std::string(table), std::string(query), results); manager->release(connection); if (rval) { diff --git a/C/plugins/storage/sqlite/common/connection.cpp b/C/plugins/storage/sqlite/common/connection.cpp index 989e607c31..1a989c864b 100644 --- a/C/plugins/storage/sqlite/common/connection.cpp +++ b/C/plugins/storage/sqlite/common/connection.cpp @@ -26,7 +26,7 @@ #define PURGE_SLOWDOWN_AFTER_BLOCKS 5 #define PURGE_SLOWDOWN_SLEEP_MS 500 -#define LOG_AFTER_NERRORS 5 +#define LOG_AFTER_NERRORS (MAX_RETRIES / 2) /** * SQLite3 storage plugin for Fledge @@ -3133,8 +3133,6 @@ int retries = 0, rc; retries++; if (rc != SQLITE_OK) { - if (retries > LOG_AFTER_NERRORS) - Logger::getLogger()->warn("Connection::SQLexec - retry :%d: dbHandle :%X: cmd :%s: error :%s:", retries, this->getDbHandle(), sql, sqlite3_errmsg(dbHandle)); #if DO_PROFILE_RETRIES @@ -3146,8 +3144,6 @@ int retries = 0, rc; #endif int interval = (1 * RETRY_BACKOFF); std::this_thread::sleep_for(std::chrono::milliseconds(interval)); - if (retries > 9) Logger::getLogger()->info("SQLExec: error :%s: retry %d of %d, rc=%s, errmsg=%s, DB connection @ %p, slept for %d msecs", - sqlite3_errmsg(dbHandle), retries, MAX_RETRIES, (rc==SQLITE_LOCKED)?"SQLITE_LOCKED":"SQLITE_BUSY", sqlite3_errmsg(db), this, interval); #if DO_PROFILE_RETRIES m_qMutex.lock(); m_waiting.fetch_sub(1); @@ -3171,6 +3167,15 @@ int retries = 0, rc; } } while (retries < MAX_RETRIES && (rc != SQLITE_OK)); + if (retries >= MAX_RETRIES) + { + Logger::getLogger()->error("SQL statement %s failed after maximum retries", sql, sqlite3_errmsg(dbHandle)); + } + else if (retries > LOG_AFTER_NERRORS) + { + Logger::getLogger()->warn("%d retries required of the SQL statement '%s': %s", retries, sql, sqlite3_errmsg(dbHandle)); + Logger::getLogger()->warn("If the excessive retries continue for sustained periods it is a sign that the system may be reaching the limits of the load it can handle"); + } #if DO_PROFILE_RETRIES retryStats[retries-1]++; if (++numStatements > RETRY_REPORT_THRESHOLD - 1) @@ -3240,14 +3245,17 @@ int retries = 0, rc; int interval = (retries * RETRY_BACKOFF); this_thread::sleep_for(chrono::milliseconds(interval)); - - if (retries > 5) { - Logger::getLogger()->debug("SQLStep: retry %d of %d, rc=%s, DB connection @ %p, slept for %d msecs", - retries, MAX_RETRIES, (rc==SQLITE_LOCKED)?"SQLITE_LOCKED":"SQLITE_BUSY", this, interval); - - } } } while (retries < MAX_RETRIES && (rc == SQLITE_LOCKED || rc == SQLITE_BUSY)); + if (retries >= MAX_RETRIES) + { + Logger::getLogger()->error("SQL statement 
failed after maximum retries", sqlite3_errmsg(dbHandle)); + } + else if (retries > LOG_AFTER_NERRORS) + { + Logger::getLogger()->warn("%d retries required of the SQL statement: %s", retries, sqlite3_errmsg(dbHandle)); + Logger::getLogger()->warn("If the excessive retries continue for sustained periods it is a sign that the system may be reaching the limits of the load it can handle"); + } #if DO_PROFILE_RETRIES retryStats[retries-1]++; if (++numStatements > 1000) diff --git a/C/plugins/storage/sqlite/common/include/readings_catalogue.h b/C/plugins/storage/sqlite/common/include/readings_catalogue.h index 4562bb066f..9171cb9b3a 100644 --- a/C/plugins/storage/sqlite/common/include/readings_catalogue.h +++ b/C/plugins/storage/sqlite/common/include/readings_catalogue.h @@ -51,6 +51,37 @@ typedef struct } STORAGE_CONFIGURATION; +/** + * Class used to store table references + */ +class TableReference { + public: + TableReference(int dbId, int tableId) : m_dbId(dbId), m_tableId(tableId) + { + m_issued = time(0); + }; + time_t lastIssued() + { + return m_issued; + }; + int getTable() + { + return m_tableId; + }; + int getDatabase() + { + return m_dbId; + }; + void issue() + { + m_issued = time(0); + }; + private: + int m_dbId; + int m_tableId; + time_t m_issued; +}; + /** * Implements the handling of multiples readings tables stored among multiple SQLite databases. * @@ -228,7 +259,7 @@ class ReadingsCatalogue { m_ReadingsGlobalId; // Global row id shared among all the readings table int m_nReadingsAvailable = 0; // Number of readings tables available - std::map > m_AssetReadingCatalogue={ // In memory structure to identify in which database/table an asset is stored + std::map m_AssetReadingCatalogue={ // In memory structure to identify in which database/table an asset is stored // asset_code - reading Table Id, Db Id // {"", ,{1 ,1 }} diff --git a/C/plugins/storage/sqlite/common/readings_catalogue.cpp b/C/plugins/storage/sqlite/common/readings_catalogue.cpp index 5427a27e4f..001de85627 100644 --- a/C/plugins/storage/sqlite/common/readings_catalogue.cpp +++ b/C/plugins/storage/sqlite/common/readings_catalogue.cpp @@ -278,15 +278,15 @@ int ReadingsCatalogue::calculateGlobalId (sqlite3 *dbHandle) { for (auto &item : m_AssetReadingCatalogue) { - if (item.second.first != 0) + if (item.second.getTable() != 0) { if (!firstRow) { sql_cmd += " UNION "; } - dbName = generateDbName(item.second.second); - dbReadingsName = generateReadingsName(item.second.second, item.second.first); + dbName = generateDbName(item.second.getDatabase()); + dbReadingsName = generateReadingsName(item.second.getDatabase(), item.second.getTable()); sql_cmd += " SELECT max(id) id FROM " + dbName + "." + dbReadingsName + " "; firstRow = false; @@ -372,15 +372,15 @@ int ReadingsCatalogue::getMinGlobalId (sqlite3 *dbHandle) { for (auto &item : m_AssetReadingCatalogue) { - if (item.second.first != 0) + if (item.second.getTable() != 0) { if (!firstRow) { sql_cmd += " UNION "; } - dbName = generateDbName(item.second.second); - dbReadingsName = generateReadingsName(item.second.second, item.second.first); + dbName = generateDbName(item.second.getDatabase()); + dbReadingsName = generateReadingsName(item.second.getDatabase(), item.second.getTable()); sql_cmd += " SELECT min(id) id FROM " + dbName + "." 
+ dbReadingsName + " "; firstRow = false; @@ -483,8 +483,7 @@ bool ReadingsCatalogue::loadAssetReadingCatalogue() Logger::getLogger()->debug("loadAssetReadingCatalogue - thread '%s' reading Id %d dbId %d asset name '%s' max db Id %d", threadId.str().c_str(), tableId, dbId, asset_name, maxDbID); - auto newItem = make_pair(tableId,dbId); - auto newMapValue = make_pair(asset_name,newItem); + auto newMapValue = make_pair(asset_name,TableReference(dbId, tableId)); m_AssetReadingCatalogue.insert(newMapValue); if (tableId == 0 && dbId > m_maxOverflowUsed) // Overflow { @@ -609,7 +608,7 @@ void ReadingsCatalogue::getAllDbs(vector &dbIdList) for (auto &item : m_AssetReadingCatalogue) { - dbId = item.second.second; + dbId = item.second.getDatabase(); if (dbId > 1) { if (std::find(dbIdList.begin(), dbIdList.end(), dbId) == dbIdList.end() ) @@ -720,12 +719,7 @@ char *zErrMsg = NULL; // See if the overflow table exists and if not create it // This is a workaround as the schema update mechanism can't cope // with multiple readings tables - sqlCmd = "select count(*) from " + alias + ".readings_overflow;"; - rc = SQLExec(dbHandle, sqlCmd.c_str(), &zErrMsg); - if (rc != SQLITE_OK) - { - createReadingsOverflowTable(dbHandle, id); - } + createReadingsOverflowTable(dbHandle, id); return result; } @@ -1404,8 +1398,8 @@ int ReadingsCatalogue::calcMaxReadingUsed() for (auto &item : m_AssetReadingCatalogue) { - if (item.second.first > maxReading) - maxReading = item.second.first; + if (item.second.getTable() > maxReading) + maxReading = item.second.getTable(); } return (maxReading); @@ -1715,7 +1709,7 @@ bool ReadingsCatalogue::createNewDB(sqlite3 *dbHandle, int newDbId, int startId m_nReadingsAvailable = readingsToAllocate; } - // Create the overflow table in the new database + // Create the overflow table in the new database if it was not previosuly created createReadingsOverflowTable(dbHandle, newDbId); if (attachAllDb == NEW_DB_DETACH) @@ -1847,6 +1841,15 @@ bool ReadingsCatalogue::createReadingsOverflowTable(sqlite3 *dbHandle, int dbId dbReadingsName = string(READINGS_TABLE) + "_" + to_string(dbId); dbReadingsName.append("_overflow"); + string sqlCmd = "select count(*) from " + dbName + "." + dbReadingsName + ";"; + char *errMsg; + int rc = SQLExec(dbHandle, sqlCmd.c_str(), &errMsg); + if (rc == SQLITE_OK) + { + logger->debug("Overflow table %s already exists, not attempting creation", dbReadingsName.c_str()); + return true; + } + string createReadings = R"( CREATE TABLE IF NOT EXISTS )" + dbName + "." 
+ dbReadingsName + R"( ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -1869,7 +1872,7 @@ bool ReadingsCatalogue::createReadingsOverflowTable(sqlite3 *dbHandle, int dbId logger->info(" Creating table '%s' sql cmd '%s'", dbReadingsName.c_str(), createReadings.c_str()); - int rc = SQLExec(dbHandle, createReadings.c_str()); + rc = SQLExec(dbHandle, createReadings.c_str()); if (rc != SQLITE_OK) { raiseError("creating overflow table", sqlite3_errmsg(dbHandle)); @@ -2040,8 +2043,9 @@ ReadingsCatalogue::tyReadingReference ReadingsCatalogue::getReadingReference(Co if (item != m_AssetReadingCatalogue.end()) { //# The asset is already allocated to a table - ref.tableId = item->second.first; - ref.dbId = item->second.second; + ref.tableId = item->second.getTable(); + ref.dbId = item->second.getDatabase(); + item->second.issue(); } else { @@ -2055,8 +2059,9 @@ ReadingsCatalogue::tyReadingReference ReadingsCatalogue::getReadingReference(Co auto item = m_AssetReadingCatalogue.find(asset_code); if (item != m_AssetReadingCatalogue.end()) { - ref.tableId = item->second.first; - ref.dbId = item->second.second; + ref.tableId = item->second.getTable(); + ref.dbId = item->second.getDatabase(); + item->second.issue(); } else { @@ -2138,8 +2143,7 @@ ReadingsCatalogue::tyReadingReference ReadingsCatalogue::getReadingReference(Co { m_EmptyAssetReadingCatalogue.erase(emptyAsset); m_AssetReadingCatalogue.erase(emptyAsset); - auto newItem = make_pair(ref.tableId, ref.dbId); - auto newMapValue = make_pair(asset_code, newItem); + auto newMapValue = make_pair(asset_code, TableReference(ref.dbId, ref.tableId)); m_AssetReadingCatalogue.insert(newMapValue); } @@ -2183,8 +2187,7 @@ ReadingsCatalogue::tyReadingReference ReadingsCatalogue::getReadingReference(Co { // Assign to overflow Logger::getLogger()->info("Assign asset %s to the overflow table", asset_code); - auto newItem = make_pair(0, m_nextOverflow); - auto newMapValue = make_pair(asset_code, newItem); + auto newMapValue = make_pair(asset_code, TableReference(m_nextOverflow, 0)); m_AssetReadingCatalogue.insert(newMapValue); sql_cmd = "INSERT INTO " READINGS_DB ".asset_reading_catalogue (table_id, db_id, asset_code) VALUES ( 0," @@ -2207,6 +2210,7 @@ ReadingsCatalogue::tyReadingReference ReadingsCatalogue::getReadingReference(Co m_nextOverflow = 1; } + Logger::getLogger()->debug("Assign: '%s' to %d, %d", asset_code, ref.dbId, ref.tableId); } attachSync->unlock(); } @@ -2244,13 +2248,14 @@ bool ReadingsCatalogue::loadEmptyAssetReadingCatalogue(bool clean) connection->setUsage(usage); #endif dbHandle = connection->getDbHandle(); + time_t issueThreshold = time(0) - 600; // More than 10 minutes since it was last ussed for (auto &item : m_AssetReadingCatalogue) { string asset_name = item.first; // Asset - int tableId = item.second.first; // tableId; - int dbId = item.second.second; // dbId; + int tableId = item.second.getTable(); // tableId; + int dbId = item.second.getDatabase(); // dbId; - if (tableId > 0) + if (tableId > 0 && item.second.lastIssued() < issueThreshold) { sql_cmd = "SELECT COUNT(*) FROM readings_" + to_string(dbId) + ".readings_" + to_string(dbId) + "_" + to_string(tableId) + " ;"; @@ -2304,7 +2309,7 @@ ReadingsCatalogue::tyReadingReference ReadingsCatalogue::getEmptyReadingTableRef } /** - * Retrieve the maximum readings id for the provided database id + * Retrieve the maximum table id for the provided database id * * @param dbId Database id for which the maximum reading id must be retrieved * @return Maximum readings for the requested database id @@ -2316,8 
+2321,8 @@ int ReadingsCatalogue::getMaxReadingsId(int dbId) for (auto &item : m_AssetReadingCatalogue) { - if (item.second.second == dbId && item.second.first > maxId) - maxId = item.second.first; + if (item.second.getDatabase() == dbId && item.second.getTable() > maxId) + maxId = item.second.getTable(); } return (maxId); @@ -2372,7 +2377,7 @@ int ReadingsCatalogue::getUsedTablesDbId(int dbId) for (auto &item : m_AssetReadingCatalogue) { - if (item.second.first != 0 && item.second.second == dbId) + if (item.second.getTable() != 0 && item.second.getDatabase() == dbId) count++; } @@ -2423,8 +2428,8 @@ int ReadingsCatalogue::purgeAllReadings(sqlite3 *dbHandle, const char *sqlCmdBa } sqlCmdTmp = sqlCmdBase; - dbName = generateDbName(item.second.second); - dbReadingsName = generateReadingsName(item.second.second, item.second.first); + dbName = generateDbName(item.second.getDatabase()); + dbReadingsName = generateReadingsName(item.second.getDatabase(), item.second.getTable()); StringReplaceAll (sqlCmdTmp, "_assetcode_", item.first); StringReplaceAll (sqlCmdTmp, "_dbname_", dbName); @@ -2510,7 +2515,7 @@ string ReadingsCatalogue::sqlConstructMultiDb(string &sqlCmdBase, vector asset_codes; if (!m_schemaManager->exists(dbHandle, schema)) { @@ -1064,7 +1065,14 @@ SQLBuffer jsonConstraints; { return false; } - sql.append(" FROM fledge."); + sql.append(" FROM "); + sql.append(schema); + sql.append('.'); + } + else if (document.HasMember("join")) + { + sql.append("SELECT "); + selectColumns(document, sql, 0); } else if (document.HasMember("return")) { @@ -1167,7 +1175,9 @@ SQLBuffer jsonConstraints; } col++; } - sql.append(" FROM fledge."); + sql.append(" FROM "); + sql.append(schema); + sql.append('.'); } else { @@ -1177,17 +1187,76 @@ SQLBuffer jsonConstraints; sql.append(document["modifier"].GetString()); sql.append(' '); } - sql.append(" * FROM fledge."); + sql.append(" * FROM "); + sql.append(schema); + sql.append('.'); + } + if (document.HasMember("join")) + { + sql.append(" FROM "); + sql.append(schema); + sql.append('.'); + sql.append(table); + sql.append(" t0"); + appendTables(schema, document, sql, 1); + } + else + { + sql.append(table); } - sql.append(table); if (document.HasMember("where")) { sql.append(" WHERE "); - - if (document.HasMember("where")) + + if (document.HasMember("join")) + { + if (!jsonWhereClause(document["where"], sql, asset_codes, false, "t0.")) + { + return false; + } + + // Now and the join condition itself + string col0, col1; + const Value& join = document["join"]; + if (join.HasMember("on") && join["on"].IsString()) + { + col0 = join["on"].GetString(); + } + else + { + raiseError("rerieve", "Missing on item"); + return false; + } + if (join.HasMember("table")) + { + const Value& table = join["table"]; + if (table.HasMember("column") && table["column"].IsString()) + { + col1 = table["column"].GetString(); + } + else + { + raiseError("QueryTable", "Missing column in join table"); + return false; + } + } + sql.append(" AND t0."); + sql.append(col0); + sql.append(" = t1."); + sql.append(col1); + sql.append(" "); + if (join.HasMember("query") && join["query"].IsObject()) + { + sql.append("AND "); + const Value& query = join["query"]; + processJoinQueryWhereClause(query, sql, asset_codes, 1); + } + } + else if (document.HasMember("where")) { - if (!jsonWhereClause(document["where"], sql, true)) + if (!jsonWhereClause(document["where"], sql, asset_codes, false)) { + raiseError("retrieve", "Failed to add where clause"); return false; } } @@ -1485,6 +1554,7 @@ int 
Connection::update(const string& schema, Document document; SQLBuffer sql; bool allowZero = false; +vector asset_codes; int row = 0; ostringstream convert; @@ -1806,7 +1876,7 @@ bool allowZero = false; if ((*iter).HasMember("condition")) { sql.append(" WHERE "); - if (!jsonWhereClause((*iter)["condition"], sql)) + if (!jsonWhereClause((*iter)["condition"], sql, asset_codes)) { return false; } @@ -1814,7 +1884,7 @@ bool allowZero = false; else if ((*iter).HasMember("where")) { sql.append(" WHERE "); - if (!jsonWhereClause((*iter)["where"], sql)) + if (!jsonWhereClause((*iter)["where"], sql, asset_codes)) { return false; } @@ -2720,7 +2790,9 @@ bool Connection::jsonModifiers(const Value& payload, */ bool Connection::jsonWhereClause(const Value& whereClause, SQLBuffer& sql, - bool convertLocaltime) + std::vector &asset_codes, + bool convertLocaltime, + string prefix) { if (!whereClause.IsObject()) { @@ -2738,7 +2810,12 @@ bool Connection::jsonWhereClause(const Value& whereClause, return false; } - sql.append(whereClause["column"].GetString()); + string column = whereClause["column"].GetString(); + if (!prefix.empty()) + { + sql.append(prefix); + } + sql.append(column); sql.append(' '); string cond = whereClause["condition"].GetString(); @@ -2858,9 +2935,15 @@ bool Connection::jsonWhereClause(const Value& whereClause, sql.append(whereClause["value"].GetInt()); } else if (whereClause["value"].IsString()) { + string value = whereClause["value"].GetString(); sql.append('\''); - sql.append(escape(whereClause["value"].GetString())); + sql.append(escape(value)); sql.append('\''); + + // Identify a specific operation to restrinct the tables involved + if (column.compare("asset_code") == 0) + if ( cond.compare("=") == 0) + asset_codes.push_back(value); } } } @@ -2868,7 +2951,7 @@ bool Connection::jsonWhereClause(const Value& whereClause, if (whereClause.HasMember("and")) { sql.append(" AND "); - if (!jsonWhereClause(whereClause["and"], sql, convertLocaltime)) + if (!jsonWhereClause(whereClause["and"], sql, asset_codes, convertLocaltime, prefix)) { return false; } @@ -2876,7 +2959,7 @@ bool Connection::jsonWhereClause(const Value& whereClause, if (whereClause.HasMember("or")) { sql.append(" OR "); - if (!jsonWhereClause(whereClause["or"], sql, convertLocaltime)) + if (!jsonWhereClause(whereClause["or"], sql, asset_codes, convertLocaltime, prefix)) { return false; } @@ -3258,6 +3341,7 @@ int Connection::deleteRows(const string& schema, // Default template parameter uses UTF8 and MemoryPoolAllocator. 
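The join support added to the retrieve path earlier in this file is driven by a nested JSON payload. The following is only a sketch of what such a payload might look like, using hypothetical table and column names and only the keys the new code reads (``join``, ``on``, ``table.name``, ``table.column``, ``query``, ``where``, ``return``); it is not taken from the patch itself.

.. code-block:: python

    import json

    # Hypothetical tables and columns, purely to illustrate the payload shape.
    query = {
        "return": ["code", "description"],   # columns selected from the outer table (t0)
        "where": {"column": "code", "condition": "=", "value": "X123"},
        "join": {
            "on": "id",                      # t0 column used in the join condition
            "table": {
                "name": "asset_tracker",     # joined table, aliased t1
                "column": "asset_id"         # t1 column matched against t0.id
            },
            "query": {                       # selection and filtering applied to t1
                "return": ["asset", "event"],
                "where": {"column": "event", "condition": "=", "value": "Ingest"}
            }
        }
    }

    print(json.dumps(query, indent=2))

Nested ``join`` objects inside the inner ``query`` are handled recursively, each additional table being given the next ``tN`` alias.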
Document document; SQLBuffer sql; +vector asset_codes; if (!m_schemaManager->exists(dbHandle, schema)) { @@ -3285,7 +3369,7 @@ SQLBuffer sql; { if (document.HasMember("where")) { - if (!jsonWhereClause(document["where"], sql)) + if (!jsonWhereClause(document["where"], sql, asset_codes)) { return -1; } @@ -3581,3 +3665,268 @@ string Connection::operation(const char *sql) return string(buf); } + +/** + * In the case of a join add the tables to select from for all the tables in + * the join + * + * @param schema The schema we are using + * @param document The query we are processing + * @param sql The SQLBuffer we are writing + * @param level The table number we are processing + */ +bool Connection::appendTables(const string& schema, const Value& document, SQLBuffer& sql, int level) +{ + string tag = "t" + to_string(level); + if (document.HasMember("join")) + { + const Value& join = document["join"]; + if (join.HasMember("table")) + { + const Value& table = join["table"]; + if (!table.HasMember("name")) + { + raiseError("commonRetrieve", "Joining table is missing a table name"); + return false; + } + const Value& name = table["name"]; + if (!name.IsString()) + { + raiseError("commonRetrieve", "Joining table name is not a string"); + return false; + } + sql.append(", "); + sql.append(schema); + sql.append('.'); + sql.append(name.GetString()); + sql.append(" "); + sql.append(tag); + if (join.HasMember("query")) + { + const Value& query = join["query"]; + appendTables(schema, query, sql, ++level); + } + else + { + raiseError("commonRetrieve", "Join is missing a join query definition"); + return false; + } + } + else + { + raiseError("commonRetrieve", "Join is missing a table definition"); + return false; + } + } + return true; +} + +/** + * Recurse down and add the where cluase and join terms for each + * new table joined to the query + * + * @param query The JSON query + * @param sql The SQLBuffer we are writing the data to + * @param asset_codes The asset codes + * @param level The nestign level of the joined table + */ +bool Connection::processJoinQueryWhereClause(const Value& query, + SQLBuffer& sql, + std::vector &asset_codes, + int level) +{ + string tag = "t" + to_string(level) + "."; + if (!jsonWhereClause(query["where"], sql, asset_codes, true, tag)) + { + return false; + } + + if (query.HasMember("join")) + { + // Now and the join condition itself + string col0, col1; + const Value& join = query["join"]; + if (join.HasMember("on") && join["on"].IsString()) + { + col0 = join["on"].GetString(); + } + else + { + return false; + } + if (join.HasMember("table")) + { + const Value& table = join["table"]; + if (table.HasMember("column") && table["column"].IsString()) + { + col1 = table["column"].GetString(); + } + else + { + raiseError("Joined query", "Missing join column in table"); + return false; + } + } + sql.append(" AND "); + sql.append(tag); + sql.append(col0); + sql.append(" = t"); + sql.append(level + 1); + sql.append("."); + sql.append(col1); + sql.append(" "); + if (join.HasMember("query") && join["query"].IsObject()) + { + sql.append(" AND "); + const Value& query = join["query"]; + processJoinQueryWhereClause(query, sql, asset_codes, level + 1); + } + } + return true; +} + +/** + * In the case of a join add the columns to select from for all the tables in + * the join + * + * @param document The query we are processing + * @param sql The SQLBuffer we are writing + * @param level The table number we are processing + */ +bool Connection::selectColumns(const Value& document, 
SQLBuffer& sql, int level) +{ +SQLBuffer jsonConstraints; + +string tag = "t" + to_string(level) + "."; + + if (document.HasMember("return")) + { + int col = 0; + const Value& columns = document["return"]; + if (! columns.IsArray()) + { + raiseError("retrieve", "The property return must be an array"); + return false; + } + if (document.HasMember("modifier")) + { + sql.append(document["modifier"].GetString()); + sql.append(' '); + } + for (Value::ConstValueIterator itr = columns.Begin(); itr != columns.End(); ++itr) + { + if (col) + { + sql.append(", "); + } + if (!itr->IsObject()) // Simple column name + { + sql.append(tag); + sql.append(itr->GetString()); + } + else + { + if (itr->HasMember("column")) + { + if (! (*itr)["column"].IsString()) + { + raiseError("rerieve", + "column must be a string"); + return false; + } + if (itr->HasMember("format")) + { + if (! (*itr)["format"].IsString()) + { + raiseError("rerieve", + "format must be a string"); + return false; + } + + // SQLite 3 date format. + string new_format; + applyColumnDateFormat((*itr)["format"].GetString(), + tag + (*itr)["column"].GetString(), + new_format, + true); + + // Add the formatted column or use it as is + sql.append(new_format); + } + else if (itr->HasMember("timezone")) + { + if (! (*itr)["timezone"].IsString()) + { + raiseError("rerieve", + "timezone must be a string"); + return false; + } + // SQLite3 doesnt support time zone formatting + if (strcasecmp((*itr)["timezone"].GetString(), "utc") != 0) + { + raiseError("retrieve", + "SQLite3 plugin does not support timezones in qeueries"); + return false; + } + else + { + sql.append("strftime('" F_DATEH24_MS "', "); + sql.append(tag); + sql.append((*itr)["column"].GetString()); + + sql.append(", 'utc')"); + } + } + else + { + sql.append(tag); + sql.append((*itr)["column"].GetString()); + } + sql.append(' '); + } + else if (itr->HasMember("json")) + { + const Value& json = (*itr)["json"]; + if (! 
returnJson(json, sql, jsonConstraints)) + { + return false; + } + } + else + { + raiseError("retrieve", + "return object must have either a column or json property"); + return false; + } + + if (itr->HasMember("alias")) + { + sql.append(" AS \""); + sql.append((*itr)["alias"].GetString()); + sql.append('"'); + } + } + col++; + } + } + else + { + sql.append('*'); + return true; + } + if (document.HasMember("join")) + { + const Value& join = document["join"]; + if (join.HasMember("query")) + { + const Value& query = join["query"]; + sql.append(", "); + if (!selectColumns(query, sql, ++level)) + { + raiseError("commonRetrieve", "Join failed to add select columns"); + return false; + } + } + } + return true; +} diff --git a/C/plugins/storage/sqlitelb/common/include/connection.h b/C/plugins/storage/sqlitelb/common/include/connection.h index 703b457b2d..ad1ac00aa4 100644 --- a/C/plugins/storage/sqlitelb/common/include/connection.h +++ b/C/plugins/storage/sqlitelb/common/include/connection.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #ifndef MEMORY_READING_PLUGIN #include @@ -140,7 +141,10 @@ class Connection { void raiseError(const char *operation, const char *reason,...); sqlite3 *dbHandle; int mapResultSet(void *res, std::string& resultSet); - bool jsonWhereClause(const rapidjson::Value& whereClause, SQLBuffer&, bool convertLocaltime = false); + bool jsonWhereClause(const rapidjson::Value& whereClause, + SQLBuffer&, std::vector &asset_codes, + bool convertLocaltime = false, + std::string prefix = ""); bool jsonModifiers(const rapidjson::Value&, SQLBuffer&, bool isTableReading = false); bool jsonAggregates(const rapidjson::Value&, const rapidjson::Value&, @@ -155,5 +159,11 @@ class Connection { int i, std::string& newDate); void logSQL(const char *, const char *); + bool appendTables(const std::string& schema, const rapidjson::Value& document, SQLBuffer& sql, int level); + bool processJoinQueryWhereClause(const rapidjson::Value& query, + SQLBuffer& sql, + std::vector &asset_codes, + int level); + bool selectColumns(const rapidjson::Value& document, SQLBuffer& sql, int level); }; #endif diff --git a/C/plugins/storage/sqlitelb/common/readings.cpp b/C/plugins/storage/sqlitelb/common/readings.cpp index 5620dbbf40..fb6cbad35a 100644 --- a/C/plugins/storage/sqlitelb/common/readings.cpp +++ b/C/plugins/storage/sqlitelb/common/readings.cpp @@ -149,6 +149,8 @@ bool aggregateAll(const Value& payload) */ bool Connection::aggregateQuery(const Value& payload, string& resultSet) { + vector asset_codes; + if (!payload.HasMember("where") || !payload.HasMember("timebucket")) { @@ -300,7 +302,7 @@ bool Connection::aggregateQuery(const Value& payload, string& resultSet) // Add where condition sql.append("WHERE "); - if (!jsonWhereClause(payload["where"], sql)) + if (!jsonWhereClause(payload["where"], sql, asset_codes)) { raiseError("retrieve", "aggregateQuery: failure while building WHERE clause"); return false; @@ -718,6 +720,13 @@ int Connection::readingStream(ReadingStream **readings, bool commit) raiseError("appendReadings","freeing SQLite in memory structure - error :%s:", sqlite3_errmsg(dbHandle)); } } + if(batch_stmt != NULL) + { + if (sqlite3_finalize(batch_stmt) != SQLITE_OK) + { + raiseError("appendReadings","freeing SQLite in memory batch structure - error :%s:", sqlite3_errmsg(dbHandle)); + } + } #if INSTRUMENT gettimeofday(&t2, NULL); @@ -1119,6 +1128,13 @@ int sleep_time_ms = 0; raiseError("appendReadings","freeing SQLite in memory structure - error :%s:", 
sqlite3_errmsg(dbHandle)); } } + if(batch_stmt != NULL) + { + if (sqlite3_finalize(batch_stmt) != SQLITE_OK) + { + raiseError("appendReadings","freeing SQLite in memory batch structure - error :%s:", sqlite3_errmsg(dbHandle)); + } + } if (readingsCopy) { @@ -1254,6 +1270,7 @@ SQLBuffer sql; SQLBuffer jsonConstraints; bool isAggregate = false; const char *timezone = "utc"; +vector asset_codes; try { if (dbHandle == NULL) @@ -1531,7 +1548,7 @@ const char *timezone = "utc"; if (document.HasMember("where")) { - if (!jsonWhereClause(document["where"], sql)) + if (!jsonWhereClause(document["where"], sql, asset_codes)) { return false; } diff --git a/C/services/common/service_security.cpp b/C/services/common/service_security.cpp index 3504ce7db8..e1c756b92e 100644 --- a/C/services/common/service_security.cpp +++ b/C/services/common/service_security.cpp @@ -608,7 +608,7 @@ void ServiceAuthHandler::refreshBearerToken() // Shutdown service if (m_refreshRunning) { - Logger::getLogger()->warn("Service is being shut down " \ + Logger::getLogger()->warn("Service is being restarted " \ "due to bearer token refresh error"); this->restart(); break; diff --git a/C/services/north/data_load.cpp b/C/services/north/data_load.cpp index 77567dad13..8d721395ef 100755 --- a/C/services/north/data_load.cpp +++ b/C/services/north/data_load.cpp @@ -156,11 +156,10 @@ void DataLoad::triggerRead(unsigned int blockSize) */ void DataLoad::readBlock(unsigned int blockSize) { -ReadingSet *readings = NULL; -int n_waits = 0; - + int n_waits = 0; do { + ReadingSet* readings = nullptr; try { switch (m_dataSource) @@ -178,16 +177,19 @@ int n_waits = 0; default: Logger::getLogger()->fatal("Bad source for data to send"); break; - } } - catch (ReadingSetException* e) + catch (ReadingSetException* e) { // Ignore, the exception has been reported in the layer below + // readings may contain erroneous data, clear it + readings = nullptr; } - catch (exception& e) + catch (exception& e) { // Ignore, the exception has been reported in the layer below + // readings may contain erroneous data, clear it + readings = nullptr; } if (readings && readings->getCount()) { @@ -211,7 +213,7 @@ int n_waits = 0; // Logger::getLogger()->debug("DataLoad::readBlock(): No readings available"); } if (!m_shutdown) - { + { // TODO improve this this_thread::sleep_for(chrono::milliseconds(250)); n_waits++; @@ -604,7 +606,14 @@ void DataLoad::updateStatistic(const string& key, const string& description, uin if (m_storage->insertTable(table, values) != 1) { - Logger::getLogger()->error("Failed to insert a new row into the %s", table.c_str()); + if (m_storage->updateTable("statistics", updateValue, wLastStat) == 1) + { + Logger::getLogger()->warn("Statistics update has suceeded, the above failures are the likely result of a race condition between services and can be ignored"); + } + else + { + Logger::getLogger()->error("Failed to insert a new row into the %s", table.c_str()); + } } else { diff --git a/C/services/north/north.cpp b/C/services/north/north.cpp index e9c81fb15e..d2ed9a792d 100755 --- a/C/services/north/north.cpp +++ b/C/services/north/north.cpp @@ -533,9 +533,17 @@ void NorthService::start(string& coreAddress, unsigned short corePort) if (!m_dryRun) { - // Clean shutdown, unregister the storage service - logger->info("Unregistering service"); - m_mgtClient->unregisterService(); + if (m_requestRestart) + { + // Request core to restart this service + m_mgtClient->restartService(); + } + else + { + // Clean shutdown, unregister the storage service + 
logger->info("Unregistering service"); + m_mgtClient->unregisterService(); + } } } management.stop(); @@ -716,12 +724,13 @@ void NorthService::shutdown() */ void NorthService::restart() { - /* Stop recieving new requests and allow existing - * requests to drain. - */ + logger->info("North service restart in progress."); + + // Set restart action m_requestRestart = true; + + // Set shutdown action m_shutdown = true; - logger->info("North service shutdown in progress."); // Signal main thread to shutdown m_cv.notify_all(); diff --git a/C/services/south-plugin-interfaces/python/python_plugin_interface.cpp b/C/services/south-plugin-interfaces/python/python_plugin_interface.cpp index 59da71af1e..399372d8a3 100755 --- a/C/services/south-plugin-interfaces/python/python_plugin_interface.cpp +++ b/C/services/south-plugin-interfaces/python/python_plugin_interface.cpp @@ -36,7 +36,7 @@ std::vector* plugin_poll_fn(PLUGIN_HANDLE); void plugin_start_fn(PLUGIN_HANDLE handle); void plugin_register_ingest_fn(PLUGIN_HANDLE handle,INGEST_CB2 cb,void * data); bool plugin_write_fn(PLUGIN_HANDLE handle, const std::string& name, const std::string& value); -bool plugin_operation_fn(PLUGIN_HANDLE handle, string operation, int parameterCount, PLUGIN_PARAMETER parameters[]); +bool plugin_operation_fn(PLUGIN_HANDLE handle, string operation, int parameterCount, PLUGIN_PARAMETER *parameters[]); /** @@ -271,7 +271,7 @@ bool plugin_write_fn(PLUGIN_HANDLE handle, const std::string& name, const std::s * @param parameterCount Number of parameters in Parameter list * @param parameters Parameter list */ -bool plugin_operation_fn(PLUGIN_HANDLE handle, string operation, int parameterCount, PLUGIN_PARAMETER parameters[]) +bool plugin_operation_fn(PLUGIN_HANDLE handle, string operation, int parameterCount, PLUGIN_PARAMETER *parameters[]) { bool rv = false; if (!handle) @@ -346,7 +346,7 @@ bool plugin_operation_fn(PLUGIN_HANDLE handle, string operation, int parameterCo PyObject *paramsList = PyList_New(parameterCount); for (int i=0; iname.c_str(), parameters[i]->value.c_str()) ); } // Call Python method passing an object and 2 C-style strings diff --git a/VERSION b/VERSION index ed70b8404e..ed5d342137 100644 --- a/VERSION +++ b/VERSION @@ -1,2 +1,2 @@ -fledge_version=2.2.0 +fledge_version=2.3.0 fledge_schema=66 diff --git a/docs/91_version_history.rst b/docs/91_version_history.rst index 7c409f4108..327879bcd0 100644 --- a/docs/91_version_history.rst +++ b/docs/91_version_history.rst @@ -25,6 +25,73 @@ Version History Fledge v2 ========== +v2.3.0 +------- + +Release Date: 2023-12-28 + +- **Fledge Core** + + - New Features: + + - A new REST API has been added to the public API to allow performance counters to be fetched and reset. This API is intended for diagnostic purposes only. + - Improvements have been made to the way load issues on the storage service are logged. + - Documentation has been added that describes how to extend the API to include custom URLs for executing control functions. This documentation also shows how these are then called using the graphical user interface. + + - Bug Fix: + + - An issue with the PostgreSQL storage plugin when very large numbers of readings are ingested, more than 4294967296, has now been resolved. + - An issue with services shutting down rather than restarting when they fail to get a valid bearer token has been resolved. + - The user interface for creating write API endpoints was incorrectly requiring both a constant and a variable when only one is required. This is now resolved. 
+ - A problem that meant parameters to set point control operations were not correctly sent to south plugins written in Python has been resolved. + + +- **GUI** + + - New Features: + + - The user interface has been upgraded to use Angular version 16. + - The configuration section of the user interface that allows for instance wide configuration has been improved with a single tree navigation item and improved visual feedback. + - A link to the documentation has been added to the Control API pages of the user interface. + + + - Bug Fix: + + - An issue that could cause some datapoint to display incorrectly in the user interface graph when multiple assets are displayed and those assets have data points with the same name in both assets has been resolved. + - An issue in the user interface that meant exporting data as a CSV file created incorrect files if any of the data point names contained a comma has been fixed. + - An issue with the user interface not always correctly showing the information for the dispatcher service has been resolved. + - A broken link to the documentation in the control pipeline user interface page of the user interface has been fixed. + + +- **Services & Plugins** + + - New Features: + + - The benchmark south plugin has been enhanced to increase the load that can be placed during testing. + - The fldege-south-s2opcua south plugin has been enhanced to allow filtering of nodes using regular expressions on the Browse Name of the nodes. + - The OMF north plugin has been updated to improve both the time and space efficiency of the lookup data used to map to PI Server objects. + - OMF North plugin documentation has been updated to show which version of the OMF specification the plugin will adopt when communicating with different versions of AVEVA products: PI Web API, Edge Data Store (EDS) and AVEVA Data Hub (ADH). + + + - Bug Fix: + + - A memory leak in the SQLite in-memory storage plugin has been resolved. + - A memory leak in the OMF north plugin has been resolved. + - An issue that could cause data to fail to send using the OMF plugin when the names of data points contain special characters has now been resolved. + - When the "Send full structure" configuration boolean was false, OMF North would create an AF structure anyways. All AF Elements were at the root of the AF database, with every AF Element having a single AF Attribute mapped to a PI Point. Creation of this AF structure would take a long time for large databases which would lead to PI Web API POST timeouts. This has been fixed. If the configuration boolean is false, OMF North will create PI Points only. In the configuration page, Send full structure has been renamed to "Create AF Structure". + - The OMF North plugin was unable to connect to AVEVA Data Hub (ADH) and OSIsoft Cloud Services (OCS) endpoints. This has been fixed. + - An issue with using an OMF Hint that defines a specific name to use with a tag has been resolved. The issue would show itself as the data not being sent to PI or ADH in some circumstances. + - An issue that meant some OPC UA nodes stored in the root of the hierarchy were not correctly ingested in the fldege-south-s2opcua south plugin has been resolved. + - The SQLite storage plugin had an issue that caused it to create overflow tables multiple times. This was not a problem in itself, but did cause the database to become locked for excessive periods of time, creating contention and delays for data ingestions in progress at the time. 
+ - A problem that, in rare circumstances, could result in data being added to the incorrect asset in the SQLite plugin has been resolved. + - An issue with assets containing bracket characters not being stored in the PostgreSQL storage plugin has been resolved. + - An issue with string type parameters to control operations having extra pairs of quotes added has been resolved. + - A problem that caused the dispatcher service to log messages regarding incorrect bearer tokens has been resolved. + - The control dispatcher service was previously advertising itself before it had completed initialisation. This meant that a request could be received when it was partially configured, resulting in a crash of the service. Registration now takes place only once the service is completely ready to accept requests. + - The control dispatcher was not always using the correct source information when looking for matching pipelines. This has now been resolved. + - Control pipelines were previously still being executed if the entire pipeline was disabled, this has now been resolved. + + v2.2.0 ------- diff --git a/docs/OMF.rst b/docs/OMF.rst index fdbda34529..9ee26c68a9 100644 --- a/docs/OMF.rst +++ b/docs/OMF.rst @@ -132,7 +132,7 @@ The *Default Configuration* tab contains the most commonly modified items - *Edge Data Store* - The OSISoft Edge Data Store - - **Send full structure**: Used to control if Asset Framework structure messages are sent to the PI Server. If this is turned off then the data will not be placed in the Asset Framework. + - **Create AF Structure**: Used to control if Asset Framework structure messages are sent to the PI Server. If this is turned off then the data will not be placed in the Asset Framework. - **Naming scheme**: Defines the naming scheme to be used when creating the PI points in the PI Data Archive. See :ref:`Naming_Scheme`. @@ -667,3 +667,28 @@ Versions of this plugin prior to 2.1.0 created a complex type within OMF for eac As of version 2.1.0 this linking approach is used for all new assets created, if assets exist within the PI Server from versions of the plugin prior to 2.1.0 then the older, complex types will be used. It is possible to force the plugin to use complex types for all assets, both old and new, using the configuration option. It is also to force a particular asset to use the complex type mechanism using an OMFHint. +OMF Version Support +------------------- + +To date, AVEVA has released three versions of the OSIsoft Message Format (OMF) specification: 1.0, 1.1 and 1.2. +The OMF Plugin supports all three OMF versions. +The plugin will determine the OMF version to use by reading product version information from the AVEVA data destination system. +These are the OMF versions the plugin will use to post data: + ++-----------+----------+---------------------+ +|OMF Version|PI Web API|Edge Data Store (EDS)| ++===========+==========+=====================+ +| 1.2|- 2021 |- 2023 | +| |- 2021 SP1|- 2023 Patch 1 | +| |- 2021 SP2| | +| |- 2021 SP3| | +| |- 2023 | | ++-----------+----------+---------------------+ +| 1.1|- 2019 | | +| |- 2019 SP1| | ++-----------+----------+---------------------+ +| 1.0| |- 2020 | ++-----------+----------+---------------------+ + +The AVEVA Data Hub (ADH) is cloud-deployed and is always at the latest version of OMF support which is 1.2. +This includes the legacy OSIsoft Cloud Services (OCS) endpoints. 
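For illustration only, the version selection described by the table above can be thought of as a simple lookup. The selection is actually performed internally by the C++ plugin from the product version information it reads from the destination; the product and release strings below are assumptions taken from the table.

.. code-block:: python

    # Illustrative sketch, not the plugin's actual implementation.
    OMF_VERSION_BY_PRODUCT = {
        "PI Web API": {
            "2019": "1.1", "2019 SP1": "1.1",
            "2021": "1.2", "2021 SP1": "1.2", "2021 SP2": "1.2", "2021 SP3": "1.2",
            "2023": "1.2",
        },
        "Edge Data Store": {
            "2020": "1.0", "2023": "1.2", "2023 Patch 1": "1.2",
        },
    }

    def omf_version(product: str, release: str) -> str:
        """OMF specification version used when posting to the given destination."""
        # ADH and legacy OCS are cloud services and always use the latest version, 1.2.
        return OMF_VERSION_BY_PRODUCT.get(product, {}).get(release, "1.2")

    print(omf_version("PI Web API", "2019 SP1"))   # 1.1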
diff --git a/docs/conf.py b/docs/conf.py index 18529328ca..de97a54cdc 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -177,4 +177,4 @@ # Pass Plugin DOCBRANCH argument in Makefile ; by default develop # NOTE: During release time we need to replace DOCBRANCH with actual released version -subprocess.run(["make generated DOCBRANCH='2.2.0RC'"], shell=True, check=True) +subprocess.run(["make generated DOCBRANCH='2.3.0RC'"], shell=True, check=True) diff --git a/docs/control.rst b/docs/control.rst index 3d6dc2404f..f52c4ab997 100644 --- a/docs/control.rst +++ b/docs/control.rst @@ -23,6 +23,16 @@ .. |pipeline_filter_config| image:: images/control/pipeline_filter_config.jpg .. |pipeline_context_menu| image:: images/control/pipeline_context_menu.jpg .. |pipeline_destination| image:: images/control/pipeline_destination.jpg +.. |control_api_1| image:: images/control/control_api_1.jpg +.. |control_api_2| image:: images/control/control_api_2.jpg +.. |control_api_3| image:: images/control/control_api_3.jpg +.. |control_api_4| image:: images/control/control_api_4.jpg +.. |control_api_5| image:: images/control/control_api_5.jpg +.. |control_api_6| image:: images/control/control_api_6.jpg +.. |control_api_7| image:: images/control/control_api_7.jpg +.. |control_api_8| image:: images/control/control_api_8.jpg +.. |control_api_9| image:: images/control/control_api_9.jpg +.. |control_api_10| image:: images/control/control_api_10.jpg .. Links .. |ExpressionFilter| raw:: html @@ -69,11 +79,9 @@ Set point control may be invoked via a number of paths with Fledge - As a result of a control message flowing from a north side system into a north plugin and being routed onward to the south service. -Currently only the notification method is fully implemented within Fledge. - The use of a notification in the Fledge instance itself provides the fastest response for an edge notification. All the processing for this is done on the edge by Fledge itself. -As with the data ingress and egress features of Fledge it is also possible to build filter pipelines in the control paths in order to alter the behavior and process the data in the control path. Pipelines in the control path as defined between the different end point of control operations and are defined such that the same pipeline can be utilised by multiple control paths. See :ref:`ControlPipelines` +As with the data ingress and egress features of Fledge it is also possible to build filter pipelines in the control paths in order to alter the behavior and process the data in the control path. Pipelines in the control path as defined between the different end point of control operations and are defined such that the same pipeline can be utilized by multiple control paths. See :ref:`ControlPipelines` Edge Based Control ------------------ @@ -323,7 +331,159 @@ The dispatcher can also be instructed to run a local automation script, these ar | |north_map4| | +--------------+ -Note, this is an example and does not mean that all or any plugins will use the exact syntax for mapping described above, the documentation for your particular plugin should be consulted to confirm the mapping implemented by the plugin. +.. note:: + + This is an example and does not mean that all or any plugins will use the exact syntax for mapping described above, the documentation for your particular plugin should be consulted to confirm the mapping implemented by the plugin. 
+ +API Control Invocation +====================== + +Fledge allows the administer of the system to extend to REST API of Fledge to encompass custom defined entry point for invoking control operations within the Fledge instance. These configured API Control entry points can be called with a PUT operations to a URL of the form + +.. code-block:: console + + /fledge/control/request/{name} + + +Where *{name}* is a symbolic name that is defined by the user who configures the API request. + +A payload can be passed as a JSON document that may be processed into the request that will be sent to the control dispatcher. This process is discussed below. + +This effectively adds a new entry point to the Fledge public API, calling this entry point will call the control dispatcher to effectively route a control operation from the public API to one or more south services. The definition of the Control API Entry point allows restrictions to be placed on what calls can be made, by whom and with what data. + +Defining API Control Entry Points +--------------------------------- + +A control entry point has the following attributes + + - The type of control, write or operation + + - The destination for the control. This is the ultimate destination, all control requests will be routed via the control dispatcher. The destination may be one of service, asset, script or broadcast. + + - The operation name if the type is operation. + + - A set of constant key/value pairs that are sent either as the items to be written or as parameters if the type of the entry point is operation. Constants are always passed to the dispatcher call with the values defined here. + + - A set of key/value pairs that define the variables that may be passed into the control entry point in the request. The value given here represents a default value to use if no corresponding key is given in the request made via the public API. + + - A set of usernames for the users that are allowed to make requests to this entry point. If this is empty then no users are permitted to call the API entry point unless anonymous access has been enabled. See below. + + - A flag, with the key anonymous, that states if the entry point is open to all users, including where users are not authenticated with the API. It may take the values “true” or “false”. + + - The anonymous flag is really intended for situations when no user is logged into the system, i.e. authentication is not mandatory. It also serves a double purpose to allow a control API call to be open to all users. It is **not** recommended that this flag is set to “true” in production environments. + + +To define a new control entry point a POST request is made to the URL + +.. code-block:: console + + /fledge/control/manage + + +With a payload such as + +.. code-block:: JSON + + { + "name" : "FocusCamera1", + "description" : "Perform a focus operation on camera 1", + "type" : "operation", + "operation_name" : "focus", + "destination" : "service", + "service" : "camera1", + "constants" : { + "units" : "cm" + }, + "variables" : { + "distance" : "100" + }, + "allow" : [ "john", "fred" ], + "anonymous" : false + } + + +The above will define an API entry point that can be called with a PUT request to the URL + +.. code-block:: console + + /fledge/control/request/FocusCamera1 + +The payload of the request is defined by the set of variables that was created when the entry point was defined. Only keys given as variable names in the definition can be included in the payload of this call. 
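As a sketch of how the entry point defined above might be invoked from a client (the host, port and bearer token are placeholders, and whether authentication is required depends on how the instance is configured):

.. code-block:: python

    import requests

    # Placeholder host, port and token; adjust for your instance.
    url = "http://localhost:8081/fledge/control/request/FocusCamera1"
    headers = {"Authorization": "Bearer <token>"}

    # Only keys declared as variables ("distance") may appear in this payload; the
    # constant "units" is added to the dispatcher request automatically from the definition.
    response = requests.put(url, json={"distance": "150"}, headers=headers)
    print(response.status_code, response.text)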
If any variable is omitted from the payload of this call, then the default value that was defined when the entry point was defined will be used as the value of the variable that is passed in the payload to the dispatcher call that will action the request. + +The payload sent to the dispatcher will always contain all of the variables and constants defined in the API entry point. The values for the constants are always from the original definition, whereas the values of the variables can be given in the public API or if omitted the defaults defined when the entry point was defined will be used. + +Graphical User Interface +------------------------ + +The GUI functionality is accessed via the *API Entry Points* sub-menu of the *Control* menu in the left-hand menu pane. Selecting this option will display a screen that appears as follows. + ++-----------------+ +| |control_api_1| | ++-----------------+ + +Adding A Control API Entry Point +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + +Clicking on the *Add +* item in the top right corner will allow a new entry point to be defined. + ++-----------------+ +| |control_api_2| | ++-----------------+ + +Following the above example we can add the name of the entry point and select the type of control request we wish to make from the drop down menu. + ++-----------------+ +| |control_api_3| | ++-----------------+ + +We then enter destination, in this case service, by selecting it from the drop down. We can also enter the service name. + ++-----------------+ +| |control_api_4| | ++-----------------+ + +We can add constant and variable parameters to the entry point via the *Parameters* pane of the add entry page + ++-----------------+ +| |control_api_5| | ++-----------------+ + +Clicking on the *+ Add new variable* or *+Add new constant* items will add a pair of entry fields to allow you to enter the name and value for the variable or constant. + ++-----------------+ +| |control_api_6| | ++-----------------+ + +You may delete a variable or constant by clicking on the *x* icon next to the entry. + +The *Execution Access* pane allows control of who may execute the endpoint. Select the *Anonymous* toggle button will allow any user to execute the API. This is not recommended in a production environment. + ++-----------------+ +| |control_api_7| | ++-----------------+ + +The *Allow Users* drop down will provides a means to allow limited users to run the entry point and provides a list of defined users within the system to choose from. + +Finally a textual description of the operation may be given in the *Description* field. + +Clicking on *Save* will save an enable the new API entry point. The new entry point will be displayed on the resultant screen along with any others that have been defined previously. + ++-----------------+ +| |control_api_8| | ++-----------------+ + +Clicking on the three vertical dots will display a menu that allows the details of the entry point to be viewed and updated or to delete the new entry point. + ++-----------------+ +| |control_api_9| | ++-----------------+ + +It is also possible to execute the entry point from the GUI by clicking on the name of the entry point. You will be prompted to enter values for any variables that have been defined. 
+ ++------------------+ +| |control_api_10| | ++------------------+ Control Dispatcher Service ========================== diff --git a/docs/images/OMF_Default.jpg b/docs/images/OMF_Default.jpg index 3d7b17ec09..3f6df81b2e 100644 Binary files a/docs/images/OMF_Default.jpg and b/docs/images/OMF_Default.jpg differ diff --git a/docs/images/control/control_api_1.jpg b/docs/images/control/control_api_1.jpg new file mode 100644 index 0000000000..b64892d34b Binary files /dev/null and b/docs/images/control/control_api_1.jpg differ diff --git a/docs/images/control/control_api_10.jpg b/docs/images/control/control_api_10.jpg new file mode 100644 index 0000000000..925c171e58 Binary files /dev/null and b/docs/images/control/control_api_10.jpg differ diff --git a/docs/images/control/control_api_2.jpg b/docs/images/control/control_api_2.jpg new file mode 100644 index 0000000000..2ebfb92888 Binary files /dev/null and b/docs/images/control/control_api_2.jpg differ diff --git a/docs/images/control/control_api_3.jpg b/docs/images/control/control_api_3.jpg new file mode 100644 index 0000000000..fbf17924a2 Binary files /dev/null and b/docs/images/control/control_api_3.jpg differ diff --git a/docs/images/control/control_api_4.jpg b/docs/images/control/control_api_4.jpg new file mode 100644 index 0000000000..35920b82f2 Binary files /dev/null and b/docs/images/control/control_api_4.jpg differ diff --git a/docs/images/control/control_api_5.jpg b/docs/images/control/control_api_5.jpg new file mode 100644 index 0000000000..c5dcb35906 Binary files /dev/null and b/docs/images/control/control_api_5.jpg differ diff --git a/docs/images/control/control_api_6.jpg b/docs/images/control/control_api_6.jpg new file mode 100644 index 0000000000..3693b19b0c Binary files /dev/null and b/docs/images/control/control_api_6.jpg differ diff --git a/docs/images/control/control_api_7.jpg b/docs/images/control/control_api_7.jpg new file mode 100644 index 0000000000..132823db38 Binary files /dev/null and b/docs/images/control/control_api_7.jpg differ diff --git a/docs/images/control/control_api_8.jpg b/docs/images/control/control_api_8.jpg new file mode 100644 index 0000000000..d7e94b7eb1 Binary files /dev/null and b/docs/images/control/control_api_8.jpg differ diff --git a/docs/images/control/control_api_9.jpg b/docs/images/control/control_api_9.jpg new file mode 100644 index 0000000000..4893ca4253 Binary files /dev/null and b/docs/images/control/control_api_9.jpg differ diff --git a/docs/plugin_developers_guide/09_packaging.rst b/docs/plugin_developers_guide/09_packaging.rst index 99fc7a4ee6..8f99b011b0 100644 --- a/docs/plugin_developers_guide/09_packaging.rst +++ b/docs/plugin_developers_guide/09_packaging.rst @@ -121,7 +121,6 @@ Common Additional Libraries Package Below are the packages which created a part of the process of building Fledge that are commonly used in plugins. - **fledge-mqtt** which is a packaged version of the libpaho-mqtt library. -- **fledge-gcp** which is a packaged version of the libjwt and libjansson libraries. - **fledge-iec** which is a packaged version of the IEC 60870 and IEC 61850 libraries. - **fledge-s2opcua** which is a packaged version of libexpat and libs2opc libraries. diff --git a/docs/plugin_developers_guide/10_testing.rst b/docs/plugin_developers_guide/10_testing.rst index 4b2d8a4030..8c80c0b88c 100644 --- a/docs/plugin_developers_guide/10_testing.rst +++ b/docs/plugin_developers_guide/10_testing.rst @@ -72,17 +72,17 @@ and their versions. 
"name": "http_north", "type": "north", "description": "HTTP North Plugin", - "version": "1.8.1", + "version": "2.2.0", "installedDirectory": "north/http_north", "packageName": "fledge-north-http-north" }, { - "name": "GCP", + "name": "Kafka", "type": "north", - "description": "Google Cloud Platform IoT-Core", - "version": "1.8.1", - "installedDirectory": "north/GCP", - "packageName": "fledge-north-gcp" + "description": "Simple plugin to send data to Kafka topic", + "version": "2.2.0", + "installedDirectory": "north/Kafka", + "packageName": "fledge-north-kafka" }, ... } @@ -118,8 +118,9 @@ and the function to call, usually *plugin_info*. .. code-block:: console - $ get_plugin_info plugins/north/GCP/libGCP.so plugin_info - {"name": "GCP", "version": "1.8.1", "type": "north", "interface": "1.0.0", "flag": 0, "config": { "plugin" : { "description" : "Google Cloud Platform IoT-Core", "type" : "string", "default" : "GCP", "readonly" : "true" }, "project_id" : { "description" : "The GCP IoT Core Project ID", "type" : "string", "default" : "", "order" : "1", "displayName" : "Project ID" }, "region" : { "description" : "The GCP Region", "type" : "enumeration", "options" : [ "us-central1", "europe-west1", "asia-east1" ], "default" : "us-central1", "order" : "2", "displayName" : "The GCP Region" }, "registry_id" : { "description" : "The Registry ID of the GCP Project", "type" : "string", "default" : "", "order" : "3", "displayName" : "Registry ID" }, "device_id" : { "description" : "Device ID within GCP IoT Core", "type" : "string", "default" : "", "order" : "4", "displayName" : "Device ID" }, "key" : { "description" : "Name of the key file to use", "type" : "string", "default" : "", "order" : "5", "displayName" : "Key Name" }, "algorithm" : { "description" : "JWT algorithm", "type" : "enumeration", "options" : [ "ES256", "RS256" ], "default" : "RS256", "order" : "6", "displayName" : "JWT Algorithm" }, "source": { "description" : "The source of data to send", "type" : "enumeration", "default" : "readings", "order" : "8", "displayName" : "Data Source", "options" : ["readings", "statistics"] } }} + $ ./get_plugin_info /usr/local/fledge/plugins/north/Kafka/libKafka.so plugin_info + {"name": "Kafka", "type": "north", "flag": 0, "version": "2.2.0", "interface": "1.0.0", "config": {"SSL_CERT": {"displayName": "Certificate Name", "description": "Name of client certificate for identity authentications", "default": "", "validity": "KafkaSecurityProtocol == \"SSL\" || KafkaSecurityProtocol == \"SASL_SSL\"", "group": "Encryption", "type": "string", "order": "10"}, "topic": {"mandatory": "true", "description": "The topic to send reading data on", "default": "Fledge", "displayName": "Kafka Topic", "type": "string", "order": "2"}, "brokers": {"displayName": "Bootstrap Brokers", "description": "The bootstrap broker list to retrieve full Kafka brokers", "default": "localhost:9092,kafka.local:9092", "mandatory": "true", "type": "string", "order": "1"}, "KafkaUserID": {"group": "Authentication", "description": "User ID to be used with SASL_PLAINTEXT security protocol", "default": "user", "validity": "KafkaSecurityProtocol == \"SASL_PLAINTEXT\" || KafkaSecurityProtocol == \"SASL_SSL\"", "displayName": "User ID", "type": "string", "order": "7"}, "KafkaSASLMechanism": {"group": "Authentication", "description": "Authentication mechanism to be used to connect to kafka broker", "default": "PLAIN", "displayName": "SASL Mechanism", "type": "enumeration", "order": "6", "validity": "KafkaSecurityProtocol == \"SASL_PLAINTEXT\" 
|| KafkaSecurityProtocol == \"SASL_SSL\"", "options": ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"]}, "SSL_Password": {"displayName": "Certificate Password", "description": "Optional: Password to be used when loading the certificate chain", "default": "", "validity": "KafkaSecurityProtocol == \"SSL\" || KafkaSecurityProtocol == \"SASL_SSL\"", "group": "Encryption", "type": "password", "order": "12"}, "compression": {"displayName": "Compression Codec", "description": "The compression codec to be used to send data to the Kafka broker", "default": "none", "order": "4", "type": "enumeration", "options": ["none", "gzip", "snappy", "lz4"]}, "plugin": {"default": "Kafka", "readonly": "true", "type": "string", "description": "Simple plugin to send data to a Kafka topic"}, "KafkaSecurityProtocol": {"group": "Authentication", "description": "Security protocol to be used to connect to kafka broker", "default": "PLAINTEXT", "options": ["PLAINTEXT", "SASL_PLAINTEXT", "SSL", "SASL_SSL"], "displayName": "Security Protocol", "type": "enumeration", "order": "5"}, "source": {"displayName": "Data Source", "description": "The source of data to send", "default": "readings", "order": "13", "type": "enumeration", "options": ["readings", "statistics"]}, "json": {"displayName": "Send JSON", "description": "Send as JSON objects or as strings", "default": "Strings", "order": "3", "type": "enumeration", "options": ["Objects", "Strings"]}, "SSL_CA_File": {"displayName": "Root CA Name", "description": "Name of the root certificate authority that will be used to verify the certificate", "default": "", "validity": "KafkaSecurityProtocol == \"SSL\" || KafkaSecurityProtocol == \"SASL_SSL\"", "group": "Encryption", "type": "string", "order": "9"}, "SSL_Keyfile": {"displayName": "Private Key Name", "description": "Name of client private key required for communication", "default": "", "validity": "KafkaSecurityProtocol == \"SSL\" || KafkaSecurityProtocol == \"SASL_SSL\"", "group": "Encryption", "type": "string", "order": "11"}, "KafkaPassword": {"group": "Authentication", "description": "Password to be used with SASL_PLAINTEXT security protocol", "default": "pass", "validity": "KafkaSecurityProtocol == \"SASL_PLAINTEXT\" || KafkaSecurityProtocol == \"SASL_SSL\"", "displayName": "Password", "type": "password", "order": "8"}}} + If there is an undefined symbol you will get an error from this utility. You can also check the validity of your JSON configuration by @@ -127,7 +128,7 @@ piping the output to a program such as jq. .. 
code-block:: console - $ get_plugin_info plugins/south/Random/libRandom.so plugin_info | jq + $ ./get_plugin_info plugins/south/Random/libRandom.so plugin_info | jq { "name": "Random", "version": "1.9.2", diff --git a/python/fledge/common/configuration_manager.py b/python/fledge/common/configuration_manager.py index ecbc8cb4c0..57d1885bca 100644 --- a/python/fledge/common/configuration_manager.py +++ b/python/fledge/common/configuration_manager.py @@ -35,7 +35,7 @@ # MAKE UPPER_CASE _valid_type_strings = sorted(['boolean', 'integer', 'float', 'string', 'IPv4', 'IPv6', 'X509 certificate', 'password', - 'JSON', 'URL', 'enumeration', 'script', 'code', 'northTask', 'ACL']) + 'JSON', 'URL', 'enumeration', 'script', 'code', 'northTask', 'ACL', 'bucket']) _optional_items = sorted(['readonly', 'order', 'length', 'maximum', 'minimum', 'rule', 'deprecated', 'displayName', 'validity', 'mandatory', 'group']) RESERVED_CATG = ['South', 'North', 'General', 'Advanced', 'Utilities', 'rest_api', 'Security', 'service', 'SCHEDULER', @@ -277,7 +277,6 @@ async def _validate_category_val(self, category_name, category_val, set_value_va def get_entry_val(k): v = [val for name, val in item_val.items() if name == k] return v[0] - for entry_name, entry_val in item_val.copy().items(): if type(entry_name) is not str: raise TypeError('For {} category, entry name {} must be a string for item name {}; got {}' @@ -309,6 +308,29 @@ def get_entry_val(k): raise TypeError('For {} category, entry value must be a string for item name {} and ' 'entry name {}; got {}'.format(category_name, item_name, entry_name, type(entry_val))) + # Validate bucket type and mandatory properties item_name + elif 'type' in item_val and get_entry_val("type") == 'bucket': + if 'properties' not in item_val: + raise KeyError('For {} category, properties KV pair must be required ' + 'for item name {}.'.format(category_name, item_name)) + if entry_name == 'properties': + prop_val = get_entry_val('properties') + if not isinstance(prop_val, dict): + raise ValueError('For {} category, properties must be JSON object for item name {}; got {}' + .format(category_name, item_name, type(entry_val))) + if not prop_val: + raise ValueError('For {} category, properties JSON object cannot be empty for item name {}' + ''.format(category_name, item_name)) + if 'key' not in prop_val: + raise ValueError('For {} category, key KV pair must exist in properties for item name {}' + ''.format(category_name, item_name)) + d = {entry_name: entry_val} + expected_item_entries.update(d) + else: + if type(entry_val) is not str: + raise TypeError('For {} category, entry value must be a string for item name {} and ' + 'entry name {}; got {}'.format(category_name, item_name, entry_name, + type(entry_val))) else: if type(entry_val) is not str: raise TypeError('For {} category, entry value must be a string for item name {} and ' @@ -968,14 +990,17 @@ async def set_optional_value_entry(self, category_name, item_name, optional_entr return # Validate optional types only when new_value_entry not empty; otherwise set empty value if new_value_entry: - if optional_entry_name == 'readonly' or optional_entry_name == 'deprecated' or optional_entry_name == 'mandatory': + if optional_entry_name == "properties": + raise ValueError('For {} category, optional item name properties cannot be updated.'.format( + category_name)) + elif optional_entry_name in ('readonly', 'deprecated', 'mandatory'): if self._validate_type_value('boolean', new_value_entry) is False: raise ValueError( 'For {} category, entry 
value must be boolean for optional item name {}; got {}' .format(category_name, optional_entry_name, type(new_value_entry))) - elif optional_entry_name == 'minimum' or optional_entry_name == 'maximum': - if (self._validate_type_value('integer', new_value_entry) or self._validate_type_value('float', - new_value_entry)) is False: + elif optional_entry_name in ('minimum', 'maximum'): + if (self._validate_type_value('integer', new_value_entry) or self._validate_type_value( + 'float', new_value_entry)) is False: raise ValueError('For {} category, entry value must be an integer or float for optional item ' '{}; got {}'.format(category_name, optional_entry_name, type(new_value_entry))) elif optional_entry_name in ('displayName', 'group', 'rule', 'validity'): diff --git a/python/fledge/services/core/api/configuration.py b/python/fledge/services/core/api/configuration.py index 6bcc41ff53..4c826ec744 100644 --- a/python/fledge/services/core/api/configuration.py +++ b/python/fledge/services/core/api/configuration.py @@ -426,6 +426,10 @@ async def add_configuration_item(request): result = await storage_client.update_tbl("configuration", payload) response = result['response'] + # update cache with new config item + if category_name in cf_mgr._cacheManager.cache: + cf_mgr._cacheManager.cache[category_name]['value'].update({new_config_item: data}) + # logged audit new config item for category audit = AuditLogger(storage_client) audit_details = {'category': category_name, 'item': new_config_item, 'value': config_item_dict} diff --git a/python/fledge/services/core/api/control_service/entrypoint.py b/python/fledge/services/core/api/control_service/entrypoint.py index 1c5942a11a..13a0a216ee 100644 --- a/python/fledge/services/core/api/control_service/entrypoint.py +++ b/python/fledge/services/core/api/control_service/entrypoint.py @@ -164,23 +164,17 @@ async def _check_parameters(payload, skip_required=False): if constants is not None: if not isinstance(constants, dict): raise ValueError('constants should be a dictionary.') - if not constants and _type == EntryPointType.WRITE.name.lower(): - raise ValueError('constants should not be empty.') final['constants'] = constants - else: - if _type == EntryPointType.WRITE.name.lower(): - raise ValueError("For type write constants must have passed in payload and cannot have empty value.") variables = payload.get('variables', None) if variables is not None: if not isinstance(variables, dict): raise ValueError('variables should be a dictionary.') - if not variables and _type == EntryPointType.WRITE.name.lower(): - raise ValueError('variables should not be empty.') final['variables'] = variables - else: - if _type == EntryPointType.WRITE.name.lower(): - raise ValueError("For type write variables must have passed in payload and cannot have empty value.") + + if _type == EntryPointType.WRITE.name.lower(): + if not variables and not constants: + raise ValueError('For write type either variables or constants should not be empty.') allow = payload.get('allow', None) if allow is not None: diff --git a/python/fledge/services/core/api/performance_monitor.py b/python/fledge/services/core/api/performance_monitor.py new file mode 100644 index 0000000000..9a983b2fc3 --- /dev/null +++ b/python/fledge/services/core/api/performance_monitor.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- + +# FLEDGE_BEGIN +# See: http://fledge-iot.readthedocs.io/ +# FLEDGE_END + +from aiohttp import web + +from fledge.common.logger import FLCoreLogger +from fledge.common.storage_client.payload_builder 
import PayloadBuilder +from fledge.services.core import connect + +__author__ = "Ashish Jabble" +__copyright__ = "Copyright (c) 2023, Dianomic Systems Inc." +__license__ = "Apache 2.0" +__version__ = "${VERSION}" + +_help = """ + ---------------------------------------------------------------- + | GET DELETE | /fledge/monitors | + | GET DELETE | /fledge/monitors/{service} | + | GET DELETE | /fledge/monitors/{service}/{counter} | + ---------------------------------------------------------------- +""" +_LOGGER = FLCoreLogger().get_logger(__name__) + +def setup(app): + app.router.add_route('GET', '/fledge/monitors', get_all) + app.router.add_route('GET', '/fledge/monitors/{service}', get_by_service_name) + app.router.add_route('GET', '/fledge/monitors/{service}/{counter}', get_by_service_and_counter_name) + app.router.add_route('DELETE', '/fledge/monitors', purge_all) + app.router.add_route('DELETE', '/fledge/monitors/{service}', purge_by_service) + app.router.add_route('DELETE', '/fledge/monitors/{service}/{counter}', purge_by_service_and_counter) + +# TODO: 8167 - Limit and Offset support and other pending queries + +async def get_all(request: web.Request) -> web.Response: + """ GET list of performance monitors + + :Example: + curl -sX GET http://localhost:8081/fledge/monitors + """ + storage = connect.get_storage_async() + monitors = await storage.query_tbl("monitors") + counters = monitors["rows"] + monitor = {} + response = {} + for c in counters: + val = {"average": c["average"], "maximum": c["maximum"], "minimum": c["minimum"], "samples": c["samples"], + "timestamp": c["ts"], "service": c["service"]} + monitor.setdefault(c['monitor'], []).append(val) + monitors = [{'monitor': k, 'values': v} for k, v in monitor.items()] + response["monitors"] = monitors + return web.json_response(response) + + +async def get_by_service_name(request: web.Request) -> web.Response: + """ GET performance monitors for the given service + + :Example: + curl -sX GET http://localhost:8081/fledge/monitors/ + """ + service = request.match_info.get('service', None) + storage = connect.get_storage_async() + payload = PayloadBuilder().SELECT("average", "maximum", "minimum", "monitor", "samples", "ts").ALIAS( + "return", ("ts", 'timestamp')).FORMAT("return", ("ts", "YYYY-MM-DD HH24:MI:SS.MS")).WHERE( + ["service", '=', service]).payload() + response = {"service": service} + result = await storage.query_tbl_with_payload('monitors', payload) + if 'rows' in result: + monitor = {} + for row in result["rows"]: + val = {"average": row["average"], "maximum": row["maximum"], "minimum": row["minimum"], + "samples": row["samples"], "timestamp": row["timestamp"]} + monitor.setdefault(row['monitor'], []).append(val) + monitors = [{'monitor': k, 'values': v} for k, v in monitor.items()] + response["monitors"] = monitors + return web.json_response(response) + +async def get_by_service_and_counter_name(request: web.Request) -> web.Response: + """ GET values for the single counter for the single service + + :Example: + curl -sX GET http://localhost:8081/fledge/monitors// + """ + service = request.match_info.get('service', None) + counter = request.match_info.get('counter', None) + + storage = connect.get_storage_async() + payload = PayloadBuilder().SELECT("average", "maximum", "minimum", "samples", "ts").ALIAS( + "return", ("ts", 'timestamp')).FORMAT("return", ("ts", "YYYY-MM-DD HH24:MI:SS.MS")).WHERE( + ["service", '=', service]).AND_WHERE(["monitor", '=', counter]).payload() + result = await 
storage.query_tbl_with_payload('monitors', payload) + response = {} + if 'rows' in result: + response = {"service": service, "monitors":{"monitor": counter}} + response["monitors"]["values"] = result["rows"] if result["rows"] else [] + return web.json_response(response) + +async def purge_all(request: web.Request) -> web.Response: + """ DELETE all performance monitors + + :Example: + curl -sX DELETE http://localhost:8081/fledge/monitors + """ + storage = connect.get_storage_async() + result = await storage.delete_from_tbl("monitors", {}) + message = "Nothing to remove for service performance counters." + if 'rows_affected' in result: + if result['response'] == "deleted" and result['rows_affected']: + message = "All Performance counters have been removed successfully." + return web.json_response({"message": message}) + +async def purge_by_service(request: web.Request) -> web.Response: + """ DELETE performance monitors for the given service + + :Example: + curl -sX DELETE http://localhost:8081/fledge/monitors/ + """ + service = request.match_info.get('service', None) + storage = connect.get_storage_async() + payload = PayloadBuilder().WHERE(["service", '=', service]).payload() + result = await storage.delete_from_tbl("monitors", payload) + message = "Nothing to remove counters from '{}' service.".format(service) + if 'rows_affected' in result: + if result['response'] == "deleted" and result['rows_affected']: + message = "Performance counters have been removed from '{}' service.".format(service) + return web.json_response({"message": message}) + +async def purge_by_service_and_counter(request: web.Request) -> web.Response: + """ DELETE performance monitors for the single counter for the single service + + :Example: + curl -sX DELETE http://localhost:8081/fledge/monitors// + """ + service = request.match_info.get('service', None) + counter = request.match_info.get('counter', None) + storage = connect.get_storage_async() + payload = PayloadBuilder().WHERE(["service", '=', service]).AND_WHERE( + ["monitor", '=', counter]).payload() + result = await storage.delete_from_tbl("monitors", payload) + message = "Nothing to remove '{}' counter from '{}' service.".format(counter, service) + if 'rows_affected' in result: + if result['response'] == "deleted" and result['rows_affected']: + message = "Performance '{}' counter has been removed from '{}' service.".format(counter, service) + return web.json_response({"message": message}) diff --git a/python/fledge/services/core/proxy.py b/python/fledge/services/core/proxy.py index 8d4d40b478..ae2e9e8f6b 100644 --- a/python/fledge/services/core/proxy.py +++ b/python/fledge/services/core/proxy.py @@ -140,7 +140,7 @@ async def handler(request: web.Request) -> web.Response: if is_proxy_svc_found and proxy_svc_name is not None: svc, token = await _get_service_record_info_along_with_bearer_token(proxy_svc_name) url = str(request.url).split('fledge/extension/')[1] - status_code, response = await _call_microservice_service_api( + status_code, response, content_type = await _call_microservice_service_api( request, svc._protocol, svc._address, svc._port, url, token) else: msg = "{} route not found.".format(request.rel_url) @@ -149,8 +149,7 @@ async def handler(request: web.Request) -> web.Response: msg = str(ex) raise web.HTTPInternalServerError(reason=msg, body=json.dumps({"message": msg})) else: - return web.json_response(status=status_code, body=response) - + return web.json_response(status=status_code, body=response, content_type=content_type) async def 
_get_service_record_info_along_with_bearer_token(svc_name): try: @@ -175,8 +174,8 @@ async def _call_microservice_service_api( if request.method == 'GET': async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as resp: - message = await resp.text() - response = (resp.status, message) + message = await resp.read() if resp.content_type == 'application/octet-stream' else await resp.text() + response = (resp.status, message, resp.content_type) if resp.status not in range(200, 209): _logger.error("GET Request Error: Http status code: {}, reason: {}, response: {}".format( resp.status, resp.reason, message)) @@ -225,5 +224,9 @@ async def _call_microservice_service_api( except Exception as ex: raise Exception(str(ex)) else: - # Return Tuple - (http statuscode, message) - return response + response_tuples = response + # Default content-type is 'application/json' + if len(response) == 2: + response_tuples = response + ('application/json',) + # Return Tuple - (http statuscode, message, content-type) + return response_tuples diff --git a/python/fledge/services/core/routes.py b/python/fledge/services/core/routes.py index b356008e63..469d66af30 100644 --- a/python/fledge/services/core/routes.py +++ b/python/fledge/services/core/routes.py @@ -5,7 +5,7 @@ # FLEDGE_END from fledge.services.core import proxy -from fledge.services.core.api import asset_tracker, auth, backup_restore, browser, certificate_store, filters, health, notification, north, package_log, python_packages, south, support, service, task, update +from fledge.services.core.api import asset_tracker, auth, backup_restore, browser, certificate_store, filters, health, notification, north, package_log, performance_monitor, python_packages, south, support, service, task, update from fledge.services.core.api import audit as api_audit from fledge.services.core.api import common as api_common from fledge.services.core.api import configuration as api_configuration @@ -264,6 +264,9 @@ def setup(app): # Proxy Admin API setup with regex proxy.admin_api_setup(app) + # Performance Monitor + performance_monitor.setup(app) + # enable cors support enable_cors(app) diff --git a/python/fledge/services/core/server.py b/python/fledge/services/core/server.py index 9faa4d19b3..ee4bd61c40 100755 --- a/python/fledge/services/core/server.py +++ b/python/fledge/services/core/server.py @@ -1303,32 +1303,35 @@ async def restart_service(cls, request): :Example: curl -X PUT http://localhost:/fledge/service/dc9bfc01-066a-4cc0-b068-9c35486db87f/restart """ - try: service_id = request.match_info.get('service_id', None) - try: services = ServiceRegistry.get(idx=service_id) except service_registry_exceptions.DoesNotExist: raise ValueError('Service with {} does not exist'.format(service_id)) ServiceRegistry.restart(service_id) - if cls._storage_client_async is not None and services[0]._name not in ("Fledge Storage", "Fledge Core"): try: cls._audit = AuditLogger(cls._storage_client_async) await cls._audit.information('SRVRS', {'name': services[0]._name}) except Exception as ex: _logger.exception(ex) - - _resp = {'id': str(service_id), 'message': 'Service restart requested'} - - return web.json_response(_resp) - except ValueError as ex: - raise web.HTTPNotFound(reason=str(ex)) + """ Special Case: + For BucketStorage type we have used proxy map for interfacing REST API endpoints + to Microservice service API endpoints. Therefore we need to clear the proxy map on restart. 
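The new performance_monitor module registered above through routes.py exposes GET and DELETE handlers on /fledge/monitors, /fledge/monitors/{service} and /fledge/monitors/{service}/{counter}. A minimal usage sketch follows, assuming a core API listening on localhost:8081 (the port used in the module's curl examples); the service name "Sine" and counter name "queueLength" are illustrative placeholders, not names introduced by this change.

import http.client
import json

conn = http.client.HTTPConnection("localhost", 8081)

# List every recorded counter, grouped by monitor name
conn.request("GET", "/fledge/monitors")
all_monitors = json.loads(conn.getresponse().read().decode())

# Counters recorded for a single service ("Sine" is a placeholder)
conn.request("GET", "/fledge/monitors/Sine")
per_service = json.loads(conn.getresponse().read().decode())

# Purge one counter of that service; the handler returns a {"message": ...} body
conn.request("DELETE", "/fledge/monitors/Sine/queueLength")
print(json.loads(conn.getresponse().read().decode())["message"])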
+ """ + if services[0]._type == "BucketStorage": + cls._API_PROXIES = {} + except ValueError as err: + msg = str(err) + raise web.HTTPNotFound(reason=msg, body=json.dumps({"message": msg})) except Exception as ex: msg = str(ex) raise web.HTTPInternalServerError(reason=msg, body=json.dumps({"message": msg})) + else: + _resp = {'id': str(service_id), 'message': 'Service restart requested'} + return web.json_response(_resp) @classmethod async def get_service(cls, request): diff --git a/scripts/services/north_C b/scripts/services/north_C index 80fa8a7968..3cf728d425 100755 --- a/scripts/services/north_C +++ b/scripts/services/north_C @@ -27,11 +27,44 @@ if [ "$VALGRIND_NORTH" != "" ]; then done fi +runstrace=n +if [ "$STRACE_NORTH" != "" ]; then + for i in "$@"; do + case $i in + --name=*) + name="`echo $i | sed -e s/--name=//`" + ;; + esac + done + services=$(echo $STRACE_NORTH | tr ";" "\n") + for service in $services; do + if [ "$service" = "$name" ]; then + runstrace=y + fi + done +fi + cd "${FLEDGE_ROOT}/services" if [ "$runvalgrind" = "y" ]; then file=${HOME}/north.${name}.valgrind.out rm -f $file - valgrind --leak-check=full --trace-children=yes --show-leak-kinds=all --track-origins=yes --log-file=$file ./fledge.services.north "$@" + logger "Running north service $name under valgrind" + if [ "$VALGRIND_MASSIF" != "" ]; then + valgrind --tool=massif --detailed-freq=1 --pages-as-heap=yes ./fledge.services.north "$@" + else + valgrind --leak-check=full --trace-children=yes --show-leak-kinds=all --track-origins=yes --log-file=$file ./fledge.services.north "$@" + fi +elif [ "$runstrace" = "y" ]; then + file=${HOME}/north.${name}.strace.out + logger "Running north service $name under strace" + rm -f $file + strace -e 'trace=%memory,%process,%file' -f -o $file ./fledge.services.north "$@" +elif [ "$INTERPOSE_NORTH" != "" ]; then + LD_PRELOAD=${INTERPOSE_NORTH} + logger "Running north service with interpose library $INTERPOSE_NORTH" + export LD_PRELOAD + ./fledge.services.north "$@" + unset LD_PRELOAD else ./fledge.services.north "$@" fi diff --git a/tests/system/python/conftest.py b/tests/system/python/conftest.py index 62c4c37edb..72332ceb81 100644 --- a/tests/system/python/conftest.py +++ b/tests/system/python/conftest.py @@ -904,17 +904,6 @@ def pytest_addoption(parser): parser.addoption("--exclude-packages-list", action="store", default="None", help="Packages to be excluded from test e.g. --exclude-packages-list=fledge-south-sinusoid,fledge-filter-log") - # GCP config - parser.addoption("--gcp-project-id", action="store", default="nomadic-groove-264509", help="GCP Project ID") - parser.addoption("--gcp-registry-id", action="store", default="fl-nerd--registry", help="GCP Registry ID") - parser.addoption("--gcp-device-gateway-id", action="store", default="fl-nerd-gateway", help="GCP Device ID") - parser.addoption("--gcp-subscription-name", action="store", default="my-subscription", help="GCP Subscription name") - parser.addoption("--google-app-credentials", action="store", help="GCP JSON credentials file path") - parser.addoption("--gcp-cert-path", action="store", default="./data/gcp/rsa_private.pem", - help="GCP certificate path") - parser.addoption("--gcp-logger-name", action="store", default="cloudfunctions.googleapis.com%2Fcloud-functions", - help="GCP Logger name") - # Config required for testing fledge under impaired network. 
parser.addoption("--south-service-wait-time", action="store", type=int, default=20, @@ -1182,41 +1171,6 @@ def package_build_source_list(request): return request.config.getoption("--package-build-source-list") -@pytest.fixture -def gcp_project_id(request): - return request.config.getoption("--gcp-project-id") - - -@pytest.fixture -def gcp_registry_id(request): - return request.config.getoption("--gcp-registry-id") - - -@pytest.fixture -def gcp_device_gateway_id(request): - return request.config.getoption("--gcp-device-gateway-id") - - -@pytest.fixture -def gcp_subscription_name(request): - return request.config.getoption("--gcp-subscription-name") - - -@pytest.fixture -def google_app_credentials(request): - return request.config.getoption("--google-app-credentials") - - -@pytest.fixture -def gcp_cert_path(request): - return request.config.getoption("--gcp-cert-path") - - -@pytest.fixture -def gcp_logger_name(request): - return request.config.getoption("--gcp-logger-name") - - @pytest.fixture def exclude_packages_list(request): return request.config.getoption("--exclude-packages-list") diff --git a/tests/system/python/packages/test_gcp_gateway.py b/tests/system/python/packages/test_gcp_gateway.py deleted file mode 100644 index 229f1e517e..0000000000 --- a/tests/system/python/packages/test_gcp_gateway.py +++ /dev/null @@ -1,234 +0,0 @@ -# -*- coding: utf-8 -*- - -# FLEDGE_BEGIN -# See: http://fledge-iot.readthedocs.io/ -# FLEDGE_END - -""" Test GCP Gateway plugin - -""" - -import os -import subprocess -import http.client -import json -import time -from pathlib import Path -from datetime import timezone, datetime -import utils -import pytest -from pytest import PKG_MGR - - -__author__ = "Yash Tatkondawar" -__copyright__ = "Copyright (c) 2020 Dianomic Systems Inc." -__license__ = "Apache 2.0" -__version__ = "${VERSION}" - -task_name = "gcp-gateway" -north_plugin = "GCP" -# This gives the path of directory where fledge is cloned. test_file < packages < python < system < tests < ROOT -PROJECT_ROOT = Path(__file__).parent.parent.parent.parent.parent -SCRIPTS_DIR_ROOT = "{}/tests/system/python/packages/data/".format(PROJECT_ROOT) -FLEDGE_ROOT = os.environ.get('FLEDGE_ROOT') -CERTS_DIR = "{}/gcp".format(SCRIPTS_DIR_ROOT) -FLEDGE_CERTS_PEM_DIR = "{}/data/etc/certs/pem/".format(FLEDGE_ROOT) - - -@pytest.fixture -def check_fledge_root(): - assert FLEDGE_ROOT, "Please set FLEDGE_ROOT!" - - -@pytest.fixture -def reset_fledge(wait_time): - try: - subprocess.run(["cd {}/tests/system/python/scripts/package && ./reset" - .format(PROJECT_ROOT)], shell=True, check=True) - except subprocess.CalledProcessError: - assert False, "reset package script failed!" - - # Wait for fledge server to start - time.sleep(wait_time) - - -@pytest.fixture -def remove_and_add_pkgs(package_build_version): - try: - subprocess.run(["cd {}/tests/system/python/scripts/package && ./remove" - .format(PROJECT_ROOT)], shell=True, check=True) - except subprocess.CalledProcessError: - assert False, "remove package script failed!" 
- - try: - subprocess.run(["cd {}/tests/system/python/scripts/package/ && ./setup {}" - .format(PROJECT_ROOT, package_build_version)], shell=True, check=True) - except subprocess.CalledProcessError: - assert False, "setup package script failed" - - try: - subprocess.run(["sudo {} install -y fledge-north-gcp fledge-south-sinusoid".format(PKG_MGR)], - shell=True, check=True) - except subprocess.CalledProcessError: - assert False, "installation of gcp-gateway and sinusoid packages failed" - - try: - subprocess.run(["python3 -m pip install google-cloud-pubsub==1.1.0"], shell=True, check=True) - except subprocess.CalledProcessError: - assert False, "pip installation of google-cloud-pubsub failed" - - try: - subprocess.run(["python3 -m pip install google-cloud-logging==1.15.1"], shell=True, check=True) - except subprocess.CalledProcessError: - assert False, "pip installation of google-cloud-logging failed" - - try: - subprocess.run(["if [ ! -f \"{}/roots.pem\" ]; then wget https://pki.goog/roots.pem -P {}; fi" - .format(CERTS_DIR, CERTS_DIR)], shell=True, check=True) - except subprocess.CalledProcessError: - assert False, "download of roots.pem failed" - - -def get_ping_status(fledge_url): - _connection = http.client.HTTPConnection(fledge_url) - _connection.request("GET", '/fledge/ping') - r = _connection.getresponse() - assert 200 == r.status - r = r.read().decode() - jdoc = json.loads(r) - return jdoc - - -def get_statistics_map(fledge_url): - _connection = http.client.HTTPConnection(fledge_url) - _connection.request("GET", '/fledge/statistics') - r = _connection.getresponse() - assert 200 == r.status - r = r.read().decode() - jdoc = json.loads(r) - return utils.serialize_stats_map(jdoc) - - -# Get the latest 5 timestamps, readings of data sent from south to compare it with the timestamps, -# readings of data in GCP. -def get_asset_info(fledge_url): - _connection = http.client.HTTPConnection(fledge_url) - _connection.request("GET", '/fledge/asset/sinusoid?limit=5') - r = _connection.getresponse() - assert 200 == r.status - r = r.read().decode() - jdoc = json.loads(r) - for j in jdoc: - j['timestamp'] = datetime.strptime(j['timestamp'], "%Y-%m-%d %H:%M:%S.%f").astimezone( - timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f") - return jdoc - - -def copy_certs(gcp_cert_path): - # As we are not uploading pem certificate via cert Upload API, therefore below code is required for pem certs - create_cert_pem_dir = "mkdir -p {}".format(FLEDGE_CERTS_PEM_DIR) - os.system(create_cert_pem_dir) - assert os.path.isdir(FLEDGE_CERTS_PEM_DIR) - copy_file = "cp {} {}/roots.pem {}".format(gcp_cert_path, CERTS_DIR, FLEDGE_CERTS_PEM_DIR) - os.system(copy_file) - assert os.path.isfile("{}/roots.pem".format(FLEDGE_CERTS_PEM_DIR)) - - -@pytest.fixture -def verify_and_set_prerequisites(gcp_cert_path, google_app_credentials): - assert os.path.exists("{}".format(gcp_cert_path)), "Private key not found at {}"\ - .format(gcp_cert_path) - os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = google_app_credentials - - -def verify_received_messages(logger_name, asset_info, retries, wait_time): - - from google.cloud import logging - from google.cloud.logging import DESCENDING - - # Lists the most recent entries for a given logger. 
- logging_client = logging.Client() - logger = logging_client.logger(logger_name) - - # Fetches the latest logs from GCP and comaperes it with current timestamp - while retries: - iterator = logger.list_entries(order_by=DESCENDING, page_size=10, filter_="severity=INFO") - pages = iterator.pages - page = next(pages) # API call - gcp_log_string = "" - gcp_info = [] - for entry in page: - gcp_log_string += entry.payload - assert len(gcp_log_string), "No data seen in GCP. " - gcp_log_dict = json.loads("[" + gcp_log_string.replace("}{", "},{") + "]") - for r in gcp_log_dict: - for d in range(0, len(r["sinusoid"])): - gcp_info.append(r["sinusoid"][d]) - assert len(gcp_info), "No Sinusoid readings GCP logs found" - found = 0 - for i in gcp_info: - for d in asset_info: - if d['timestamp'] == i['ts']: - assert d['reading']['sinusoid'] == i['sinusoid'] - found += 1 - if found == len(asset_info): - break - else: - retries -= 1 - time.sleep(wait_time) - - if retries == 0: - assert False, "TIMEOUT! sinusoid data sent not seen in GCP. " - - -class TestGCPGateway: - def test_gcp_gateway(self, check_fledge_root, verify_and_set_prerequisites, remove_and_add_pkgs, reset_fledge, - fledge_url, wait_time, remove_data_file, gcp_project_id, gcp_device_gateway_id, - gcp_registry_id, gcp_cert_path, gcp_logger_name, retries): - payload = {"name": "Sine", "type": "south", "plugin": "sinusoid", "enabled": True, "config": {}} - post_url = "/fledge/service" - conn = http.client.HTTPConnection(fledge_url) - conn.request("POST", post_url, json.dumps(payload)) - res = conn.getresponse() - assert 200 == res.status, "ERROR! POST {} request failed".format(post_url) - - copy_certs(gcp_cert_path) - - gcp_project_cfg = {"project_id": {"value": "{}".format(gcp_project_id)}, - "registry_id": {"value": "{}".format(gcp_registry_id)}, - "device_id": {"value": "{}".format(gcp_device_gateway_id)}, - "key": {"value": "rsa_private"}} - - payload = {"name": task_name, - "plugin": "{}".format(north_plugin), - "type": "north", - "schedule_type": 3, - "schedule_repeat": 5, - "schedule_enabled": True, - "config": gcp_project_cfg - } - - post_url = "/fledge/scheduled/task" - conn = http.client.HTTPConnection(fledge_url) - conn.request("POST", post_url, json.dumps(payload)) - res = conn.getresponse() - assert 200 == res.status, "ERROR! 
POST {} request failed".format(post_url) - - time.sleep(wait_time) - - ping_response = get_ping_status(fledge_url) - assert 0 < ping_response["dataRead"] - assert 0 < ping_response["dataSent"] - - actual_stats_map = get_statistics_map(fledge_url) - assert 0 < actual_stats_map['SINUSOID'] - assert 0 < actual_stats_map['READINGS'] - assert 0 < actual_stats_map['Readings Sent'] - assert 0 < actual_stats_map[task_name] - - asset_info = get_asset_info(fledge_url) - - verify_received_messages(gcp_logger_name, asset_info, retries, wait_time) - - remove_data_file("{}/rsa_private.pem".format(FLEDGE_CERTS_PEM_DIR)) - remove_data_file("{}/roots.pem".format(FLEDGE_CERTS_PEM_DIR)) diff --git a/tests/system/python/packages/test_north_pi_webapi_nw_throttle.py b/tests/system/python/packages/test_north_pi_webapi_nw_throttle.py index 42edec7ddd..e52dba4d93 100644 --- a/tests/system/python/packages/test_north_pi_webapi_nw_throttle.py +++ b/tests/system/python/packages/test_north_pi_webapi_nw_throttle.py @@ -329,7 +329,7 @@ def test_omf_in_impaired_network(self, clean_setup_fledge_packages, reset_fledge raise Exception("None of packet delay or rate limit given, " "cannot apply network impairment.") # Insert some readings before turning off compression. - time.sleep(2) + time.sleep(3) # Turn off south service disable_schedule(fledge_url, SOUTH_SERVICE_NAME) time.sleep(5) diff --git a/tests/system/python/packages/test_rule_data_availability.py b/tests/system/python/packages/test_rule_data_availability.py index 38faa3b97a..2561685af1 100644 --- a/tests/system/python/packages/test_rule_data_availability.py +++ b/tests/system/python/packages/test_rule_data_availability.py @@ -263,6 +263,7 @@ def test_data_availability_north(self, check_eds_installed, reset_fledge, start_ get_url = "/fledge/audit?source=NTFSN" resp1 = utils.get_request(fledge_url, get_url) + time.sleep(wait_time) get_url = "/fledge/audit?source=NTFSN" resp2 = utils.get_request(fledge_url, get_url) assert len(resp2['audit']) > len(resp1['audit']), "ERROR: NTFSN not triggered properly with asset code" diff --git a/tests/unit/python/fledge/common/test_configuration_manager.py b/tests/unit/python/fledge/common/test_configuration_manager.py index 5048a75f03..3587e48c00 100644 --- a/tests/unit/python/fledge/common/test_configuration_manager.py +++ b/tests/unit/python/fledge/common/test_configuration_manager.py @@ -35,7 +35,7 @@ def reset_singleton(self): def test_supported_validate_type_strings(self): expected_types = ['IPv4', 'IPv6', 'JSON', 'URL', 'X509 certificate', 'boolean', 'code', 'enumeration', 'float', 'integer', - 'northTask', 'password', 'script', 'string', 'ACL'] + 'northTask', 'password', 'script', 'string', 'ACL', 'bucket'] assert len(expected_types) == len(_valid_type_strings) assert sorted(expected_types) == _valid_type_strings @@ -533,28 +533,80 @@ async def test__validate_category_val_enum_type_bad(self, config, exception_name assert excinfo.type is exception_name assert exception_msg == str(excinfo.value) + @pytest.mark.skip(reason="FOGL-8281") + @pytest.mark.parametrize("config", [ + ({ITEM_NAME: {"description": "test description", "type": "bucket", "default": "A"}}), + ({ITEM_NAME: {"description": "test description", "type": "bucket", "default": "A", "properties": "{}"}}), + ({"item": {"description": "test description", "type": "string", "default": "A"}, + ITEM_NAME: {"description": "test description", "type": "bucket", "default": "A"}}), + ]) + async def test__validate_category_val_bucket_type_good(self, config): + 
storage_client_mock = MagicMock(spec=StorageClientAsync) + c_mgr = ConfigurationManager(storage_client_mock) + c_return_value = await c_mgr._validate_category_val(category_name=CAT_NAME, category_val=config, + set_value_val_from_default_val=True) + assert isinstance(c_return_value, dict) + + @pytest.mark.parametrize("config, exc_name, reason", [ + ({ITEM_NAME: {"description": "test description", "type": "bucket", "default": "A"}}, KeyError, + "'For {} category, properties KV pair must be required for item name {}.'".format(CAT_NAME, ITEM_NAME)), + ({ITEM_NAME: {"description": "test description", "type": "bucket", "default": "A", "property": '{"a": 1}'}}, + KeyError, "'For {} category, properties KV pair must be required for item name {}.'".format( + CAT_NAME, ITEM_NAME)), + ({"item": {"description": "test description", "type": "string", "default": "A", "value": "B"}, + ITEM_NAME: {"description": "test description", "type": "bucket", "default": "A"}}, KeyError, + "'For {} category, properties KV pair must be required for item name {}.'".format(CAT_NAME, ITEM_NAME)), + ({ITEM_NAME: {"description": "test description", "type": "bucket", "default": "A", "properties": '{"a": 1}'}}, + ValueError, "For {} category, properties must be JSON object for item name {}; got ".format( + CAT_NAME, ITEM_NAME)), + ({ITEM_NAME: {"description": "test description", "type": "bucket", "default": "A", "properties": {}}}, + ValueError, "For {} category, properties JSON object cannot be empty for item name {}".format( + CAT_NAME, ITEM_NAME)), + ({ITEM_NAME: {"description": "test description", "type": "bucket", "default": "A", "properties": {"k": "v"}}}, + ValueError, "For {} category, key KV pair must exist in properties for item name {}".format( + CAT_NAME, ITEM_NAME)), + ({ITEM_NAME: {"description": "test description", "type": "bucket", "default": {}, "properties": {"key": "v"}}}, + TypeError, "For {} category, entry value must be a string for item name {} and entry name default; " + "got ".format(CAT_NAME, ITEM_NAME)) + ]) + async def test__validate_category_val_bucket_type_bad(self, config, exc_name, reason): + storage_client_mock = MagicMock(spec=StorageClientAsync) + c_mgr = ConfigurationManager(storage_client_mock) + with pytest.raises(Exception) as excinfo: + await c_mgr._validate_category_val(category_name=CAT_NAME, category_val=config, + set_value_val_from_default_val=False) + assert excinfo.type is exc_name + assert reason == str(excinfo.value) + @pytest.mark.parametrize("_type, value, from_default_val", [ ("integer", " ", False), ("string", "", False), ("string", " ", False), ("JSON", "", False), ("JSON", " ", False), + ("bucket", "", False), + ("bucket", " ", False), ("integer", " ", True), ("string", "", True), ("string", " ", True), ("JSON", "", True), - ("JSON", " ", True) + ("JSON", " ", True), + ("bucket", "", True), + ("bucket", " ", True) ]) async def test__validate_category_val_with_optional_mandatory(self, _type, value, from_default_val): storage_client_mock = MagicMock(spec=StorageClientAsync) c_mgr = ConfigurationManager(storage_client_mock) test_config = {ITEM_NAME: {"description": "test description", "type": _type, "default": value, "mandatory": "true"}} + if _type == "bucket": + test_config[ITEM_NAME]['properties'] = {"key": "foo"} with pytest.raises(Exception) as excinfo: await c_mgr._validate_category_val(category_name=CAT_NAME, category_val=test_config, set_value_val_from_default_val=from_default_val) assert excinfo.type is ValueError - assert "For {} category, A default value must be 
given for {}".format(CAT_NAME, ITEM_NAME) == str(excinfo.value) + assert ("For {} category, A default value must be given for {}" + "").format(CAT_NAME, ITEM_NAME) == str(excinfo.value) async def test__validate_category_val_with_enum_type(self, reset_singleton): storage_client_mock = MagicMock(spec=StorageClientAsync) @@ -3476,7 +3528,8 @@ async def async_mock(return_value): (None, 'group', 5, "For catname category, entry value must be string for optional item group; got "), (None, 'group', True, - "For catname category, entry value must be string for optional item group; got ") + "For catname category, entry value must be string for optional item group; got "), + (None, 'properties', {"key": "Bot"}, 'For catname category, optional item name properties cannot be updated.') ]) async def test_set_optional_value_entry_bad_update(self, reset_singleton, _type, optional_key_name, new_value_entry, exc_msg): @@ -3497,7 +3550,8 @@ async def async_mock(return_value): storage_value_entry = {'length': '255', 'displayName': category_name, 'rule': 'value * 3 == 6', 'deprecated': 'false', 'readonly': 'true', 'type': 'string', 'order': '4', 'description': 'Test Optional', 'minimum': minimum, 'value': '13', 'maximum': maximum, - 'default': '13', 'validity': 'field X is set', 'mandatory': 'false', 'group': 'Security'} + 'default': '13', 'validity': 'field X is set', 'mandatory': 'false', 'group': 'Security', + 'properties': {"key": "model"}} # Changed in version 3.8: patch() now returns an AsyncMock if the target is an async function. if sys.version_info.major == 3 and sys.version_info.minor >= 8: @@ -3509,6 +3563,7 @@ async def async_mock(return_value): with patch.object(ConfigurationManager, '_read_item_val', return_value=_rv) as readpatch: with pytest.raises(Exception) as excinfo: await c_mgr.set_optional_value_entry(category_name, item_name, optional_key_name, new_value_entry) + assert excinfo.type is ValueError assert exc_msg == str(excinfo.value) readpatch.assert_called_once_with(category_name, item_name) diff --git a/tests/unit/python/fledge/services/core/api/control_service/test_entrypoint.py b/tests/unit/python/fledge/services/core/api/control_service/test_entrypoint.py index 13c842ffdc..95d85a876f 100644 --- a/tests/unit/python/fledge/services/core/api/control_service/test_entrypoint.py +++ b/tests/unit/python/fledge/services/core/api/control_service/test_entrypoint.py @@ -478,7 +478,19 @@ async def test__get_entrypoint(self): 'destination': 'service', 'service': 'Camera', 'constants': {'unit': 'cm'}, 'variables': {'aperture': 'f/11'}}, {'name': 'FocusCamera', 'description': 'Perform focus on camera', 'type': 'operation', 'operation_name': 'OP', 'destination': 'script', 'script': 'S1', 'anonymous': False, - 'constants': {'unit': 'cm'}, 'variables': {'aperture': 'f/16'}} + 'constants': {'unit': 'cm'}, 'variables': {'aperture': 'f/16'}}, + {'name': 'EP1', 'description': 'Entry Point', 'type': 'write', 'destination': 'broadcast', + 'constants': {'seed': '100'}, 'anonymous': True}, + {'name': 'EP2', 'description': 'Entry Point', 'type': 'write', 'destination': 'broadcast', + 'variables': {'seed': '100'}, 'anonymous': True}, + {'name': 'EP3', 'description': 'Entry Point', 'type': 'write', 'destination': 'broadcast', + 'constants': {'seed': '100', 'param2': "foo"}, 'anonymous': False, 'allow': []}, + {'name': 'EP4', 'description': 'Entry Point', 'type': 'write', 'destination': 'broadcast', + 'variables': {'seed': '100', 'param2': "foo"}, 'anonymous': False, 'allow': []}, + {'name': 'EP #5', 
'description': 'Entry Point', 'type': 'write', 'destination': 'asset', "asset": "Random", + 'variables': {'seed': '100', 'param2': "foo"}, 'anonymous': True}, + {'name': 'EP-123', 'description': 'Entry Point', 'type': 'write', 'destination': 'service', "service": "S1", + 'variables': {'seed': '100', 'param2': "foo"}, 'anonymous': False, 'allow': []} ]) async def test__check_parameters(self, payload): cols = await entrypoint._check_parameters(payload) @@ -527,13 +539,16 @@ async def test__check_parameters_without_required_keys(self, payload): "Control entrypoint destination argument cannot be empty."), ({"anonymous": "t"}, ValueError, "anonymous should be a bool."), ({"constants": "t"}, ValueError, "constants should be a dictionary."), - ({"type": "write", "constants": {}}, ValueError, "constants should not be empty."), - ({"type": "write", "constants": None}, ValueError, - "For type write constants must have passed in payload and cannot have empty value."), ({"variables": "t"}, ValueError, "variables should be a dictionary."), - ({"type": "write", "constants": {"unit": "cm"}, "variables": {}}, ValueError, "variables should not be empty."), - ({"type": "write", "constants": {"unit": "cm"}, "variables": None}, ValueError, - "For type write variables must have passed in payload and cannot have empty value."), + ({"type": "write"}, ValueError, "For write type either variables or constants should not be empty."), + ({"type": "write", "constants": {}}, ValueError, + "For write type either variables or constants should not be empty."), + ({"type": "write", "variables": {}}, ValueError, + "For write type either variables or constants should not be empty."), + ({"type": "write", "constants": None}, ValueError, + "For write type either variables or constants should not be empty."), + ({"type": "write", "variables": None}, ValueError, + "For write type either variables or constants should not be empty."), ({"allow": "user"}, ValueError, "allow should be an array of list of users.") ]) async def test_bad__check_parameters(self, payload, exception_name, error_msg): diff --git a/tests/unit/python/fledge/services/core/api/test_package_log.py b/tests/unit/python/fledge/services/core/api/test_package_log.py index 1bdf93a4d2..0051f2e0d0 100644 --- a/tests/unit/python/fledge/services/core/api/test_package_log.py +++ b/tests/unit/python/fledge/services/core/api/test_package_log.py @@ -245,9 +245,9 @@ async def mock_coro(): return {"rows": [{'id': 'b57fd5c5-8079-49ff-b6a1-9515cbd259e4', 'name': 'fledge-south-random', 'action': "install", 'status': -1, 'log_file_uri': 'log/201006-17-02-53-fledge-south-random-install.log'}, - {'id': '1cd38675-fea8-4783-b3b5-463ed6c8cbe8', 'name': 'fledge-north-gcp', + {'id': '1cd38675-fea8-4783-b3b5-463ed6c8cbe8', 'name': 'fledge-north-kafka', 'action': "install", 'status': 0, - 'log_file_uri': 'log/201007-01-02-53-fledge-north-gcp-install.log'}, + 'log_file_uri': 'log/201007-01-02-53-fledge-north-kafka-install.log'}, {'id': '63f3c84b-0cbf-4c76-b9bf-848779fbcc6f', 'name': 'fledge-filter-fft', 'action': "update", 'status': 127, 'log_file_uri': 'log/201006-12-02-12-fledge-filter-fft-update.log'},
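The parametrised cases above exercise the relaxed write-type rule from entrypoint.py: constants and variables may each be omitted or empty, but a write entry point must supply at least one non-empty dictionary of the two. A standalone sketch of just that rule follows; check_write_payload is a hypothetical helper name used for illustration, not part of the Fledge API.

def check_write_payload(payload: dict) -> dict:
    """Validate the constants/variables portion of an entry point payload."""
    final = {}
    for key in ('constants', 'variables'):
        value = payload.get(key, None)
        if value is not None:
            if not isinstance(value, dict):
                raise ValueError('{} should be a dictionary.'.format(key))
            final[key] = value
    # For write type, at least one of the two must be present and non-empty
    if payload.get('type') == 'write':
        if not final.get('variables') and not final.get('constants'):
            raise ValueError('For write type either variables or constants should not be empty.')
    return final

# e.g. check_write_payload({'type': 'write', 'constants': {'seed': '100'}}) passes,
# while check_write_payload({'type': 'write', 'variables': {}}) raises ValueError.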