This is a fork of the IoT phone demo, with an added monitoring stack to collect and visualize JMX metrics from the Kafka Streams (KStream) apps.
Please follow this YouTube video to use it:
In the video we build a Kafka Streams service that counts the number of messages per second in the input topic. Here is a code snippet of the service:
```java
package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.json.JSONObject;

import java.io.IOException;
import java.time.Duration;
import java.util.Properties;

public class MessageCountPerSecond {
    public static void main(String[] args) {
        try {
            // Get the environment ID from the environment variables
            String workspaceId = System.getenv("Quix__Workspace__Id");

            // Get Kafka configuration properties
            Properties props = QuixConfigBuilder.buildKafkaProperties();
            props.put("application.id", workspaceId + "-" + "message-count-app");

            // Set default key and value serdes
            props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
            props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");

            StreamsBuilder builder = new StreamsBuilder();

            // In Quix Cloud the full topic name is the workspace ID plus the input topic name.
            KStream<String, String> messageStream =
                    builder.stream(workspaceId + "-" + System.getenv("input"));

            // Count messages per one-second tumbling window
            KTable<Windowed<String>, Long> messageCountPerSecond = messageStream
                    .groupByKey()
                    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(1)))
                    .count();

            // Convert the count data to JSON before sending it to the output topic
            KStream<String, String> jsonOutputStream = messageCountPerSecond.toStream()
                    .map((windowedKey, count) -> {
                        long windowStart = windowedKey.window().start();
                        long windowEnd = windowedKey.window().end();

                        // Create a JSON object with window-start, window-end, and count
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("window-start", windowStart);
                        jsonObject.put("window-end", windowEnd);
                        jsonObject.put("count", count);

                        // Unwrap the windowed key to a plain string key
                        String key = windowedKey.key();

                        // Return the new key-value pair where the value is the JSON string
                        return new KeyValue<>(key, jsonObject.toString());
                    });

            // Write the JSON results to the output topic
            jsonOutputStream.to(workspaceId + "-" + System.getenv("output"),
                    Produced.with(Serdes.String(), Serdes.String()));

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();

            // Close the streams client gracefully on exit
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
```
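Downstream consumers of the output topic can expect one JSON document per key per one-second window. A minimal sketch of that record shape, using the field names from the mapper above (the formatting helper itself is hypothetical and avoids the org.json dependency for illustration):

```java
// Sketch: builds the JSON value the service emits for one window.
// Field names match the mapper above; this helper is illustrative only.
public class WindowRecord {
    static String toJson(long windowStart, long windowEnd, long count) {
        return String.format(
                "{\"window-start\":%d,\"window-end\":%d,\"count\":%d}",
                windowStart, windowEnd, count);
    }

    public static void main(String[] args) {
        // Epoch-millisecond window bounds exactly one second apart
        System.out.println(toJson(1700000000000L, 1700000001000L, 42L));
        // → {"window-start":1700000000000,"window-end":1700000001000,"count":42}
    }
}
```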
This example template shows how to build a Kafka Streams pipeline and includes the following deployed services:
- WebGateway - Provides a data ingestion gateway for any external data source that can use HTTP.
- Fleet Console - Web front end to view location and sensor data *.
- InfluxDB - Timeseries database used to hold the data collected in this pipeline *.
- Grafana - For dashboarding the data collected and generated by this demo *.
- Prometheus - To allow metrics collection.
- Telegraf - Agent that collects metrics (such as the JMX metrics from the KStream apps) and makes them available to Prometheus.
- Raw to table - Converts the raw data, sent to the web gateway, into data the rest of the pipeline can handle.
- Influx-sink - Sinks data to the InfluxDB instance deployed to this demo environment.
- Crash-detection - Detects accelerometer data values that could indicate a crash or collision.
- FleetConsole Websockets - Another web gateway allowing the Fleet console to receive data from the data pipeline.
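Because the WebGateway accepts plain HTTP POSTs, any client can feed the pipeline, not just the phone app. A hedged sketch using the JDK's built-in HttpClient (the gateway URL and payload shape are placeholders; the real Public URL comes from your deployed environment):

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class GatewayPost {
    // Build a POST request for the gateway's /data endpoint.
    static HttpRequest buildRequest(String publicUrl, String jsonPayload) {
        return HttpRequest.newBuilder()
                .uri(URI.create(publicUrl + "/data"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder URL and payload; use your environment's Public URL instead.
        HttpRequest req = buildRequest("https://gateway.example.com",
                "{\"sensor\":\"accelerometer\",\"x\":0.01}");
        System.out.println(req.method() + " " + req.uri());
        // → POST https://gateway.example.com/data
        // To actually send it (requires a live gateway):
        // java.net.http.HttpClient.newHttpClient()
        //     .send(req, java.net.http.HttpResponse.BodyHandlers.ofString());
    }
}
```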
- Clone or fork this repo to your GitHub.
- Create a Quix Cloud account.
- In Quix Cloud, create a new project.
- Under "Connect existing Git repository", select the type of Git repo you have, e.g. GitHub, and click "Authorize". This will authorize Quix Cloud to read your list of repos, which is needed for the next steps.
- Select the name of the repo you created by cloning this repo in the previous steps.
- Now create your first environment by clicking the blue "Create your first environment" button.
- Give it a name, e.g. "prod", tick the "This branch is protected" check box, and click "Continue".
- If you have access to a Kafka-compatible broker you want to use, you can select and configure it using the in-app prompts; otherwise you can use the managed Kafka broker by selecting "Quix managed broker".
- Once complete click "Create environment".
- Quix Cloud will now create your compute environment.
Once the environment has been created, you need to synchronize it with the environment descriptor, a YAML file. Synchronizing is the process where Quix Cloud deploys the services described in the YAML file and prepares and opens any ports needed for data transfer.
- Click one of the blue "Sync environment" buttons.
- You must now configure some required values called secrets.
It is common for projects to need some kind of secret, and this one is no exception. It needs a token for accessing InfluxDB and an admin password for InfluxDB and other services. You will use this password to access the various services.
- Click "Add secrets".
- Enter a value into the field to the right of the ADMIN_PASSWORD key in the "Default value" column. This is just a test system, but good password practice is still recommended. An example of a valid password is AbcdQZ!23.
- Enter a value in the same way for the influx_token. This can be any value, e.g. abcdefg1234567890.
- Click the X to close the Secrets panel.
- You can now click "Sync to this commit".
Each of the services described by the YAML file will now be deployed and started.
- Go to the app store on your phone and install an app called "Sensor Logger". This is a third-party app, not affiliated with Quix in any way. It is extremely useful for demos like this, where you want to send complex data from a phone in near real time.
- Enable logging of all internal sensors by switching on their toggles.
- In the app's settings, go into the "Data Streaming" configuration.
- In the Quix Cloud pipeline you just deployed, click the blue Web Gateway service and copy the "Public URL". You will need to get this URL to your phone, so send yourself an email or message containing it. Please note that you will have to add /data to the end of the URL if it is not present.
- On your phone, copy and paste the URL (ensuring it ends with /data) into the "Push URL" field and ensure the "Enable HTTP Push" toggle is switched on.
- Go back to the app's home page and click "Start Recording".
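The "add /data if it is not present" rule above can be captured in a tiny helper (hypothetical, for illustration only):

```java
public class PushUrl {
    // Append "/data" to the gateway URL unless it already ends with it.
    static String withDataSuffix(String url) {
        String trimmed = url.endsWith("/") ? url.substring(0, url.length() - 1) : url;
        return trimmed.endsWith("/data") ? trimmed : trimmed + "/data";
    }

    public static void main(String[] args) {
        System.out.println(withDataSuffix("https://gateway.example.com"));
        System.out.println(withDataSuffix("https://gateway.example.com/data"));
        // Both print: https://gateway.example.com/data
    }
}
```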
Refer to the video for more details and to see it in action.
Contact us if you would like an in-person demo or if you get stuck.