diff --git a/build.gradle b/build.gradle
index fadb6124..9a939d42 100644
--- a/build.gradle
+++ b/build.gradle
@@ -17,7 +17,7 @@ plugins {
//---------------------------------------------------------------------------//
group = 'org.radarcns'
-version = '0.1.1'
+version = '0.1.2'
ext.description = 'Kafka backend for processing device data.'
mainClassName = 'org.radarcns.RadarBackend'
@@ -41,7 +41,7 @@ ext.findbugVersion = '3.0.1'
ext.commonsCliVersion = '1.2'
ext.mockitoVersion = '2.2.29'
ext.radarCommonsVersion = '0.6.3'
-ext.radarSchemasVersion = '0.2.2'
+ext.radarSchemasVersion = '0.2.3'
ext.subethamailVersion = '3.1.7'
ext.jsoupVersion = '1.10.2'
ext.slf4jVersion = '1.7.25'
diff --git a/src/integrationTest/java/org/radarcns/integration/PhoneStreamTest.java b/src/integrationTest/java/org/radarcns/integration/PhoneStreamTest.java
index aa3d8ab5..4233a97f 100644
--- a/src/integrationTest/java/org/radarcns/integration/PhoneStreamTest.java
+++ b/src/integrationTest/java/org/radarcns/integration/PhoneStreamTest.java
@@ -114,7 +114,7 @@ public void tearDown() throws IOException, InterruptedException {
backend.shutdown();
}
- @Test(timeout = 600_000L)
+ @Test(timeout = 3000_000L)
public void testDirect() throws Exception {
ConfigRadar config = propHandler.getRadarProperties();
diff --git a/src/main/java/org/radarcns/aggregator/PhoneUsageAggregator.java b/src/main/java/org/radarcns/aggregator/PhoneUsageAggregator.java
deleted file mode 100644
index fe885eee..00000000
--- a/src/main/java/org/radarcns/aggregator/PhoneUsageAggregator.java
+++ /dev/null
@@ -1,511 +0,0 @@
-/**
- * Autogenerated by Avro
- *
- * DO NOT EDIT DIRECTLY
- */
-package org.radarcns.aggregator;
-
-import org.apache.avro.specific.SpecificData;
-
-@SuppressWarnings("all")
-/** Aggregate time and opening events for an app */
-@org.apache.avro.specific.AvroGenerated
-public class PhoneUsageAggregator extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
- private static final long serialVersionUID = 9003700960201664796L;
- public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"PhoneUsageAggregator\",\"namespace\":\"org.radarcns.aggregator\",\"doc\":\"Aggregate time and opening events for an app\",\"fields\":[{\"name\":\"packageName\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"package name of the app in use\"},{\"name\":\"durationInForeground\",\"type\":\"double\",\"doc\":\"Total time in milliseconds the app was in the foreground\"},{\"name\":\"timesOpen\",\"type\":\"int\",\"doc\":\"Total amount of times given app was opened in given time-frame\"},{\"name\":\"categoryName\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"doc\":\"app category as given by the play store, null if a category is not listed or unable to be fetched\",\"default\":null},{\"name\":\"categoryNameFetchTime\",\"type\":[\"null\",\"double\"],\"doc\":\"timestamp in UTC when the category was attempted to fetch from the play store (s), null if not fetched\",\"default\":null}]}");
- public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
- /** package name of the app in use */
- @Deprecated public java.lang.String packageName;
- /** Total time in milliseconds the app was in the foreground */
- @Deprecated public double durationInForeground;
- /** Total amount of times given app was opened in given time-frame */
- @Deprecated public int timesOpen;
- /** app category as given by the play store, null if a category is not listed or unable to be fetched */
- @Deprecated public java.lang.String categoryName;
- /** timestamp in UTC when the category was attempted to fetch from the play store (s), null if not fetched */
- @Deprecated public java.lang.Double categoryNameFetchTime;
-
- /**
- * Default constructor. Note that this does not initialize fields
- * to their default values from the schema. If that is desired then
- * one should use newBuilder()
.
- */
- public PhoneUsageAggregator() {}
-
- /**
- * All-args constructor.
- * @param packageName package name of the app in use
- * @param durationInForeground Total time in milliseconds the app was in the foreground
- * @param timesOpen Total amount of times given app was opened in given time-frame
- * @param categoryName app category as given by the play store, null if a category is not listed or unable to be fetched
- * @param categoryNameFetchTime timestamp in UTC when the category was attempted to fetch from the play store (s), null if not fetched
- */
- public PhoneUsageAggregator(java.lang.String packageName, java.lang.Double durationInForeground, java.lang.Integer timesOpen, java.lang.String categoryName, java.lang.Double categoryNameFetchTime) {
- this.packageName = packageName;
- this.durationInForeground = durationInForeground;
- this.timesOpen = timesOpen;
- this.categoryName = categoryName;
- this.categoryNameFetchTime = categoryNameFetchTime;
- }
-
- public org.apache.avro.Schema getSchema() { return SCHEMA$; }
- // Used by DatumWriter. Applications should not call.
- public java.lang.Object get(int field$) {
- switch (field$) {
- case 0: return packageName;
- case 1: return durationInForeground;
- case 2: return timesOpen;
- case 3: return categoryName;
- case 4: return categoryNameFetchTime;
- default: throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
-
- // Used by DatumReader. Applications should not call.
- @SuppressWarnings(value="unchecked")
- public void put(int field$, java.lang.Object value$) {
- switch (field$) {
- case 0: packageName = (java.lang.String)value$; break;
- case 1: durationInForeground = (java.lang.Double)value$; break;
- case 2: timesOpen = (java.lang.Integer)value$; break;
- case 3: categoryName = (java.lang.String)value$; break;
- case 4: categoryNameFetchTime = (java.lang.Double)value$; break;
- default: throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
-
- /**
- * Gets the value of the 'packageName' field.
- * @return package name of the app in use
- */
- public java.lang.String getPackageName() {
- return packageName;
- }
-
- /**
- * Sets the value of the 'packageName' field.
- * package name of the app in use
- * @param value the value to set.
- */
- public void setPackageName(java.lang.String value) {
- this.packageName = value;
- }
-
- /**
- * Gets the value of the 'durationInForeground' field.
- * @return Total time in milliseconds the app was in the foreground
- */
- public java.lang.Double getDurationInForeground() {
- return durationInForeground;
- }
-
- /**
- * Sets the value of the 'durationInForeground' field.
- * Total time in milliseconds the app was in the foreground
- * @param value the value to set.
- */
- public void setDurationInForeground(java.lang.Double value) {
- this.durationInForeground = value;
- }
-
- /**
- * Gets the value of the 'timesOpen' field.
- * @return Total amount of times given app was opened in given time-frame
- */
- public java.lang.Integer getTimesOpen() {
- return timesOpen;
- }
-
- /**
- * Sets the value of the 'timesOpen' field.
- * Total amount of times given app was opened in given time-frame
- * @param value the value to set.
- */
- public void setTimesOpen(java.lang.Integer value) {
- this.timesOpen = value;
- }
-
- /**
- * Gets the value of the 'categoryName' field.
- * @return app category as given by the play store, null if a category is not listed or unable to be fetched
- */
- public java.lang.String getCategoryName() {
- return categoryName;
- }
-
- /**
- * Sets the value of the 'categoryName' field.
- * app category as given by the play store, null if a category is not listed or unable to be fetched
- * @param value the value to set.
- */
- public void setCategoryName(java.lang.String value) {
- this.categoryName = value;
- }
-
- /**
- * Gets the value of the 'categoryNameFetchTime' field.
- * @return timestamp in UTC when the category was attempted to fetch from the play store (s), null if not fetched
- */
- public java.lang.Double getCategoryNameFetchTime() {
- return categoryNameFetchTime;
- }
-
- /**
- * Sets the value of the 'categoryNameFetchTime' field.
- * timestamp in UTC when the category was attempted to fetch from the play store (s), null if not fetched
- * @param value the value to set.
- */
- public void setCategoryNameFetchTime(java.lang.Double value) {
- this.categoryNameFetchTime = value;
- }
-
- /**
- * Creates a new PhoneUsageAggregator RecordBuilder.
- * @return A new PhoneUsageAggregator RecordBuilder
- */
- public static org.radarcns.aggregator.PhoneUsageAggregator.Builder newBuilder() {
- return new org.radarcns.aggregator.PhoneUsageAggregator.Builder();
- }
-
- /**
- * Creates a new PhoneUsageAggregator RecordBuilder by copying an existing Builder.
- * @param other The existing builder to copy.
- * @return A new PhoneUsageAggregator RecordBuilder
- */
- public static org.radarcns.aggregator.PhoneUsageAggregator.Builder newBuilder(org.radarcns.aggregator.PhoneUsageAggregator.Builder other) {
- return new org.radarcns.aggregator.PhoneUsageAggregator.Builder(other);
- }
-
- /**
- * Creates a new PhoneUsageAggregator RecordBuilder by copying an existing PhoneUsageAggregator instance.
- * @param other The existing instance to copy.
- * @return A new PhoneUsageAggregator RecordBuilder
- */
- public static org.radarcns.aggregator.PhoneUsageAggregator.Builder newBuilder(org.radarcns.aggregator.PhoneUsageAggregator other) {
- return new org.radarcns.aggregator.PhoneUsageAggregator.Builder(other);
- }
-
- /**
- * RecordBuilder for PhoneUsageAggregator instances.
- */
- public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase
- implements org.apache.avro.data.RecordBuilder {
-
- /** package name of the app in use */
- private java.lang.String packageName;
- /** Total time in milliseconds the app was in the foreground */
- private double durationInForeground;
- /** Total amount of times given app was opened in given time-frame */
- private int timesOpen;
- /** app category as given by the play store, null if a category is not listed or unable to be fetched */
- private java.lang.String categoryName;
- /** timestamp in UTC when the category was attempted to fetch from the play store (s), null if not fetched */
- private java.lang.Double categoryNameFetchTime;
-
- /** Creates a new Builder */
- private Builder() {
- super(SCHEMA$);
- }
-
- /**
- * Creates a Builder by copying an existing Builder.
- * @param other The existing Builder to copy.
- */
- private Builder(org.radarcns.aggregator.PhoneUsageAggregator.Builder other) {
- super(other);
- if (isValidValue(fields()[0], other.packageName)) {
- this.packageName = data().deepCopy(fields()[0].schema(), other.packageName);
- fieldSetFlags()[0] = true;
- }
- if (isValidValue(fields()[1], other.durationInForeground)) {
- this.durationInForeground = data().deepCopy(fields()[1].schema(), other.durationInForeground);
- fieldSetFlags()[1] = true;
- }
- if (isValidValue(fields()[2], other.timesOpen)) {
- this.timesOpen = data().deepCopy(fields()[2].schema(), other.timesOpen);
- fieldSetFlags()[2] = true;
- }
- if (isValidValue(fields()[3], other.categoryName)) {
- this.categoryName = data().deepCopy(fields()[3].schema(), other.categoryName);
- fieldSetFlags()[3] = true;
- }
- if (isValidValue(fields()[4], other.categoryNameFetchTime)) {
- this.categoryNameFetchTime = data().deepCopy(fields()[4].schema(), other.categoryNameFetchTime);
- fieldSetFlags()[4] = true;
- }
- }
-
- /**
- * Creates a Builder by copying an existing PhoneUsageAggregator instance
- * @param other The existing instance to copy.
- */
- private Builder(org.radarcns.aggregator.PhoneUsageAggregator other) {
- super(SCHEMA$);
- if (isValidValue(fields()[0], other.packageName)) {
- this.packageName = data().deepCopy(fields()[0].schema(), other.packageName);
- fieldSetFlags()[0] = true;
- }
- if (isValidValue(fields()[1], other.durationInForeground)) {
- this.durationInForeground = data().deepCopy(fields()[1].schema(), other.durationInForeground);
- fieldSetFlags()[1] = true;
- }
- if (isValidValue(fields()[2], other.timesOpen)) {
- this.timesOpen = data().deepCopy(fields()[2].schema(), other.timesOpen);
- fieldSetFlags()[2] = true;
- }
- if (isValidValue(fields()[3], other.categoryName)) {
- this.categoryName = data().deepCopy(fields()[3].schema(), other.categoryName);
- fieldSetFlags()[3] = true;
- }
- if (isValidValue(fields()[4], other.categoryNameFetchTime)) {
- this.categoryNameFetchTime = data().deepCopy(fields()[4].schema(), other.categoryNameFetchTime);
- fieldSetFlags()[4] = true;
- }
- }
-
- /**
- * Gets the value of the 'packageName' field.
- * package name of the app in use
- * @return The value.
- */
- public java.lang.String getPackageName() {
- return packageName;
- }
-
- /**
- * Sets the value of the 'packageName' field.
- * package name of the app in use
- * @param value The value of 'packageName'.
- * @return This builder.
- */
- public org.radarcns.aggregator.PhoneUsageAggregator.Builder setPackageName(java.lang.String value) {
- validate(fields()[0], value);
- this.packageName = value;
- fieldSetFlags()[0] = true;
- return this;
- }
-
- /**
- * Checks whether the 'packageName' field has been set.
- * package name of the app in use
- * @return True if the 'packageName' field has been set, false otherwise.
- */
- public boolean hasPackageName() {
- return fieldSetFlags()[0];
- }
-
-
- /**
- * Clears the value of the 'packageName' field.
- * package name of the app in use
- * @return This builder.
- */
- public org.radarcns.aggregator.PhoneUsageAggregator.Builder clearPackageName() {
- packageName = null;
- fieldSetFlags()[0] = false;
- return this;
- }
-
- /**
- * Gets the value of the 'durationInForeground' field.
- * Total time in milliseconds the app was in the foreground
- * @return The value.
- */
- public java.lang.Double getDurationInForeground() {
- return durationInForeground;
- }
-
- /**
- * Sets the value of the 'durationInForeground' field.
- * Total time in milliseconds the app was in the foreground
- * @param value The value of 'durationInForeground'.
- * @return This builder.
- */
- public org.radarcns.aggregator.PhoneUsageAggregator.Builder setDurationInForeground(double value) {
- validate(fields()[1], value);
- this.durationInForeground = value;
- fieldSetFlags()[1] = true;
- return this;
- }
-
- /**
- * Checks whether the 'durationInForeground' field has been set.
- * Total time in milliseconds the app was in the foreground
- * @return True if the 'durationInForeground' field has been set, false otherwise.
- */
- public boolean hasDurationInForeground() {
- return fieldSetFlags()[1];
- }
-
-
- /**
- * Clears the value of the 'durationInForeground' field.
- * Total time in milliseconds the app was in the foreground
- * @return This builder.
- */
- public org.radarcns.aggregator.PhoneUsageAggregator.Builder clearDurationInForeground() {
- fieldSetFlags()[1] = false;
- return this;
- }
-
- /**
- * Gets the value of the 'timesOpen' field.
- * Total amount of times given app was opened in given time-frame
- * @return The value.
- */
- public java.lang.Integer getTimesOpen() {
- return timesOpen;
- }
-
- /**
- * Sets the value of the 'timesOpen' field.
- * Total amount of times given app was opened in given time-frame
- * @param value The value of 'timesOpen'.
- * @return This builder.
- */
- public org.radarcns.aggregator.PhoneUsageAggregator.Builder setTimesOpen(int value) {
- validate(fields()[2], value);
- this.timesOpen = value;
- fieldSetFlags()[2] = true;
- return this;
- }
-
- /**
- * Checks whether the 'timesOpen' field has been set.
- * Total amount of times given app was opened in given time-frame
- * @return True if the 'timesOpen' field has been set, false otherwise.
- */
- public boolean hasTimesOpen() {
- return fieldSetFlags()[2];
- }
-
-
- /**
- * Clears the value of the 'timesOpen' field.
- * Total amount of times given app was opened in given time-frame
- * @return This builder.
- */
- public org.radarcns.aggregator.PhoneUsageAggregator.Builder clearTimesOpen() {
- fieldSetFlags()[2] = false;
- return this;
- }
-
- /**
- * Gets the value of the 'categoryName' field.
- * app category as given by the play store, null if a category is not listed or unable to be fetched
- * @return The value.
- */
- public java.lang.String getCategoryName() {
- return categoryName;
- }
-
- /**
- * Sets the value of the 'categoryName' field.
- * app category as given by the play store, null if a category is not listed or unable to be fetched
- * @param value The value of 'categoryName'.
- * @return This builder.
- */
- public org.radarcns.aggregator.PhoneUsageAggregator.Builder setCategoryName(java.lang.String value) {
- validate(fields()[3], value);
- this.categoryName = value;
- fieldSetFlags()[3] = true;
- return this;
- }
-
- /**
- * Checks whether the 'categoryName' field has been set.
- * app category as given by the play store, null if a category is not listed or unable to be fetched
- * @return True if the 'categoryName' field has been set, false otherwise.
- */
- public boolean hasCategoryName() {
- return fieldSetFlags()[3];
- }
-
-
- /**
- * Clears the value of the 'categoryName' field.
- * app category as given by the play store, null if a category is not listed or unable to be fetched
- * @return This builder.
- */
- public org.radarcns.aggregator.PhoneUsageAggregator.Builder clearCategoryName() {
- categoryName = null;
- fieldSetFlags()[3] = false;
- return this;
- }
-
- /**
- * Gets the value of the 'categoryNameFetchTime' field.
- * timestamp in UTC when the category was attempted to fetch from the play store (s), null if not fetched
- * @return The value.
- */
- public java.lang.Double getCategoryNameFetchTime() {
- return categoryNameFetchTime;
- }
-
- /**
- * Sets the value of the 'categoryNameFetchTime' field.
- * timestamp in UTC when the category was attempted to fetch from the play store (s), null if not fetched
- * @param value The value of 'categoryNameFetchTime'.
- * @return This builder.
- */
- public org.radarcns.aggregator.PhoneUsageAggregator.Builder setCategoryNameFetchTime(java.lang.Double value) {
- validate(fields()[4], value);
- this.categoryNameFetchTime = value;
- fieldSetFlags()[4] = true;
- return this;
- }
-
- /**
- * Checks whether the 'categoryNameFetchTime' field has been set.
- * timestamp in UTC when the category was attempted to fetch from the play store (s), null if not fetched
- * @return True if the 'categoryNameFetchTime' field has been set, false otherwise.
- */
- public boolean hasCategoryNameFetchTime() {
- return fieldSetFlags()[4];
- }
-
-
- /**
- * Clears the value of the 'categoryNameFetchTime' field.
- * timestamp in UTC when the category was attempted to fetch from the play store (s), null if not fetched
- * @return This builder.
- */
- public org.radarcns.aggregator.PhoneUsageAggregator.Builder clearCategoryNameFetchTime() {
- categoryNameFetchTime = null;
- fieldSetFlags()[4] = false;
- return this;
- }
-
- @Override
- public PhoneUsageAggregator build() {
- try {
- PhoneUsageAggregator record = new PhoneUsageAggregator();
- record.packageName = fieldSetFlags()[0] ? this.packageName : (java.lang.String) defaultValue(fields()[0]);
- record.durationInForeground = fieldSetFlags()[1] ? this.durationInForeground : (java.lang.Double) defaultValue(fields()[1]);
- record.timesOpen = fieldSetFlags()[2] ? this.timesOpen : (java.lang.Integer) defaultValue(fields()[2]);
- record.categoryName = fieldSetFlags()[3] ? this.categoryName : (java.lang.String) defaultValue(fields()[3]);
- record.categoryNameFetchTime = fieldSetFlags()[4] ? this.categoryNameFetchTime : (java.lang.Double) defaultValue(fields()[4]);
- return record;
- } catch (Exception e) {
- throw new org.apache.avro.AvroRuntimeException(e);
- }
- }
- }
-
- private static final org.apache.avro.io.DatumWriter
- WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$);
-
- @Override public void writeExternal(java.io.ObjectOutput out)
- throws java.io.IOException {
- WRITER$.write(this, SpecificData.getEncoder(out));
- }
-
- private static final org.apache.avro.io.DatumReader
- READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$);
-
- @Override public void readExternal(java.io.ObjectInput in)
- throws java.io.IOException {
- READER$.read(this, SpecificData.getDecoder(in));
- }
-
-}
diff --git a/src/main/java/org/radarcns/stream/phone/PhoneUsageAggregationStream.java b/src/main/java/org/radarcns/stream/phone/PhoneUsageAggregationStream.java
index c12d7acd..576b2ba5 100644
--- a/src/main/java/org/radarcns/stream/phone/PhoneUsageAggregationStream.java
+++ b/src/main/java/org/radarcns/stream/phone/PhoneUsageAggregationStream.java
@@ -1,7 +1,6 @@
package org.radarcns.stream.phone;
import org.apache.kafka.streams.kstream.KStream;
-import org.radarcns.aggregator.PhoneUsageAggregator;
import org.radarcns.config.RadarPropertyHandler;
import org.radarcns.kafka.AggregateKey;
import org.radarcns.kafka.ObservationKey;
@@ -9,6 +8,7 @@
import org.radarcns.stream.StreamDefinition;
import org.radarcns.stream.StreamMaster;
import org.radarcns.stream.StreamWorker;
+import org.radarcns.stream.aggregator.PhoneUsageAggregation;
import org.radarcns.util.serde.RadarSerdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,7 +28,7 @@ public PhoneUsageAggregationStream(Collection definitions, int
}
@Override
- protected KStream implementStream(
+ protected KStream implementStream(
StreamDefinition definition,
@Nonnull KStream kstream) {
return kstream.groupBy(PhoneUsageAggregationStream::temporaryKey)
diff --git a/src/main/java/org/radarcns/util/RadarUtilities.java b/src/main/java/org/radarcns/util/RadarUtilities.java
index 884aa5af..3e814ed0 100644
--- a/src/main/java/org/radarcns/util/RadarUtilities.java
+++ b/src/main/java/org/radarcns/util/RadarUtilities.java
@@ -18,12 +18,12 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
-import org.radarcns.aggregator.PhoneUsageAggregator;
import org.radarcns.kafka.AggregateKey;
import org.radarcns.kafka.ObservationKey;
import org.radarcns.passive.empatica.EmpaticaE4Acceleration;
import org.radarcns.stream.aggregator.DoubleAggregation;
import org.radarcns.stream.aggregator.DoubleArrayAggregation;
+import org.radarcns.stream.aggregator.PhoneUsageAggregation;
import org.radarcns.stream.collector.DoubleArrayCollector;
import org.radarcns.stream.collector.DoubleValueCollector;
import org.radarcns.stream.phone.PhoneUsageCollector;
@@ -49,7 +49,7 @@ KeyValue collectorToAvro(
KeyValue collectorToAvro(
Windowed window, DoubleValueCollector collector);
- KeyValue collectorToAvro(
+ KeyValue collectorToAvro(
Windowed window, PhoneUsageCollector collector);
double floatToDouble(float input);
diff --git a/src/main/java/org/radarcns/util/RadarUtilitiesImpl.java b/src/main/java/org/radarcns/util/RadarUtilitiesImpl.java
index b5fa11c4..4f8e5fe9 100644
--- a/src/main/java/org/radarcns/util/RadarUtilitiesImpl.java
+++ b/src/main/java/org/radarcns/util/RadarUtilitiesImpl.java
@@ -18,12 +18,12 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
-import org.radarcns.aggregator.PhoneUsageAggregator;
-import org.radarcns.passive.empatica.EmpaticaE4Acceleration;
-import org.radarcns.kafka.ObservationKey;
import org.radarcns.kafka.AggregateKey;
+import org.radarcns.kafka.ObservationKey;
+import org.radarcns.passive.empatica.EmpaticaE4Acceleration;
import org.radarcns.stream.aggregator.DoubleAggregation;
import org.radarcns.stream.aggregator.DoubleArrayAggregation;
+import org.radarcns.stream.aggregator.PhoneUsageAggregation;
import org.radarcns.stream.collector.DoubleArrayCollector;
import org.radarcns.stream.collector.DoubleValueCollector;
import org.radarcns.stream.phone.PhoneUsageCollector;
@@ -32,6 +32,8 @@
import java.util.ArrayList;
import java.util.List;
+import static org.apache.kafka.streams.KeyValue.pair;
+
/**
* Implements {@link RadarUtilities}.
*/
@@ -62,10 +64,10 @@ public double floatToDouble(float input) {
@Override
- public KeyValue collectorToAvro(
+ public KeyValue collectorToAvro(
Windowed window, PhoneUsageCollector collector
) {
- return new KeyValue<>(getWindowedTuple(window) , new PhoneUsageAggregator(
+ return pair(getWindowedTuple(window) , new PhoneUsageAggregation(
window.key().getPackageName(),
collector.getTotalForegroundTime(),
collector.getTimesTurnedOn(),
@@ -97,14 +99,14 @@ public KeyValue collectorToAvro(
quartile.add(subcollector.getQuartile());
}
- return new KeyValue<>(getWindowed(window),
+ return pair(getWindowed(window),
new DoubleArrayAggregation(min, max, sum, count, avg, quartile, iqr));
}
@Override
public KeyValue collectorToAvro(
Windowed window, DoubleValueCollector collector) {
- return new KeyValue<>(getWindowed(window),
+ return pair(getWindowed(window),
new DoubleAggregation(collector.getMin(), collector.getMax(), collector.getSum(),
collector.getCount(), collector.getAvg(), collector.getQuartile(),
collector.getIqr()));