
Adding a new Sensor or Device

Yatharth Ranjan edited this page Feb 18, 2020 · 2 revisions

This page explains how to add an implementation for a new Sensor/Device type to the RADAR-IoT framework. Please read the README of the framework first to familiarise yourself with the concepts discussed here.


This can be divided into 3 simple steps -

  1. Define the Schema of the Sensor output
  2. Create an implementation of Sensor to retrieve the data from the sensor
  3. Create a Converter so that the data can be consumed and used by the DataConsumers

For this example, we will create a mock sensor so that it can be tested easily in a local IDE. The mock sensor has a value attribute and a time attribute. The value is a random number that simulates a sensor reading, and the time is the time at which the value was generated.

Adding a Schema for the sensor

This is required for strong typing and data validation. The schema can be added according to the type of schema retriever that you are using. For this example, we will use the GithubAvroSchemaRetriever.

Now you just need to add the schema to a repository on GitHub and it will be read by the framework. We have added ours to the RADAR-Schemas repo in the sensors branch. After this, our config for the schema retriever block will look like -

      schema_retriever:
        module: 'commons.schema'
        class: 'GithubAvroSchemaRetriever'
        args:
          repo_owner: 'RADAR-base'
          repo_name: 'RADAR-Schemas'
          branch: 'sensors'
          basepath: 'commons/iot/sensor'
          extension: '.avsc'
          token: '*******'

Note: You can get a Personal Access Token from GitHub to increase the API limits and add it in the token key above.
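
The schema file itself is not reproduced on this page. As a rough sketch only, the snippet below builds what such a schema might contain and prints the JSON that would go into the schema file (for example mock_sensor.avsc, given the '.avsc' extension configured above). The record name MockSensor and the namespace org.radarbase.iot.sensor are inferred from the Kotlin import used later on this page; the double field types, the doc strings and the matches_schema helper are assumptions made for illustration.

```python
import json

# Hypothetical Avro schema for the mock sensor. The record name and namespace
# are inferred from the Kotlin class used later on this page; the field types
# and doc strings are assumptions.
MOCK_SENSOR_SCHEMA = {
    "type": "record",
    "name": "MockSensor",
    "namespace": "org.radarbase.iot.sensor",
    "doc": "Randomly generated values that simulate a sensor.",
    "fields": [
        {"name": "time", "type": "double",
         "doc": "Unix timestamp in seconds when the value was generated."},
        {"name": "value", "type": "double",
         "doc": "Random value that simulates a sensor reading."},
    ],
}


def matches_schema(record, schema=MOCK_SENSOR_SCHEMA):
    """Very small check: same field names, and every value is a float."""
    expected = {field["name"] for field in schema["fields"]}
    return set(record) == expected and all(
        isinstance(value, float) for value in record.values()
    )


# Text that would go into the schema file.
print(json.dumps(MOCK_SENSOR_SCHEMA, indent=2))
```

The helper is only a quick local sanity check. In the framework, validation is handled through the configured schema retriever.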

Create an implementation of Sensor

We will now create the actual implementation that gets the data from the sensor. For this, we need to extend the Sensor abstract class and implement the get_measurement function. For our mock sensor, this will look like -

mock_sensor.py

import random
from datetime import datetime

from commons.data import Response, IoTError, ErrorCode
from sensors import Sensor

class MockSensor(Sensor):

    def __init__(self, name, topic, poll_freq_ms, flush_size, flush_after_s):
        super().__init__(name, topic, poll_freq_ms, flush_size, flush_after_s)
        self.global_counter = 0

    def get_measurement(self):
        self.global_counter += 1
        if self.global_counter % 10 == 0:
            return Response(response=None, errors=[
                IoTError('MockError', ErrorCode.STATUS_OFF, 'The MockSensor mocks an error every 10 iterations',
                      'blah->nooooo->save me->dead')])
        else:
            return Response({'time': datetime.now().timestamp(), 'value': random.random() * 1000}, errors=None)

A few things to note from the above implementation -

  1. The MockSensor class must extend the Sensor class or a subclass of it.
  2. The __init__ method (the constructor) must call the superclass (Sensor) constructor with all the arguments, i.e. super().__init__(name, topic, poll_freq_ms, flush_size, flush_after_s).
  3. We also keep a global_counter which counts the number of iterations, so that an error response is sent on every 10th iteration.
  4. The get_measurement function returns a Response object from the commons.data module. It can contain a dictionary with the actual sensor response and a list of error objects.
  5. IoTError is a custom error class created specifically for errors that occur during the normal functioning of the framework.
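
To see the success and error paths without installing the framework, the behaviour described above can be sketched with stand-in classes. Everything here is hypothetical: the Response dataclass only mimics the two keyword arguments used above (response and errors), StubMockSensor copies just the counter logic, and poll is an illustrative helper, not how the framework actually drives a sensor.

```python
import random
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class Response:
    """Stand-in for commons.data.Response (assumed shape, not the real class)."""
    response: Optional[dict]
    errors: Optional[list]


class StubMockSensor:
    """Copies only the get_measurement logic of MockSensor."""

    def __init__(self):
        self.global_counter = 0

    def get_measurement(self) -> Response:
        self.global_counter += 1
        if self.global_counter % 10 == 0:
            # Every 10th call reports an error instead of a reading.
            return Response(response=None, errors=["mock error"])
        reading = {"time": datetime.now().timestamp(),
                   "value": random.random() * 1000}
        return Response(response=reading, errors=None)


def poll(sensor, iterations):
    """Call the sensor repeatedly and separate readings from failures."""
    readings, failures = [], 0
    for _ in range(iterations):
        result = sensor.get_measurement()
        if result.errors:
            failures += 1
        else:
            readings.append(result.response)
    return readings, failures


readings, failures = poll(StubMockSensor(), 20)
print(len(readings), failures)  # 18 2
```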

After the implementation, you can now add the sensor to the config and start collecting data -

 -  name: "mock_sensor"
    # Name of your python module which contains the sensor class
    module: "test.mock"
    # Name of the class of the sensor in the module
    class: "MockSensor"
    # topic/channel to publish the data to in pub/sub paradigm
    publishing_topic: "data-stream/sensors/mock"
    # polling frequency in milliseconds 
    poll_frequency_ms: 1000 
    # Flush size for flushing the records
    flush_size: 10
    # Flush after [value] seconds if the flush size is not reached
    flush_after_s: 1000

Note: The name of the sensor must match the name of the schema file you created in step 1.
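
The interaction between flush_size and flush_after_s can be illustrated with a small buffer. This is a sketch of the idea described in the config comments, not the framework's actual implementation: the FlushBuffer class, its method names and the injectable clock are all hypothetical, and in this simplified version the time limit is only checked when a new record arrives.

```python
import time


class FlushBuffer:
    """Collects records and releases them as a batch (hypothetical helper)."""

    def __init__(self, flush_size, flush_after_s, clock=time.monotonic):
        self.flush_size = flush_size
        self.flush_after_s = flush_after_s
        self._clock = clock
        self._records = []
        self._last_flush = clock()

    def add(self, record):
        """Store a record; return the batch if a flush was triggered, else None."""
        self._records.append(record)
        size_reached = len(self._records) >= self.flush_size
        timed_out = self._clock() - self._last_flush >= self.flush_after_s
        if size_reached or timed_out:
            return self.flush()
        return None

    def flush(self):
        """Hand over all buffered records and restart the timer."""
        batch, self._records = self._records, []
        self._last_flush = self._clock()
        return batch


# Same values as in the sensor config above.
buffer = FlushBuffer(flush_size=10, flush_after_s=1000)
batches = [buffer.add({"value": i}) for i in range(10)]
print([batch is not None for batch in batches])  # only the tenth add returns a batch
```

With a poll frequency of 1000 ms and a flush size of 10, the size limit is reached long before the 1000-second limit, so the time limit mainly matters for slow or failing sensors.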

Start Redis and check the data

The prerequisite for this step is having Docker and docker-compose installed on your system. For installation instructions, please take a look at the official docs for Docker and docker-compose.

The next step is to check that we are in fact able to send data to the pub/sub system. For this, first start Redis using Docker by running the following in the project's root directory -

docker-compose -f docker/redis.yml up -d

After this, run the application using

python3 main.py

Note: Please make sure you have Python 3.7+ installed on your system or virtual env.

Now, you can see the data in Redis by running -

docker exec -it docker_redis_1 redis-cli subscribe data-stream/sensors/mock

You should see the data being published to the mock channel. A batch is published every 10 messages or every 1000 seconds, whichever comes first, as we specified in the config of the sensor. Now we need to consume this data and do something with it. We will add a converter that reads the data in the data consumer so that it can be uploaded to InfluxDB and displayed on Grafana dashboards.

Consuming the data

This part is written in Kotlin since our data consumer is based on Kotlin. However, you are free to use any language that has a Redis client (available for most programming languages). This also lets you use existing libraries for your platform in other languages.

We will write a converter that takes the data received from the channel or topic in Redis and converts it into InfluxDB Point objects.

For this, cd into the data consumer directory (data/kotlin) and open the project in your favourite Java/Kotlin IDE (IntelliJ IDEA is the standard one). Add a new file named MockSensorInfluxDbConverter.kt in the directory data/kotlin/data-uploader/src/main/kotlin/org/radarbase/iot/converter/influxdb and add the following lines -

MockSensorInfluxDbConverter.kt

package org.radarbase.iot.converter.influxdb

import com.fasterxml.jackson.core.type.TypeReference
import org.influxdb.dto.Point
import org.radarbase.iot.commons.util.Parser
import org.radarbase.iot.converter.influxdb.InfluxDbConverter
import org.radarbase.iot.converter.messageparser.JsonMessageParser
import org.radarbase.iot.sensor.MockSensor
import org.slf4j.LoggerFactory
import java.util.concurrent.TimeUnit

class MockSensorInfluxDbConverter(
    private val measurementName: String = "mockSensor",
    private val parser: Parser<String,
            List<MockSensor>> = JsonMessageParser(typeReference)
) : InfluxDbConverter {
    override fun convert(messages: List<String>): List<Point> {
        return messages.map { message ->
            parser.parse(message).map {
                Point.measurement(measurementName)
                    .time(it.getTime().toLong(), TimeUnit.SECONDS)
                    .addField("value", it.getValue())
                    .tag(InfluxDbConverter.genericKeyMap)
                    .build()
            }
        }.flatten()
    }

    companion object {

        private val logger =
            LoggerFactory.getLogger(MockSensorInfluxDbConverter::class.java)

        private val typeReference = object : TypeReference<List<MockSensor>>() {}
    }
}

Note: The MockSensor class is an Avro class generated from the schema specified in step one. It can be generated using the Avro tools as mentioned in this tutorial and then placed on the classpath. If you are using the RADAR-Schemas repo, it can be generated using the build command in the java-sdk path. It is then available as part of the radar-schemas-commons library, which can be included directly in the build.gradle dependencies.

In the above implementation, we read a serialised String (which contains a list of sensor responses), deserialise the JSON, and then create InfluxDB Point objects. We also add the keys (projectId, subjectId and sourceId) as tags so we can differentiate between data from different users/deployments.
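
Since any language with a Redis client can be used for consuming, the same conversion can be sketched in Python. This is an illustration under stated assumptions, not part of the framework: it assumes each message is a JSON array of objects with time and value keys, it uses plain dictionaries in place of Point objects, and the KEYS values and the convert function are made up for the example.

```python
import json

# Hypothetical key values; in the Kotlin code the tags come from
# InfluxDbConverter.genericKeyMap.
KEYS = {"projectId": "radar", "subjectId": "sub-1", "sourceId": "source-1"}


def convert(messages, measurement_name="mockSensor", keys=KEYS):
    """Turn serialised lists of readings into point-like dictionaries."""
    points = []
    for message in messages:
        for reading in json.loads(message):
            points.append({
                "measurement": measurement_name,
                # Whole seconds, mirroring toLong() with TimeUnit.SECONDS.
                "time": int(reading["time"]),
                "fields": {"value": reading["value"]},
                "tags": dict(keys),
            })
    return points


example = ['[{"time": 1582000000.75, "value": 12.5},'
           ' {"time": 1582000001.25, "value": 99.0}]']
for point in convert(example):
    print(point["time"], point["fields"])
```

Note that truncating the timestamp to whole seconds discards the sub-second part of the value produced by the sensor.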

Start Influx DB and Grafana

You need to start InfluxDB and Grafana so that you can view the data. For testing, you can run them on your local computer. Run the following -

docker run -d -p 8086:8086 \
      -v influxdb:/var/lib/influxdb \
      influxdb

and

docker run -d -p 3000:3000 grafana/grafana

Now we will configure the data-uploader module to consume the data from the mock sensor. Add the following to the file data/kotlin/data-uploader/src/main/resources/radar_iot_config.yaml -

radarConfig:
  projectId: "radar"
  userId: "sub-1"
  sourceId: "03d28e5c-e005-46d4-a9b3-279c27fbbc83"

# If a converter is not specified for a particular consumer for a sensor,
# then the data from the sensor will not be forwarded to that consumer for processing
sensorConfigs:
  - sensorName: "mock"
    inputTopic: "data-stream/sensors/mock"
    outputTopic: "mock"
    converterClasses:
      - consumerName: "influx_db"
        converterClass: "org.radarbase.iot.converter.influxdb.MockSensorInfluxDbConverter"

dataConsumerConfigs:
  - consumerClass: "org.radarbase.iot.consumer.InfluxDbDataConsumer"
    maxCacheSize: "1000"
    uploadIntervalSeconds: "10"
    consumerName: "influx_db"

influxDbConfig:
  url: "http://localhost:8086"
  username: "root"
  password: "root"
  dbName: "radarIot"
  retentionPolicyName: "radarIotRetentionPolicy"
  # Should be at least 1h
  retentionPolicyDuration: "1h"
  retentionPolicyReplicationFactor: 1

Now, run the application using Gradle -

./gradlew run

This will start sending data to InfluxDB. Now we will set up Grafana to view this data.

Set Up Grafana

Please set up InfluxDB as a data source in Grafana using the tutorial here. Make sure to use the same values for configuring the database as in the config file above under the influxDbConfig key.

The measurement name in our case will be mockSensor as mentioned in the MockSensorInfluxDbConverter constructor.

You should be able to see the data in a nice graph. Note: For production deployments, it is recommended to deploy InfluxDB and Grafana on a VM, possibly behind a reverse proxy, and to use proper authentication instead of the provided defaults.

Additional Converters and Consumers

You can add a converter for each type of data consumer that you want. For example, if you are also using the RADAR-base platform as the backend (which is based on Apache Kafka and the Confluent REST Proxy, so it should also work with Kafka directly), you can create an Avro converter for the data and specify RestProxyDataConsumer as the consumer.

So the converter will look something like this -

data/kotlin/data-uploader/src/main/kotlin/org/radarbase/iot/converter/avro/MockSensorAvroConverter.kt

package org.radarbase.iot.converter.avro

import com.fasterxml.jackson.core.type.TypeReference
import org.radarbase.data.AvroRecordData
import org.radarbase.data.RecordData
import org.radarbase.iot.commons.util.Parser
import org.radarbase.iot.converter.avro.AvroConverter
import org.radarbase.iot.converter.messageparser.JsonMessageParser
import org.radarbase.iot.sensor.MockSensor
import org.radarbase.topic.AvroTopic
import org.radarcns.kafka.ObservationKey
import org.slf4j.LoggerFactory

class MockSensorAvroConverter(
    private val topicName: String = "radar_iot_mock_sensor",
    private val messageParser: Parser<String, List<MockSensor>> =
        JsonMessageParser(typeReference)
) :
    AvroConverter<ObservationKey, MockSensor> {
    override fun getAvroTopic(): AvroTopic<ObservationKey, MockSensor> =
        AvroTopic(
            topicName, ObservationKey.getClassSchema(), MockSensor.getClassSchema(),
            ObservationKey::class.java, MockSensor::class.java
        )

    override fun convert(messages: List<String>):
            RecordData<ObservationKey, MockSensor> {

        val values: List<MockSensor> = messages.map {
            logger.debug("Parsing message: $it")
            messageParser.parse(it)
        }.flatten()

        logger.debug("Avro Values: ${values.map { it.toString() }}")

        return AvroRecordData<ObservationKey, MockSensor>(
            getAvroTopic(),
            AvroConverter.genericObservationKey,
            values
        )
    }

    companion object {
        private val logger = LoggerFactory.getLogger(MockSensorAvroConverter::class.java)

        private val typeReference = object : TypeReference<List<MockSensor>>() {}

    }
}

Then add it to the configuration file. So the final config file data/kotlin/data-uploader/src/main/resources/radar_iot_config.yaml will look like -

# make sure these values are exactly as in Management portal for using authorisation
radarConfig:
  projectId: "radar"
  userId: "sub-1"
  sourceId: "03d28e5c-e005-46d4-a9b3-279c27fbbc83"
  schemaRegistryUrl: "http://localhost:8084/"
  kafkaUrl: "http://localhost:8090/radar-gateway/"
  # The following values are required for authentication with the RADAR-base platform. Remove these if using plain Rest Proxy.
  baseUrl: "http://localhost:8081"
  oAuthClientId: "radar_iot"
  oAuthClientSecret: "secret"
  metaToken: "dlsLwIw0E1cP"

dataConsumerConfigs:
  - consumerClass: "org.radarbase.iot.consumer.RestProxyDataConsumer"
    maxCacheSize: "1000"
    uploadIntervalSeconds: "10"
    consumerName: "rest_proxy"
  - consumerClass: "org.radarbase.iot.consumer.InfluxDbDataConsumer"
    maxCacheSize: "1000"
    uploadIntervalSeconds: "10"
    consumerName: "influx_db"

influxDbConfig:
  url: "http://localhost:8086"
  username: "root"
  password: "root"
  dbName: "radarIot"
  retentionPolicyName: "radarIotRetentionPolicy"
  # Should be at least 1h
  retentionPolicyDuration: "1h"
  retentionPolicyReplicationFactor: 1

# If a converter is not specified for a particular consumer for a sensor,
# then the data from the sensor will not be forwarded to that consumer for processing
sensorConfigs:
  - sensorName: "mock"
    inputTopic: "data-stream/sensors/mock"
    outputTopic: "mock"
    converterClasses:
      - consumerName: "rest_proxy"
        converterClass: "org.radarbase.iot.converter.avro.MockSensorAvroConverter"
      - consumerName: "influx_db"
        converterClass: "org.radarbase.iot.converter.influxdb.MockSensorInfluxDbConverter"
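
The routing rule stated in the config comments (data from a sensor only reaches a consumer if a converter is specified for that consumer) can be sketched as follows. This is one reading of that comment and not the data-uploader's actual code; the consumers_for function is hypothetical and the config is written as Python dictionaries rather than parsed from the YAML file.

```python
def consumers_for(sensor_config, consumer_configs):
    """Return the consumer names that have both a converter and a consumer entry."""
    defined = {consumer["consumerName"] for consumer in consumer_configs}
    return [
        entry["consumerName"]
        for entry in sensor_config.get("converterClasses", [])
        if entry["consumerName"] in defined
    ]


data_consumer_configs = [
    {"consumerName": "rest_proxy"},
    {"consumerName": "influx_db"},
]

# A sensor with a converter for only one of the two consumers.
sensor = {
    "sensorName": "mock",
    "converterClasses": [
        {"consumerName": "influx_db",
         "converterClass": "org.radarbase.iot.converter.influxdb.MockSensorInfluxDbConverter"},
    ],
}

print(consumers_for(sensor, data_consumer_configs))  # ['influx_db']
```

A check like this can help spot a consumerName typo in sensorConfigs, which would otherwise mean that no data arrives at the intended consumer.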

Note that the above example was just an illustration of the process required for adding a new sensor. The actual steps will vary from implementation to implementation and you may need to adapt some parts of this tutorial. If you have any trouble, please feel free to open an issue.