
KStream pipeline template

This is a fork of the IoT phone demo with an added monitoring stack to collect and visualize JMX metrics from KStream apps.

Please follow the accompanying YouTube video to use it.

Example KStream app

In the video we build a KStream service that counts the number of messages per second in the input topic. This is a code snippet of the service:

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Produced;
import org.json.JSONObject;

import java.time.Duration;
import java.util.Properties;

public class MessageCountPerSecond {

    public static void main(String[] args) {
        try {
            
            // Get the environment ID from the environment variables
            String workspace_id = System.getenv("Quix__Workspace__Id");

            // Get Kafka configuration properties
            Properties props = QuixConfigBuilder.buildKafkaProperties();

            props.put("application.id", workspace_id + "-" + "message-count-app");

            // Set default key and value serde
            props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
            props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");

            StreamsBuilder builder = new StreamsBuilder();

            // Topic name consists of workspace ID and input topic name in QuixCloud.
            KStream<String, String> messageStream = builder.stream(workspace_id + "-" + System.getenv("input"));

            // Count messages per second
            KTable<Windowed<String>, Long> messageCountPerSecond = messageStream
                    .groupByKey()
                    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(1)))
                    .count();

            // Convert the count data to JSON format before sending it to the output topic
            KStream<String, String> jsonOutputStream = messageCountPerSecond.toStream()
                    .map((windowedKey, count) -> {
                        long windowStart = windowedKey.window().start();
                        long windowEnd = windowedKey.window().end();

                        // Create a JSON object with window-start, window-end, and count
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("window-start", windowStart);
                        jsonObject.put("window-end", windowEnd);
                        jsonObject.put("count", count);

                        // Convert the Windowed key to a simple string representation (optional)
                        String key = windowedKey.key();

                        // Return the new key-value pair where value is JSON string
                        return new org.apache.kafka.streams.KeyValue<>(key, jsonObject.toString());
                    });

            // Output the JSON results to another topic
            jsonOutputStream.to(workspace_id + "-" + System.getenv("output"), Produced.with(Serdes.String(), Serdes.String()));

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();

            // Gracefully shutdown on exit
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
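
With the one-second tumbling window above, each record emitted to the output topic carries the window bounds and the count as a JSON string. A value might look like this (the timestamps are illustrative epoch milliseconds):

{"window-start": 1700000000000, "window-end": 1700000001000, "count": 42}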

Prometheus monitoring for KStream apps in Quix Cloud

This example template shows how to build a Kafka Streams pipeline and includes the following deployed services:

  • WebGateway - Provides a data ingestion gateway for any external data source that can use HTTP.
  • Fleet Console - Web front end to view location and sensor data *.
  • InfluxDB - Timeseries database used to hold the data collected in this pipeline *.
  • Grafana - For dashboarding the data collected and generated by this demo *.
  • Prometheus - To allow metrics collection.
  • Telegraf - Metrics agent that collects the JMX metrics from the KStream apps for Prometheus (see the sketch after this list).
  • Raw to table - Converts the raw data, sent to the web gateway, into data the rest of the pipeline can handle.
  • Influx-sink - Sinks data to the InfluxDB instance deployed to this demo environment.
  • Crash-detection - Detects accelerometer data values that could indicate a crash or collision.
  • FleetConsole Websockets - Another web gateway allowing the Fleet console to receive data from the data pipeline.
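
Kafka Streams publishes its runtime metrics as JMX MBeans automatically, which is what the monitoring stack above collects and visualizes. To see what a running instance exposes, here is a minimal sketch (the MetricsPeek class is illustrative, not part of this repo) that dumps each metric's group, name, tags, and current value; call it after streams.start() in the example above:

package com.example;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

import java.util.Map;

public class MetricsPeek {

    // Dump all metrics of a running KafkaStreams instance. The same values
    // are exposed as JMX MBeans, which the monitoring stack can scrape.
    public static void printMetrics(KafkaStreams streams) {
        Map<MetricName, ? extends Metric> metrics = streams.metrics();
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            MetricName name = entry.getKey();
            System.out.printf("%s / %s %s = %s%n",
                    name.group(), name.name(), name.tags(), entry.getValue().metricValue());
        }
    }
}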

How to use it

Clone and setup

  1. Clone or fork this repo to your GitHub.
  2. Create a Quix Cloud account.
  3. In Quix Cloud, create a new project.
  4. Under "Connect existing Git repository", select the type of Git repo you have, e.g. GitHub, and click Authorize. This authorizes Quix Cloud to read your list of repos, which is needed for the next steps.
  5. Select the name of the repo you created by cloning this repo in step 1.
  6. Now create your first environment by clicking the blue Create your first environment button.
  7. Give it a name, e.g. prod, tick the "this branch is protected" check box, and click Continue.
  8. If you have access to a Kafka broker you want to use, select and configure it using the in-app prompts; otherwise, use the managed broker by selecting "Quix managed broker".
  9. Once complete click "Create environment".
  10. Quix Cloud will now create your compute environment.

Synchronization

Once the environment has been created, you need to synchronize it with the environment descriptor, a YAML file. Synchronizing is the process where Quix Cloud deploys the services described in the YAML file and prepares and opens any ports needed for data transfer.

  1. Click one of the blue Sync environment buttons.
  2. You must now configure some required values called secrets.

Secrets

It is common for projects to need secrets, and this one is no exception: it needs a token for accessing InfluxDB and an admin password for InfluxDB and other services. You will use this password to access the various services.

  1. Click Add secrets.
  2. Enter a value into the field to the right of the ADMIN_PASSWORD key in the "Default value" column. This is just a test system, but good password practice is still recommended. An example of a valid password is AbcdQZ!23.
  3. Enter a value in the same way for the influx_token. This can be any value, e.g. abcdefg1234567890 (see the sketch after this list).
  4. Click the X to close the Secrets panel.
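
When the services are deployed, a secret is typically wired to a service as an environment variable. A minimal sketch, assuming the variable names match the secret keys above (they are configured per deployment, so treat these names as placeholders), of how a service might read them at startup:

// Read a required secret from the environment; the variable names
// ("influx_token", "ADMIN_PASSWORD") are assumptions based on the keys above.
static String requireSecret(String name) {
    String value = System.getenv(name);
    if (value == null || value.isEmpty()) {
        throw new IllegalStateException("Missing required secret: " + name);
    }
    return value;
}

// Usage, e.g. at the top of main():
// String influxToken = requireSecret("influx_token");
// String adminPassword = requireSecret("ADMIN_PASSWORD");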

Final Sync

  1. You can now click Sync to this commit.

Each of the services described by the YAML file will now be deployed and started.

Sensor logger

  1. Go to the app store on your phone and install an app called "Sensor Logger". This is a third-party app and is not affiliated with Quix in any way. It is extremely useful for demos like this, where you want to send complex data from a phone in near real time.
  2. Enable logging of all internal sensors by enabling their toggles.
  3. In the app's settings, go into the "Data Streaming" configuration.
  4. In the Quix Cloud pipeline you just deployed, click the blue Web Gateway service and copy the Public URL. You will need to get this URL to your phone; send yourself an email or message with it. Note that you will have to add /data to the end of the URL if it is not present.
  5. On your phone, copy and paste the URL (ensuring it ends with /data) into the Push URL field and ensure the Enable HTTP Push toggle is switched on. (To test the gateway without a phone, see the sketch after this list.)
  6. Go back to the app's home page and click "Start Recording".
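
If you want to check that the gateway is reachable before setting up the phone, here is a minimal sketch (Java 11+) that POSTs a test message to it. The URL is a placeholder for your gateway's Public URL, and the JSON body is an assumed shape, since Sensor Logger defines the real schema:

package com.example;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class GatewaySmokeTest {

    public static void main(String[] args) throws Exception {
        // Placeholder: replace with the Public URL copied from the Web Gateway
        // service, keeping the /data suffix.
        String gatewayUrl = "https://my-web-gateway.example.com/data";

        // Placeholder payload: Sensor Logger defines the real schema; this is
        // only an assumed generic JSON body for a smoke test.
        String payload = "{\"messageId\": 1, \"payload\": [{\"name\": \"test\", \"values\": {\"x\": 0.0}}]}";

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(gatewayUrl))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("Gateway responded with HTTP " + response.statusCode());
    }
}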

Refer to the video for more details and to see it in action.

Contact us if you want an in-person demo or if you get stuck.
