diff --git a/ChangeLog.md b/ChangeLog.md index f28d4a38e..e378f344e 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -13,6 +13,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed ### Added +- Added support for caching readings in RAM and sending in batches. +This currently only works on the EnviroDIY/Monitor My Watershed Publisher. +Thank you to [Thomas Watson](https://github.com/tpwrules) for this work. ### Removed diff --git a/src/LogBuffer.cpp b/src/LogBuffer.cpp new file mode 100644 index 000000000..ca4eab926 --- /dev/null +++ b/src/LogBuffer.cpp @@ -0,0 +1,102 @@ +/** + * @file LogBuffer.cpp + * @copyright 2023 Thomas Watson + * Part of the EnviroDIY ModularSensors library for Arduino + * @author Thomas Watson + * + * @brief Implements the LogBuffer class. + * + * This class buffers logged timestamps and variable values for transmission. + */ +#include "LogBuffer.h" + +#include + +// Constructor +LogBuffer::LogBuffer() {} +// Destructor +LogBuffer::~LogBuffer() {} + +void LogBuffer::setNumVariables(uint8_t numVariables_) { + // each record is one uint32_t to hold the timestamp, plus N floats to hold + // each variable's value + recordSize = sizeof(uint32_t) + sizeof(float) * numVariables_; + numVariables = numVariables_; + + // this scrambles all the data in the buffer so clear it out + clear(); +} + +void LogBuffer::clear(void) { + // clear out the buffer + numRecords = 0; + dataBufferTail = 0; + dataBufferHead = 0; + _bufferOverflow = false; +} + +uint8_t LogBuffer::getNumVariables(void) { + return numVariables; +} + +int LogBuffer::getNumRecords(void) { + return numRecords; +} + +uint8_t LogBuffer::getPercentFull(void) { + uint32_t bytesFull = (uint32_t)numRecords * (uint32_t)recordSize; + uint32_t bytesTotal = MS_LOG_DATA_BUFFER_SIZE; + + return (uint8_t)((bytesFull * (uint32_t)100) / bytesTotal); +} + +int LogBuffer::addRecord(uint32_t timestamp) { + // check how many records currently exist + int record = numRecords; + // compute position of the new record's timestamp in the buffer + // (the timestamp is the first data in the record) + size_t pos = record * recordSize; + // verify we have sufficient space for the record and bail if not + if (MS_LOG_DATA_BUFFER_SIZE - pos < recordSize) { return -1; } + + // write the timestamp to the record + memcpy(static_cast(&dataBuffer[pos]), static_cast(×tamp), + sizeof(uint32_t)); + numRecords += 1; // just added another record + + // return the index of the record number just created + return record; +} + +void LogBuffer::setRecordValue(int record, uint8_t variable, float value) { + // compute position of this value in the buffer + size_t pos = record * recordSize + sizeof(uint32_t) + + variable * sizeof(float); + + // write the value to the record + memcpy(static_cast(&dataBuffer[pos]), static_cast(&value), + sizeof(float)); +} + +uint32_t LogBuffer::getRecordTimestamp(int record) { + // read the timestamp from the record (which is the first data in it) + uint32_t timestamp; + memcpy(static_cast(×tamp), + static_cast(&dataBuffer[record * recordSize]), + sizeof(uint32_t)); + + return timestamp; +} + +float LogBuffer::getRecordValue(int record, uint8_t variable) { + // compute position of this value in the buffer + size_t pos = record * recordSize + sizeof(uint32_t) + + variable * sizeof(float); + + // read the value from the record + float value; + memcpy(static_cast(&value), static_cast(&dataBuffer[pos]), + sizeof(float)); + + return value; +} diff --git a/src/LogBuffer.h b/src/LogBuffer.h new file mode 100644 index 000000000..c648dfd3e --- /dev/null +++ b/src/LogBuffer.h @@ -0,0 +1,161 @@ +/** + * @file LogBuffer.cpp + * @copyright 2023 Thomas Watson + * Part of the EnviroDIY ModularSensors library for Arduino + * @author Thomas Watson + * + * @brief Implements the LogBuffer class. + * + * This class buffers logged timestamps and variable values for transmission. + */ + +// Header Guards +#ifndef SRC_LOGBUFFER_H_ +#define SRC_LOGBUFFER_H_ + +#ifndef MS_LOG_DATA_BUFFER_SIZE +#ifdef ARDUINO_AVR_MEGA2560 +#define MS_LOG_DATA_BUFFER_SIZE 2048 +#else +/** + * @brief Log Data Buffer + * + * This determines how much RAM is reserved to buffer log records before + * transmission. Each record consumes 4 bytes for the timestamp plus 4 bytes + * for each logged variable. Increasing this value too far can crash the + * device! The number of log records buffered is controlled by sendEveryX. + * + * This can be changed by setting the build flag MS_LOG_DATA_BUFFER_SIZE when + * compiling. 8192 bytes is a safe value for the Mayfly 1.1 with six variables. + */ +#define MS_LOG_DATA_BUFFER_SIZE 8192 +#endif +#endif + +#include +#include + +/** + * @brief This class buffers logged timestamps and variable values for + * transmission. The log is divided into a number of records. Each record + * stores the timestamp of the record as a uint32_t, then the value of each + * variable as a float at that time. + */ +class LogBuffer { + public: + /** + * @brief Constructs a new empty buffer which stores no variables or values. + */ + LogBuffer(); + /** + * @brief Destroys the buffer. + */ + virtual ~LogBuffer(); + + /** + * @brief Sets the number of variables the buffer will store in each record. + * Clears the buffer as a side effect. + * + * @param numVariables_ The number of variables to store. + */ + void setNumVariables(uint8_t numVariables_); + + /** + * @brief Gets the number of variables that will be stored in each record. + * + * @return The variable count. + */ + uint8_t getNumVariables(void); + + /** + * @brief Clears all records from the log. + */ + void clear(void); + + /** + * @brief Gets the number of records currently in the log. + * + * @return The number of records. + */ + int getNumRecords(void); + + /** + * @brief Computes the percentage full of the buffer. + * + * @return The current percent full. + */ + uint8_t getPercentFull(void); + + /** + * @brief Adds a new record with the given timestamp. + * + * @param timestamp The timestamp + * + * @return Index of the new record, or -1 if there was no space. + */ + int addRecord(uint32_t timestamp); + + /** + * @brief Sets the value of a particular variable in a particular record. + * + * @param record The record + * @param variable The variable + * @param value The value + */ + void setRecordValue(int record, uint8_t variable, float value); + + /** + * @brief Gets the timestamp of a particular record. + * + * @param record The record + * + * @return The record's timestamp. + */ + uint32_t getRecordTimestamp(int record); + + /** + * @brief Gets the value of a particular variable in a particular record. + * + * @param record The record + * @param variable The variable + * + * @return The variable's value. + */ + float getRecordValue(int record, uint8_t variable); + + protected: + /** + * @brief Buffer which stores the log data. + */ + uint8_t dataBuffer[MS_LOG_DATA_BUFFER_SIZE]; + + /** + * @brief Index of buffer head. + */ + uint16_t dataBufferTail; + /** + * @brief Index of buffer tail. + */ + uint16_t dataBufferHead; + /** + * @brief The buffer overflow status + */ + bool _bufferOverflow = false; + + /** + * @brief Number of records currently in the buffer. + */ + int numRecords; + + /** + * @brief Size in bytes of each record in the buffer. + */ + size_t recordSize; + + /** + * @brief Number of variables stored in each record in the buffer. + */ + uint8_t numVariables; +}; + +#endif // SRC_LOGBUFFER_H_ diff --git a/src/LoggerBase.cpp b/src/LoggerBase.cpp index 7b36fd4f1..a556a2bc7 100644 --- a/src/LoggerBase.cpp +++ b/src/LoggerBase.cpp @@ -265,11 +265,20 @@ String Logger::getVarCodeAtI(uint8_t position_i) { String Logger::getVarUUIDAtI(uint8_t position_i) { return _internalArray->arrayOfVars[position_i]->getVarUUID(); } +// This returns the current value of the variable as a float +float Logger::getValueAtI(uint8_t position_i) { + return _internalArray->arrayOfVars[position_i]->getValue(); +} // This returns the current value of the variable as a string with the // correct number of significant figures String Logger::getValueStringAtI(uint8_t position_i) { return _internalArray->arrayOfVars[position_i]->getValueString(); } +// This returns a particular value of the variable as a string with the +// correct number of significant figures +String Logger::formatValueStringAtI(uint8_t position_i, float value) { + return _internalArray->arrayOfVars[position_i]->formatValueString(value); +} // ===================================================================== // @@ -330,15 +339,27 @@ void Logger::registerDataPublisher(dataPublisher* publisher) { dataPublishers[i] = publisher; } +bool Logger::checkRemotesConnectionNeeded(void) { + MS_DBG(F("Asking publishers if they need a connection.")); -void Logger::publishDataToRemotes(void) { + bool needed = false; + for (uint8_t i = 0; i < MAX_NUMBER_SENDERS; i++) { + if (dataPublishers[i] != nullptr) { + needed = needed || dataPublishers[i]->connectionNeeded(); + } + } + + return needed; +} + +void Logger::publishDataToRemotes(bool forceFlush) { MS_DBG(F("Sending out remote data.")); for (uint8_t i = 0; i < MAX_NUMBER_SENDERS; i++) { if (dataPublishers[i] != nullptr) { PRINTOUT(F("\nSending data to ["), i, F("]"), dataPublishers[i]->getEndpoint()); - dataPublishers[i]->publishData(); + dataPublishers[i]->publishData(forceFlush); watchDogTimer.resetWatchDog(); } } @@ -579,13 +600,20 @@ void Logger::markTime(void) { bool Logger::checkInterval(void) { bool retval; uint32_t checkTime = getNowLocalEpoch(); + uint16_t interval = _loggingIntervalMinutes; + if (_initialShortIntervals > 0) { + // log the first few samples at an interval of 1 minute so that + // operation can be quickly verified in the field + _initialShortIntervals -= 1; + interval = 1; + } + MS_DBG(F("Current Unix Timestamp:"), checkTime, F("->"), formatDateTime_ISO8601(checkTime)); - MS_DBG(F("Logging interval in seconds:"), (_loggingIntervalMinutes * 60)); - MS_DBG(F("Mod of Logging Interval:"), - checkTime % (_loggingIntervalMinutes * 60)); + MS_DBG(F("Logging interval in seconds:"), (interval * 60)); + MS_DBG(F("Mod of Logging Interval:"), checkTime % (interval * 60)); - if (checkTime % (_loggingIntervalMinutes * 60) == 0) { + if ((checkTime % (interval * 60) == 0)) { // Update the time variables with the current time markTime(); MS_DBG(F("Time marked at (unix):"), Logger::markedLocalEpochTime); @@ -1455,6 +1483,14 @@ void Logger::begin() { PRINTOUT(F("Sampling feature UUID is:"), _samplingFeatureUUID); } + + for (uint8_t i = 0; i < MAX_NUMBER_SENDERS; i++) { + if (dataPublishers[i] != nullptr) { + PRINTOUT(F("Data will be published to ["), i, F("]"), + dataPublishers[i]->getEndpoint()); + } + } + PRINTOUT(F("Logger portion of setup finished.\n")); } @@ -1465,10 +1501,12 @@ void Logger::logData(bool sleepBeforeReturning) { watchDogTimer.resetWatchDog(); // Assuming we were woken up by the clock, check if the current time is an - // even interval of the logging interval + // even interval of the logging interval or that we have been specifically + // requested to log by pushbutton if (checkInterval()) { // Flag to notify that we're in already awake and logging a point Logger::isLoggingNow = true; + // Reset the watchdog watchDogTimer.resetWatchDog(); @@ -1499,6 +1537,8 @@ void Logger::logData(bool sleepBeforeReturning) { // Unset flag Logger::isLoggingNow = false; + // Acknowledge testing button if pressed + Logger::startTesting = false; } // Check if it was instead the testing interrupt that woke us up @@ -1515,7 +1555,8 @@ void Logger::logDataAndPublish(bool sleepBeforeReturning) { watchDogTimer.resetWatchDog(); // Assuming we were woken up by the clock, check if the current time is an - // even interval of the logging interval + // even interval of the logging interval or that we have been specifically + // requested to log by pushbutton if (checkInterval()) { // Flag to notify that we're in already awake and logging a point Logger::isLoggingNow = true; @@ -1551,7 +1592,19 @@ void Logger::logDataAndPublish(bool sleepBeforeReturning) { // Create a csv data record and save it to the log file logToSD(); - if (_logModem != nullptr) { + // flush the publisher buffers (if any) if we have been invoked by the + // testing button + bool forceFlush = Logger::startTesting; + + // Sync the clock at noon + bool clockSyncNeeded = + (Logger::markedLocalEpochTime != 0 && + Logger::markedLocalEpochTime % 86400 == 43200) || + !isRTCSane(Logger::markedLocalEpochTime); + bool connectionNeeded = checkRemotesConnectionNeeded() || + clockSyncNeeded || forceFlush; + + if (_logModem != nullptr && connectionNeeded) { MS_DBG(F("Waking up"), _logModem->getModemName(), F("...")); if (_logModem->modemWake()) { // Connect to the network @@ -1560,13 +1613,10 @@ void Logger::logDataAndPublish(bool sleepBeforeReturning) { if (_logModem->connectInternet()) { // Publish data to remotes watchDogTimer.resetWatchDog(); - publishDataToRemotes(); + publishDataToRemotes(forceFlush); watchDogTimer.resetWatchDog(); - if ((Logger::markedLocalEpochTime != 0 && - Logger::markedLocalEpochTime % 86400 == 43200) || - !isRTCSane(Logger::markedLocalEpochTime)) { - // Sync the clock at noon + if (clockSyncNeeded) { MS_DBG(F("Running a daily clock sync...")); setRTClock(_logModem->getNISTTime()); watchDogTimer.resetWatchDog(); @@ -1586,6 +1636,12 @@ void Logger::logDataAndPublish(bool sleepBeforeReturning) { } // Turn the modem off _logModem->modemSleepPowerDown(); + } else if (_logModem != nullptr) { + MS_DBG(F("Nobody needs it so publishing without connecting...")); + // Call publish function without connection + watchDogTimer.resetWatchDog(); + publishDataToRemotes(false); // can't flush without a connection + watchDogTimer.resetWatchDog(); } @@ -1603,13 +1659,15 @@ void Logger::logDataAndPublish(bool sleepBeforeReturning) { // Unset flag Logger::isLoggingNow = false; + // Acknowledge testing button if pressed + Logger::startTesting = false; } // Check if it was instead the testing interrupt that woke us up - if (Logger::startTesting) testingMode(sleepBeforeReturning); + if (Logger::startTesting) testingMode(); if (sleepBeforeReturning) { - // Sleep + // Call the processor sleep systemSleep(); } } diff --git a/src/LoggerBase.h b/src/LoggerBase.h index 8940a282b..b293fcf17 100644 --- a/src/LoggerBase.h +++ b/src/LoggerBase.h @@ -399,6 +399,11 @@ class Logger { * @brief The logging interval in minutes */ uint16_t _loggingIntervalMinutes = 5; + /** + * @brief The initial number of samples to log at an interval of 1 minute + * for fast field verification + */ + uint8_t _initialShortIntervals = 0; /** * @brief Digital pin number on the mcu controlling the SD card slave * select. @@ -526,6 +531,14 @@ class Logger { * @return **String** The variable UUID */ String getVarUUIDAtI(uint8_t position_i); + /** + * @brief Get the most recent value of the variable at the given position in + * the internal variable array object. + * + * @param position_i The position of the variable in the array. + * @return **float** The value of the variable as a float. + */ + float getValueAtI(uint8_t position_i); /** * @brief Get the most recent value of the variable at the given position in * the internal variable array object. @@ -535,6 +548,16 @@ class Logger { * number of significant figures. */ String getValueStringAtI(uint8_t position_i); + /** + * @brief Get the string representing a particular value of the variable at + * the given position in the internal variable array object. + * + * @param position_i The position of the variable in the array. + * @param value The value to format. + * @return **String** The given value as a string with the correct number of + * significant figures. + */ + String formatValueStringAtI(uint8_t position_i, float value); protected: /** @@ -579,10 +602,19 @@ class Logger { * @param publisher A dataPublisher object */ void registerDataPublisher(dataPublisher* publisher); + /** + * @brief Check if any data publishers need an Internet connection for the + * next publish call. + * + * @return True if any remotes need a connection. + */ + bool checkRemotesConnectionNeeded(void); /** * @brief Publish data to all registered data publishers. + * + * @param forceFlush Ask the publishers to flush buffered data immediately. */ - void publishDataToRemotes(void); + void publishDataToRemotes(bool forceFlush = false); /** * @brief Retained for backwards compatibility, use publishDataToRemotes() * in new code. @@ -1187,7 +1219,7 @@ class Logger { * measurements, the sensors are put to sleep, the modem is disconnected * from the internet, and the logger goes back to sleep. */ - virtual void testingMode(bool sleepBeforeReturning = true); + virtual void testingMode(); /**@}*/ // ===================================================================== // diff --git a/src/VariableBase.cpp b/src/VariableBase.cpp index 4fc8cc0ab..977e7e143 100644 --- a/src/VariableBase.cpp +++ b/src/VariableBase.cpp @@ -255,11 +255,18 @@ float Variable::getValue(bool updateValue) { // This returns the current value of the variable as a string // with the correct number of significant figures String Variable::getValueString(bool updateValue) { + return formatValueString(getValue(updateValue)); +} + + +// This returns a particular value of the variable as a string +// with the correct number of significant figures +String Variable::formatValueString(float value) { // Need this because otherwise get extra spaces in strings from int if (_decimalResolution == 0) { - auto val = static_cast(getValue(updateValue)); + auto val = static_cast(value); return String(val); } else { - return String(getValue(updateValue), _decimalResolution); + return String(value, _decimalResolution); } } diff --git a/src/VariableBase.h b/src/VariableBase.h index 35c413acd..b72c5fb19 100644 --- a/src/VariableBase.h +++ b/src/VariableBase.h @@ -356,6 +356,14 @@ class Variable { * @return **String** The current value of the variable */ String getValueString(bool updateValue = false); + /** + * @brief Get a particular value of the variable as a string with the + * correct decimal resolution + * + * @param value value to format + * @return **String** The formatted value of the variable + */ + String formatValueString(float value); /** * @brief Pointer to the parent sensor diff --git a/src/dataPublisherBase.cpp b/src/dataPublisherBase.cpp index 5429a696f..0c5a0407c 100644 --- a/src/dataPublisherBase.cpp +++ b/src/dataPublisherBase.cpp @@ -110,26 +110,60 @@ void dataPublisher::txBufferAppend(char c) { } void dataPublisher::txBufferFlush() { + if ((txBufferOutClient == nullptr) || (txBufferLen == 0)) { + // sending into the void... + txBufferLen = 0; + return; + } + #if defined(STANDARD_SERIAL_OUTPUT) // write out to the printout stream STANDARD_SERIAL_OUTPUT.write((const uint8_t*)txBuffer, txBufferLen); STANDARD_SERIAL_OUTPUT.flush(); #endif - // write out to the client - txBufferOutClient->write((const uint8_t*)txBuffer, txBufferLen); - txBufferOutClient->flush(); - txBufferLen = 0; + uint8_t tries = 10; + const uint8_t* ptr = (const uint8_t*)txBuffer; + while (true) { + size_t sent = txBufferOutClient->write(ptr, txBufferLen); + txBufferLen -= sent; + ptr += sent; + if (txBufferLen == 0) { + // whole message is successfully sent, we are done + txBufferOutClient->flush(); + return; + } + +#if defined(STANDARD_SERIAL_OUTPUT) + // warn that we only partially sent the buffer + STANDARD_SERIAL_OUTPUT.write('!'); +#endif + if (--tries == 0) { + // can't convince the modem to send the whole message. just break + // the connection now so it will get reset and we can try to + // transmit the data again later + txBufferOutClient = nullptr; + txBufferLen = 0; + return; + } + + // give the modem a chance to transmit buffered data + delay(1000); + } } +bool dataPublisher::connectionNeeded(void) { + // connection is always needed unless publisher has special logic + return true; +} // This sends data on the "default" client of the modem -int16_t dataPublisher::publishData() { +int16_t dataPublisher::publishData(bool forceFlush) { if (_inClient == nullptr) { PRINTOUT(F("ERROR! No web client assigned to publish data!")); return 0; } else { - return publishData(_inClient); + return publishData(_inClient, forceFlush); } } // Duplicates for backwards compatibility diff --git a/src/dataPublisherBase.h b/src/dataPublisherBase.h index 6ccefa91d..0f3058305 100644 --- a/src/dataPublisherBase.h +++ b/src/dataPublisherBase.h @@ -171,6 +171,14 @@ class dataPublisher { virtual String getEndpoint(void) = 0; + /** + * @brief Checks if the publisher needs an Internet connection for the next + * publishData call (as opposed to just buffering data internally). + * + * @return True if an internet connection is needed for the next publish. + */ + virtual bool connectionNeeded(void); + /** * @brief Open a socket to the correct receiver and sends out the formatted * data. @@ -181,10 +189,11 @@ class dataPublisher { * @param outClient An Arduino client instance to use to print data to. * Allows the use of any type of client and multiple clients tied to a * single TinyGSM modem instance + * @param forceFlush Ask the publisher to flush buffered data immediately. * @return **int16_t** The result of publishing data. May be an http * response code or a result code from PubSubClient. */ - virtual int16_t publishData(Client* outClient) = 0; + virtual int16_t publishData(Client* outClient, bool forceFlush = false) = 0; /** * @brief Open a socket to the correct receiver and send out the formatted * data. @@ -193,10 +202,12 @@ class dataPublisher { * either a client having been linked to the publisher or a logger modem * having been linked to the logger linked to the publisher. * + * @param forceFlush Ask the publisher to flush buffered data immediately. + * * @return **int16_t** The result of publishing data. May be an http * response code or a result code from PubSubClient. */ - virtual int16_t publishData(); + virtual int16_t publishData(bool forceFlush = false); /** * @brief Retained for backwards compatibility; use publishData(Client* diff --git a/src/publishers/DreamHostPublisher.cpp b/src/publishers/DreamHostPublisher.cpp index 40ffad715..9089028ac 100644 --- a/src/publishers/DreamHostPublisher.cpp +++ b/src/publishers/DreamHostPublisher.cpp @@ -65,7 +65,7 @@ void DreamHostPublisher::begin(Logger& baseLogger, const char* dhUrl) { // Post the data to dream host. // int16_t DreamHostPublisher::postDataDreamHost(void) -int16_t DreamHostPublisher::publishData(Client* outClient) { +int16_t DreamHostPublisher::publishData(Client* outClient, bool) { // Create a buffer for the portions of the request and response char tempBuffer[37] = ""; uint16_t did_respond = 0; diff --git a/src/publishers/DreamHostPublisher.h b/src/publishers/DreamHostPublisher.h index ba88e4d11..34a5690ed 100644 --- a/src/publishers/DreamHostPublisher.h +++ b/src/publishers/DreamHostPublisher.h @@ -131,10 +131,11 @@ class DreamHostPublisher : public dataPublisher { * @param outClient An Arduino client instance to use to print data to. * Allows the use of any type of client and multiple clients tied to a * single TinyGSM modem instance + * @param forceFlush Ask the publisher to flush buffered data immediately. * * @return **int16_t** The http status code of the response. */ - int16_t publishData(Client* outClient) override; + int16_t publishData(Client* outClient, bool forceFlush = false) override; protected: // portions of the GET request diff --git a/src/publishers/EnviroDIYPublisher.cpp b/src/publishers/EnviroDIYPublisher.cpp index 5a6a043d8..457c8f3a4 100644 --- a/src/publishers/EnviroDIYPublisher.cpp +++ b/src/publishers/EnviroDIYPublisher.cpp @@ -4,6 +4,7 @@ * Part of the EnviroDIY ModularSensors library for Arduino. * This library is published under the BSD-3 license. * @author Sara Geleskie Damiano + * @author Thomas Watson * * @brief Implements the EnviroDIYPublisher class. */ @@ -18,25 +19,36 @@ // Constant values for post requests // I want to refer to these more than once while ensuring there is only one copy // in memory -const char* EnviroDIYPublisher::postEndpoint = "/api/data-stream/"; -const char* EnviroDIYPublisher::enviroDIYHost = "data.envirodiy.org"; -const int EnviroDIYPublisher::enviroDIYPort = 80; const char* EnviroDIYPublisher::tokenHeader = "\r\nTOKEN: "; const char* EnviroDIYPublisher::contentLengthHeader = "\r\nContent-Length: "; const char* EnviroDIYPublisher::contentTypeHeader = "\r\nContent-Type: application/json\r\n\r\n"; const char* EnviroDIYPublisher::samplingFeatureTag = "{\"sampling_feature\":\""; -const char* EnviroDIYPublisher::timestampTag = "\",\"timestamp\":\""; +const char* EnviroDIYPublisher::timestampTag = "\",\"timestamp\":["; // Constructors -EnviroDIYPublisher::EnviroDIYPublisher() : dataPublisher() {} +EnviroDIYPublisher::EnviroDIYPublisher() : dataPublisher() { + setHost("monitormywatershed.org"); + setPath("/api/data-stream/"); + setPort(80); +} EnviroDIYPublisher::EnviroDIYPublisher(Logger& baseLogger, int sendEveryX) - : dataPublisher(baseLogger, sendEveryX) {} + : dataPublisher(baseLogger, sendEveryX) { + _logBuffer.setNumVariables(_baseLogger->getArrayVarCount()); + setHost("monitormywatershed.org"); + setPath("/api/data-stream/"); + setPort(80); +} EnviroDIYPublisher::EnviroDIYPublisher(Logger& baseLogger, Client* inClient, int sendEveryX) - : dataPublisher(baseLogger, inClient, sendEveryX) {} + : dataPublisher(baseLogger, inClient, sendEveryX) { + _logBuffer.setNumVariables(_baseLogger->getArrayVarCount()); + setHost("monitormywatershed.org"); + setPath("/api/data-stream/"); + setPort(80); +} EnviroDIYPublisher::EnviroDIYPublisher(Logger& baseLogger, const char* registrationToken, const char* samplingFeatureUUID, @@ -44,6 +56,10 @@ EnviroDIYPublisher::EnviroDIYPublisher(Logger& baseLogger, : dataPublisher(baseLogger, sendEveryX) { setToken(registrationToken); _baseLogger->setSamplingFeatureUUID(samplingFeatureUUID); + _logBuffer.setNumVariables(_baseLogger->getArrayVarCount()); + setHost("monitormywatershed.org"); + setPath("/api/data-stream/"); + setPort(80); } EnviroDIYPublisher::EnviroDIYPublisher(Logger& baseLogger, Client* inClient, const char* registrationToken, @@ -52,11 +68,46 @@ EnviroDIYPublisher::EnviroDIYPublisher(Logger& baseLogger, Client* inClient, : dataPublisher(baseLogger, inClient, sendEveryX) { setToken(registrationToken); _baseLogger->setSamplingFeatureUUID(samplingFeatureUUID); + _logBuffer.setNumVariables(_baseLogger->getArrayVarCount()); + setHost("monitormywatershed.org"); + setPath("/api/data-stream/"); + setPort(80); } // Destructor EnviroDIYPublisher::~EnviroDIYPublisher() {} +// Returns the data destination +String EnviroDIYPublisher::getHost(void) { + return String(enviroDIYHost); +} + +// Returns the data destination +void EnviroDIYPublisher::setHost(const char* host) { + enviroDIYHost = host; +} + +// Returns the data destination +String EnviroDIYPublisher::getPath(void) { + return String(enviroDIYPath); +} + +// Returns the data destination +void EnviroDIYPublisher::setPath(const char* endpoint) { + enviroDIYPath = endpoint; +} + +// Returns the data destination +int EnviroDIYPublisher::getPort(void) { + return enviroDIYPort; +} + +// Returns the data destination +void EnviroDIYPublisher::setPort(int port) { + enviroDIYPort = port; +} + + void EnviroDIYPublisher::setToken(const char* registrationToken) { _registrationToken = registrationToken; } @@ -64,21 +115,39 @@ void EnviroDIYPublisher::setToken(const char* registrationToken) { // Calculates how long the JSON will be uint16_t EnviroDIYPublisher::calculateJsonSize() { - uint16_t jsonLength = 21; // {"sampling_feature":" - jsonLength += 36; // sampling feature UUID - jsonLength += 15; // ","timestamp":" - jsonLength += 25; // markedISO8601Time - jsonLength += 2; // ", - for (uint8_t i = 0; i < _baseLogger->getArrayVarCount(); i++) { + uint8_t variables = _logBuffer.getNumVariables(); + int records = _logBuffer.getNumRecords(); + MS_DBG(F("Number of records in log buffer:"), records); + MS_DBG(F("Number of variables in log buffer:"), variables); + MS_DBG(F("Number of variables in base logger:"), + _baseLogger->getArrayVarCount()); + + uint16_t jsonLength = strlen(samplingFeatureTag); + jsonLength += 36; // sampling feature UUID + jsonLength += 36; // sampling feature UUID + jsonLength += strlen(timestampTag); + // markedISO8601Time + quotes and commas + jsonLength += records * (25 + 2) + records - 1; + jsonLength += 2; // ], + for (uint8_t var = 0; var < variables; var++) { jsonLength += 1; // " jsonLength += 36; // variable UUID - jsonLength += 2; // ": - jsonLength += _baseLogger->getValueStringAtI(i).length(); - if (i + 1 != _baseLogger->getArrayVarCount()) { + jsonLength += 4; // ":[] + + for (int rec = 0; rec < records; rec++) { + float value = _logBuffer.getRecordValue(rec, var); + jsonLength += + _baseLogger->formatValueStringAtI(var, value).length(); + if (rec + 1 != records) { + jsonLength += 1; // , + } + } + if (var + 1 != variables) { jsonLength += 1; // , } } jsonLength += 1; // } + MS_DBG(F("Outgoing JSON size:"), jsonLength); return jsonLength; } @@ -91,6 +160,7 @@ void EnviroDIYPublisher::begin(Logger& baseLogger, Client* inClient, setToken(registrationToken); dataPublisher::begin(baseLogger, inClient); _baseLogger->setSamplingFeatureUUID(samplingFeatureUUID); + _logBuffer.setNumVariables(_baseLogger->getArrayVarCount()); } void EnviroDIYPublisher::begin(Logger& baseLogger, const char* registrationToken, @@ -98,20 +168,92 @@ void EnviroDIYPublisher::begin(Logger& baseLogger, setToken(registrationToken); dataPublisher::begin(baseLogger); _baseLogger->setSamplingFeatureUUID(samplingFeatureUUID); + _logBuffer.setNumVariables(_baseLogger->getArrayVarCount()); } +bool EnviroDIYPublisher::connectionNeeded(void) { + // compute the send interval, reducing it as the buffer gets more full so we + // have less of a chance of losing data + int interval = _sendEveryX; + uint8_t percent = _logBuffer.getPercentFull(); + MS_DBG(F("Buffer is"), percent, F("percent full")); + if (percent >= 50) { + interval /= 2; + } else if (percent >= 75) { + interval /= 4; + } else if (percent >= 90) { + interval = 1; + } + + // the programmed interval is about to be reached by the next record, or it + // was just reached and we are trying again + bool atSendInterval = false; + if (interval <= 1) { + atSendInterval = true; + } else { + int numRecords = _logBuffer.getNumRecords(); + // where we are relative to the interval + int relative = (numRecords % interval); + if (relative == (interval - 1)) { + // the next sample will put us right at the interval + atSendInterval = true; + } else if (numRecords >= interval) { // don't send the first sample + if (relative == 0) { + // the last sample was the interval, this is the first retry + atSendInterval = true; + } else if (relative == 1) { + // two samples ago was the interval, this is the second retry + atSendInterval = true; + } + } + } + + // the initial log transmissions have not completed (we send every one + // of the first five data points immediately for field validation) + bool initialTransmission = _initialTransmissionsRemaining > 0; + + return atSendInterval || initialTransmission; +} // This utilizes an attached modem to make a TCP connection to the // EnviroDIY/ODM2DataSharingPortal and then streams out a post request // over that connection. // The return is the http status code of the response. -int16_t EnviroDIYPublisher::publishData(Client* outClient) { +int16_t EnviroDIYPublisher::publishData(Client* outClient, bool forceFlush) { + // do we intend to flush this call? if so, we have just returned true from + // connectionNeeded() and the internet is connected and waiting. check what + // that function said so we know to do it after we record this data point. + // we also flush if requested (in which case the internet is connected too) + bool willFlush = connectionNeeded() || forceFlush; + MS_DBG(F("Publishing record to buffer. Will flush:"), willFlush); + + // create record to hold timestamp and variable values in the log buffer + int record = _logBuffer.addRecord(Logger::markedLocalEpochTime); + + // write record data if the record was successfully created + if (record >= 0) { + for (uint8_t i = 0; i < _baseLogger->getArrayVarCount(); i++) { + _logBuffer.setRecordValue(record, i, _baseLogger->getValueAtI(i)); + } + } + + if (_initialTransmissionsRemaining > 0) { + _initialTransmissionsRemaining -= 1; + } + + // do the data buffer flushing if we previously planned to + if (willFlush) { + return flushDataBuffer(outClient); + } else { + return 201; // pretend everything went okay? + } +} + +int16_t EnviroDIYPublisher::flushDataBuffer(Client* outClient) { // Create a buffer for the portions of the request and response char tempBuffer[37] = ""; uint16_t did_respond = 0; - MS_DBG(F("Outgoing JSON size:"), calculateJsonSize()); - // Open a TCP/IP connection to the Enviro DIY Data Portal (WebSDL) MS_DBG(F("Connecting client")); MS_START_DEBUG_TIMER; @@ -121,7 +263,7 @@ int16_t EnviroDIYPublisher::publishData(Client* outClient) { // copy the initial post header into the tx buffer txBufferAppend(postHeader); - txBufferAppend(postEndpoint); + txBufferAppend(enviroDIYPath); txBufferAppend(HTTPtag); // add the rest of the HTTP POST headers to the outgoing buffer @@ -141,19 +283,37 @@ int16_t EnviroDIYPublisher::publishData(Client* outClient) { txBufferAppend(_baseLogger->getSamplingFeatureUUID()); txBufferAppend(timestampTag); - txBufferAppend( - Logger::formatDateTime_ISO8601(Logger::markedLocalEpochTime) - .c_str()); - txBufferAppend('"'); + + // write out list of timestamps + int records = _logBuffer.getNumRecords(); + for (int rec = 0; rec < records; rec++) { + txBufferAppend('"'); + uint32_t timestamp = _logBuffer.getRecordTimestamp(rec); + txBufferAppend(Logger::formatDateTime_ISO8601(timestamp).c_str()); + txBufferAppend('"'); + if (rec + 1 != records) { txBufferAppend(','); } + } + txBufferAppend(']'); txBufferAppend(','); - for (uint8_t i = 0; i < _baseLogger->getArrayVarCount(); i++) { + // write out a list of the values of each variable + uint8_t variables = _logBuffer.getNumVariables(); + for (uint8_t var = 0; var < variables; var++) { txBufferAppend('"'); - txBufferAppend(_baseLogger->getVarUUIDAtI(i).c_str()); + txBufferAppend(_baseLogger->getVarUUIDAtI(var).c_str()); txBufferAppend('"'); txBufferAppend(':'); - txBufferAppend(_baseLogger->getValueStringAtI(i).c_str()); - if (i + 1 != _baseLogger->getArrayVarCount()) { + txBufferAppend('['); + + for (int rec = 0; rec < records; rec++) { + float value = _logBuffer.getRecordValue(rec, var); + txBufferAppend( + _baseLogger->formatValueStringAtI(var, value).c_str()); + if (rec + 1 != records) { txBufferAppend(','); } + } + txBufferAppend(']'); + + if (var + 1 != variables) { txBufferAppend(','); } else { txBufferAppend('}'); @@ -200,5 +360,10 @@ int16_t EnviroDIYPublisher::publishData(Client* outClient) { PRINTOUT(F("\n-- Response Code --")); PRINTOUT(responseCode); + if (responseCode == 201) { + // data was successfully transmitted, we can discard it from the buffer + _logBuffer.clear(); + } + return responseCode; } diff --git a/src/publishers/EnviroDIYPublisher.h b/src/publishers/EnviroDIYPublisher.h index 025ece3df..c54291253 100644 --- a/src/publishers/EnviroDIYPublisher.h +++ b/src/publishers/EnviroDIYPublisher.h @@ -4,6 +4,7 @@ * Part of the EnviroDIY ModularSensors library for Arduino. * This library is published under the BSD-3 license. * @author Sara Geleskie Damiano + * @author Thomas Watson * * @brief Contains the EnviroDIYPublisher subclass of dataPublisher for * publishing data to the Monitor My Watershed/EnviroDIY data portal at @@ -25,6 +26,7 @@ #include "ModSensorDebugger.h" #undef MS_DEBUGGING_STD #include "dataPublisherBase.h" +#include "LogBuffer.h" // ============================================================================ @@ -105,9 +107,49 @@ class EnviroDIYPublisher : public dataPublisher { // Returns the data destination String getEndpoint(void) override { - return String(enviroDIYHost); + return String(enviroDIYHost) + String(enviroDIYPath); } + /** + * @brief Get the EnviroDIY/Monitor My Watershed web host + * + * @return *String* The EnviroDIY/Monitor My Watershed web host + */ + String getHost(void); + + /** + * @brief Set the EnviroDIY/Monitor My Watershed web host + * + * @param host The EnviroDIY/Monitor My Watershed web host + */ + void setHost(const char* host); + + /** + * @brief Get the EnviroDIY/Monitor My Watershed API path + * + * @return *String* The EnviroDIY/Monitor My Watershed API path + */ + String getPath(void); + /** + * @brief Set the EnviroDIY/Monitor My Watershed API path + * + * @param endpoint The EnviroDIY/Monitor My Watershed API path + */ + void setPath(const char* endpoint); + + /** + * @brief Get the EnviroDIY/Monitor My Watershed API port + * + * @return *int* The EnviroDIY/Monitor My Watershed API port + */ + int getPort(void); + /** + * @brief Set the EnviroDIY/Monitor My Watershed API port + * + * @param port The EnviroDIY/Monitor My Watershed API port + */ + void setPort(int port); + // Adds the site registration token /** * @brief Set the site registration token @@ -145,6 +187,14 @@ class EnviroDIYPublisher : public dataPublisher { void begin(Logger& baseLogger, const char* registrationToken, const char* samplingFeatureUUID); + /** + * @brief Checks if the publisher needs an Internet connection for the next + * publishData call (as opposed to just buffering data internally). + * + * @return True if an internet connection is needed for the next publish. + */ + bool connectionNeeded(void) override; + /** * @brief Utilize an attached modem to open a a TCP connection to the * EnviroDIY/ODM2DataSharingPortal and then stream out a post request over @@ -156,9 +206,10 @@ class EnviroDIYPublisher : public dataPublisher { * @param outClient An Arduino client instance to use to print data to. * Allows the use of any type of client and multiple clients tied to a * single TinyGSM modem instance + * @param forceFlush Ask the publisher to flush buffered data immediately. * @return **int16_t** The http status code of the response. */ - int16_t publishData(Client* outClient) override; + int16_t publishData(Client* outClient, bool forceFlush = false) override; protected: /** @@ -167,9 +218,9 @@ class EnviroDIYPublisher : public dataPublisher { * * @{ */ - static const char* postEndpoint; ///< The endpoint - static const char* enviroDIYHost; ///< The host name - static const int enviroDIYPort; ///< The host port + const char* enviroDIYPath; ///< The api path + const char* enviroDIYHost; ///< The host name + int enviroDIYPort; ///< The host port static const char* tokenHeader; ///< The token header text static const char* contentLengthHeader; ///< The content length header text static const char* contentTypeHeader; ///< The content type header text @@ -185,8 +236,34 @@ class EnviroDIYPublisher : public dataPublisher { static const char* timestampTag; ///< The JSON feature timestamp tag /**@}*/ + + LogBuffer _logBuffer; ///< Internal reference to the logger buffer + + // actually transmit rather than just buffer data + /** + * @brief Transmit data from the data buffer to an external site + * + * @param outClient The client to publish the data over + * @return The HTTP response code from the publish attempt + * + * @note A 504 will be returned automatically if the server does not + * respond within 30 seconds. + */ + int16_t flushDataBuffer(Client* outClient); + + /** + * @brief The number of transmissions remaing at the single minute intervals + * + * We send every one of the first five data points at only one minute + * intervals for faster in-field validation. + */ + uint8_t _initialTransmissionsRemaining = 5; + private: - // Tokens and UUID's for EnviroDIY + /** + * @brief Internal reference to the EnviroDIY/Monitor My Watershed + * registration token. + */ const char* _registrationToken = nullptr; }; diff --git a/src/publishers/ThingSpeakPublisher.cpp b/src/publishers/ThingSpeakPublisher.cpp index f13780260..3b070088a 100644 --- a/src/publishers/ThingSpeakPublisher.cpp +++ b/src/publishers/ThingSpeakPublisher.cpp @@ -102,7 +102,7 @@ void ThingSpeakPublisher::begin(Logger& baseLogger, // This sends the data to ThingSpeak // bool ThingSpeakPublisher::mqttThingSpeak(void) -int16_t ThingSpeakPublisher::publishData(Client* outClient) { +int16_t ThingSpeakPublisher::publishData(Client* outClient, bool ) { bool retVal = false; // Make sure we don't have too many fields diff --git a/src/publishers/ThingSpeakPublisher.h b/src/publishers/ThingSpeakPublisher.h index 1a924b500..511c766fb 100644 --- a/src/publishers/ThingSpeakPublisher.h +++ b/src/publishers/ThingSpeakPublisher.h @@ -186,7 +186,7 @@ class ThingSpeakPublisher : public dataPublisher { // This sends the data to ThingSpeak // bool mqttThingSpeak(void); - int16_t publishData(Client* outClient) override; + int16_t publishData(Client* outClient, bool forceFlush = false) override; protected: /** diff --git a/src/publishers/UbidotsPublisher.cpp b/src/publishers/UbidotsPublisher.cpp index 4a111d631..33f2993f9 100644 --- a/src/publishers/UbidotsPublisher.cpp +++ b/src/publishers/UbidotsPublisher.cpp @@ -113,7 +113,7 @@ void UbidotsPublisher::begin(Logger& baseLogger, // over that connection. // The return is the http status code of the response. // int16_t EnviroDIYPublisher::postDataEnviroDIY(void) -int16_t UbidotsPublisher::publishData(Client* outClient) { +int16_t UbidotsPublisher::publishData(Client* outClient, bool) { // Create a buffer for the portions of the request and response char tempBuffer[37] = ""; uint16_t did_respond = 0; diff --git a/src/publishers/UbidotsPublisher.h b/src/publishers/UbidotsPublisher.h index c8926eb2e..61e335f05 100644 --- a/src/publishers/UbidotsPublisher.h +++ b/src/publishers/UbidotsPublisher.h @@ -164,9 +164,10 @@ class UbidotsPublisher : public dataPublisher { * @param outClient An Arduino client instance to use to print data to. * Allows the use of any type of client and multiple clients tied to a * single TinyGSM modem instance + * @param forceFlush Ask the publisher to flush buffered data immediately. * @return **int16_t** The http status code of the response. */ - int16_t publishData(Client* outClient) override; + int16_t publishData(Client* outClient, bool forceFlush) override; protected: /**