diff --git a/.editorconfig b/.editorconfig index ce7dcfe8..d340b76d 100644 --- a/.editorconfig +++ b/.editorconfig @@ -15,12 +15,8 @@ charset = utf-8 trim_trailing_whitespace = true insert_final_newline = true -[*.{kt,kts}] -ktlint_standard_no-wildcard-imports = disabled - [*.md] trim_trailing_whitespace = false [*.{json,yaml,yml}] -indent_style = space indent_size = 2 diff --git a/README.md b/README.md index 022603d8..cdcc67ab 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ repositories { } dependencies { - implementation("org.radarbase:radar-commons:1.1.0") + implementation("org.radarbase:radar-commons:1.1.1") } ``` @@ -62,7 +62,7 @@ repositories { } dependencies { - implementation("org.radarbase:radar-commons-server:1.1.0") + implementation("org.radarbase:radar-commons-server:1.1.1") } ``` @@ -75,7 +75,7 @@ repositories { } dependencies { - testImplementation("org.radarbase:radar-commons-testing:1.1.0") + testImplementation("org.radarbase:radar-commons-testing:1.1.1") } ``` @@ -102,7 +102,7 @@ configurations.all { } dependencies { - implementation("org.radarbase:radar-commons:1.1.1-SNAPSHOT") + implementation("org.radarbase:radar-commons:1.1.2-SNAPSHOT") } ``` diff --git a/build.gradle.kts b/build.gradle.kts index 32770a66..eee21afd 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -18,6 +18,7 @@ import org.radarbase.gradle.plugin.radarPublishing */ plugins { kotlin("plugin.serialization") version Versions.Plugins.kotlinSerialization apply false + kotlin("plugin.allopen") version Versions.Plugins.kotlinAllOpen apply false id("com.github.davidmc24.gradle.plugin.avro") version Versions.Plugins.avro apply false id("org.radarbase.radar-root-project") id("org.radarbase.radar-dependency-management") diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index d89e3e7b..e3313772 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -1,9 +1,11 @@ +@Suppress("ConstPropertyName", "MemberVisibilityCanBePrivate") object Versions { - const val project = "1.1.0" + const val project = "1.1.1" object Plugins { const val kotlin = "1.9.10" const val kotlinSerialization = kotlin + const val kotlinAllOpen = kotlin const val avro = "1.8.0" const val gradle = "8.3" } @@ -12,7 +14,7 @@ object Versions { const val slf4j = "2.0.9" const val confluent = "7.5.0" const val kafka = "7.5.0-ce" - const val avro = "1.11.2" + const val avro = "1.11.3" const val jackson = "2.15.2" const val okhttp = "4.11.0" const val junit = "5.10.0" @@ -23,4 +25,6 @@ object Versions { const val opencsv = "5.8" const val ktor = "2.3.4" const val coroutines = "1.7.3" + const val commonsCompress = "1.24.0" + const val snappy = "1.1.10.5" } diff --git a/gradle/publishing.gradle b/gradle/publishing.gradle deleted file mode 100644 index 2b8935dd..00000000 --- a/gradle/publishing.gradle +++ /dev/null @@ -1,86 +0,0 @@ -apply plugin: 'maven-publish' -apply plugin: 'signing' - -def sharedManifest = manifest { - attributes("Implementation-Title": project.name, - "Implementation-Version": version) -} - -jar { - manifest.from sharedManifest -} - -// custom tasks for creating source/javadoc jars -task sourcesJar(type: Jar, dependsOn: classes) { - archiveClassifier.set('sources') - from sourceSets.main.allSource - manifest.from sharedManifest -} - -task javadocJar(type: Jar, dependsOn: javadoc) { - archiveClassifier.set('javadoc') - from javadoc.destinationDir - manifest.from sharedManifest -} - -assemble.dependsOn(javadocJar, sourcesJar) - -publishing { - publications { - mavenJar(MavenPublication) { - from components.java - artifact sourcesJar - artifact javadocJar - - pom { - name = project.name - description = project.description - url = githubUrl - licenses { - license { - name = 'The Apache Software License, Version 2.0' - url = 'http://www.apache.org/licenses/LICENSE-2.0.txt' - distribution = 'repo' - } - } - developers { - developer { - id = 'blootsvoets' - name = 'Joris Borgdorff' - email = 'joris@thehyve.nl' - organization = 'The Hyve' - } - developer { - id = 'nivemaham' - name = 'Nivethika Mahasivam' - email = 'nivethika@thehyve.nl' - organization = 'The Hyve' - } - } - issueManagement { - system = 'GitHub' - url = githubUrl + '/issues' - } - organization { - name = 'RADAR-base' - url = website - } - scm { - connection = 'scm:git:' + githubUrl - url = githubUrl - } - } - } - } -} - -signing { - useGpgCmd() - required { true } - sign(tasks["sourcesJar"], tasks["javadocJar"]) - sign(publishing.publications["mavenJar"]) -} - -tasks.withType(Sign).configureEach { - onlyIf { gradle.taskGraph.hasTask(project.tasks["publish"]) } -} diff --git a/radar-commons-gradle/build.gradle.kts b/radar-commons-gradle/build.gradle.kts index 80e1f9f1..14f6c16b 100644 --- a/radar-commons-gradle/build.gradle.kts +++ b/radar-commons-gradle/build.gradle.kts @@ -11,7 +11,7 @@ plugins { signing } -version = "1.1.0" +version = "1.1.1" group = "org.radarbase" description = "RADAR-base common Gradle plugin setup" @@ -23,11 +23,11 @@ repositories { } dependencies { - implementation("org.jetbrains.kotlin:kotlin-gradle-plugin:1.9.0") + implementation("org.jetbrains.kotlin:kotlin-gradle-plugin:1.9.10") implementation("org.jetbrains.dokka:dokka-gradle-plugin:1.9.0") - implementation("com.github.ben-manes:gradle-versions-plugin:0.47.0") + implementation("com.github.ben-manes:gradle-versions-plugin:0.48.0") implementation("io.github.gradle-nexus:publish-plugin:2.0.0-rc-1") - implementation("org.jlleitschuh.gradle:ktlint-gradle:11.5.1") + implementation("org.jlleitschuh.gradle:ktlint-gradle:11.6.0") implementation("com.github.jk1.dependency-license-report:com.github.jk1.dependency-license-report.gradle.plugin:2.5") } diff --git a/radar-commons-gradle/src/main/kotlin/org/radarbase/gradle/plugin/RadarKotlinPlugin.kt b/radar-commons-gradle/src/main/kotlin/org/radarbase/gradle/plugin/RadarKotlinPlugin.kt index a22b4a89..b3c4f059 100644 --- a/radar-commons-gradle/src/main/kotlin/org/radarbase/gradle/plugin/RadarKotlinPlugin.kt +++ b/radar-commons-gradle/src/main/kotlin/org/radarbase/gradle/plugin/RadarKotlinPlugin.kt @@ -12,7 +12,18 @@ import org.gradle.api.tasks.bundling.Tar import org.gradle.api.tasks.compile.JavaCompile import org.gradle.api.tasks.testing.Test import org.gradle.api.tasks.testing.logging.TestExceptionFormat -import org.gradle.kotlin.dsl.* +import org.gradle.kotlin.dsl.apply +import org.gradle.kotlin.dsl.configure +import org.gradle.kotlin.dsl.create +import org.gradle.kotlin.dsl.dependencies +import org.gradle.kotlin.dsl.get +import org.gradle.kotlin.dsl.getValue +import org.gradle.kotlin.dsl.hasPlugin +import org.gradle.kotlin.dsl.maven +import org.gradle.kotlin.dsl.provideDelegate +import org.gradle.kotlin.dsl.register +import org.gradle.kotlin.dsl.repositories +import org.gradle.kotlin.dsl.withType import org.jetbrains.kotlin.gradle.dsl.JvmTarget import org.jetbrains.kotlin.gradle.dsl.KotlinVersion import org.jetbrains.kotlin.gradle.tasks.KotlinCompile @@ -109,6 +120,10 @@ class RadarKotlinPlugin : Plugin { useJUnitPlatform() } + tasks.withType { + compression = Compression.GZIP + archiveExtension.set("tar.gz") + } tasks.register("downloadDependencies") { doFirst { diff --git a/radar-commons-gradle/src/main/kotlin/org/radarbase/gradle/plugin/RadarPublishingPlugin.kt b/radar-commons-gradle/src/main/kotlin/org/radarbase/gradle/plugin/RadarPublishingPlugin.kt index 488606c0..7ccfdd3d 100644 --- a/radar-commons-gradle/src/main/kotlin/org/radarbase/gradle/plugin/RadarPublishingPlugin.kt +++ b/radar-commons-gradle/src/main/kotlin/org/radarbase/gradle/plugin/RadarPublishingPlugin.kt @@ -9,10 +9,19 @@ import org.gradle.api.publish.maven.MavenPomDeveloperSpec import org.gradle.api.publish.maven.MavenPublication import org.gradle.api.publish.maven.plugins.MavenPublishPlugin import org.gradle.api.tasks.SourceSetContainer -import org.gradle.api.tasks.bundling.Compression import org.gradle.api.tasks.bundling.Jar -import org.gradle.api.tasks.bundling.Tar -import org.gradle.kotlin.dsl.* +import org.gradle.kotlin.dsl.apply +import org.gradle.kotlin.dsl.attributes +import org.gradle.kotlin.dsl.configure +import org.gradle.kotlin.dsl.create +import org.gradle.kotlin.dsl.creating +import org.gradle.kotlin.dsl.get +import org.gradle.kotlin.dsl.getByName +import org.gradle.kotlin.dsl.getValue +import org.gradle.kotlin.dsl.provideDelegate +import org.gradle.kotlin.dsl.registering +import org.gradle.kotlin.dsl.the +import org.gradle.kotlin.dsl.withType import org.gradle.plugins.signing.Sign import org.gradle.plugins.signing.SigningExtension import org.gradle.plugins.signing.SigningPlugin @@ -52,11 +61,6 @@ class RadarPublishingPlugin : Plugin { dependsOn(dokkaJavadoc) } - tasks.withType { - compression = Compression.GZIP - archiveExtension.set("tar.gz") - } - tasks.withType { manifest { attributes( diff --git a/radar-commons-kotlin/src/main/kotlin/org/radarbase/kotlin/coroutines/CachedValue.kt b/radar-commons-kotlin/src/main/kotlin/org/radarbase/kotlin/coroutines/CachedValue.kt index 109068aa..9ce673ec 100644 --- a/radar-commons-kotlin/src/main/kotlin/org/radarbase/kotlin/coroutines/CachedValue.kt +++ b/radar-commons-kotlin/src/main/kotlin/org/radarbase/kotlin/coroutines/CachedValue.kt @@ -7,7 +7,6 @@ import kotlinx.coroutines.sync.Semaphore import java.util.concurrent.atomic.AtomicReference import kotlin.coroutines.coroutineContext import kotlin.time.Duration -import kotlin.time.ExperimentalTime import kotlin.time.TimeMark import kotlin.time.TimeSource @@ -207,17 +206,11 @@ open class CachedValue( cache.set(null) } - sealed class CacheContents - @ExperimentalTime - constructor(time: TimeMark?) { - - @OptIn(ExperimentalTime::class) + sealed class CacheContents(time: TimeMark?) { constructor() : this(null) - @ExperimentalTime protected val time: TimeMark = time ?: TimeSource.Monotonic.markNow() - @OptIn(ExperimentalTime::class) open fun isExpired(age: Duration): Boolean = (time + age).hasPassedNow() abstract fun getOrThrow(): T @@ -229,27 +222,25 @@ open class CachedValue( internal constructor( val exception: Throwable, ) : CacheContents() { - override fun isExpired(age: Duration): Boolean = exception is CancellationException || super.isExpired(age) + override fun isExpired(age: Duration): Boolean = exception is CancellationException || + super.isExpired(age) + override fun getOrThrow(): T = throw exception @Suppress("UNCHECKED_CAST") override suspend fun map(transform: suspend (T) -> R): CacheContents = this as CacheError } - @OptIn(ExperimentalTime::class) class CacheValue - @ExperimentalTime internal constructor( val value: T, time: TimeMark?, ) : CacheContents(time) { - @OptIn(ExperimentalTime::class) constructor(value: T) : this(value, null) override fun getOrThrow(): T = value - @OptIn(ExperimentalTime::class) override suspend fun map(transform: suspend (T) -> R): CacheContents = try { CacheValue(transform(value), time = time) } catch (ex: Throwable) { diff --git a/radar-commons-kotlin/src/main/kotlin/org/radarbase/kotlin/coroutines/Extensions.kt b/radar-commons-kotlin/src/main/kotlin/org/radarbase/kotlin/coroutines/Extensions.kt index e3f48ca7..0329c62b 100644 --- a/radar-commons-kotlin/src/main/kotlin/org/radarbase/kotlin/coroutines/Extensions.kt +++ b/radar-commons-kotlin/src/main/kotlin/org/radarbase/kotlin/coroutines/Extensions.kt @@ -2,10 +2,17 @@ package org.radarbase.kotlin.coroutines -import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.consume +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.withContext import java.util.concurrent.Future import java.util.concurrent.TimeUnit import kotlin.coroutines.CoroutineContext diff --git a/radar-commons-kotlin/src/main/kotlin/org/radarbase/ktor/auth/Extensions.kt b/radar-commons-kotlin/src/main/kotlin/org/radarbase/ktor/auth/Extensions.kt index c48bab65..2831edbb 100644 --- a/radar-commons-kotlin/src/main/kotlin/org/radarbase/ktor/auth/Extensions.kt +++ b/radar-commons-kotlin/src/main/kotlin/org/radarbase/ktor/auth/Extensions.kt @@ -1,7 +1,7 @@ package org.radarbase.ktor.auth -import io.ktor.client.request.* -import io.ktor.http.* +import io.ktor.client.request.HttpRequestBuilder +import io.ktor.http.HttpHeaders fun HttpRequestBuilder.bearer(token: String) { headers[HttpHeaders.Authorization] = "Bearer $token" diff --git a/radar-commons-kotlin/src/main/kotlin/org/radarbase/ktor/auth/OAuthClientProvider.kt b/radar-commons-kotlin/src/main/kotlin/org/radarbase/ktor/auth/OAuthClientProvider.kt index 785096e5..9a2fd00b 100644 --- a/radar-commons-kotlin/src/main/kotlin/org/radarbase/ktor/auth/OAuthClientProvider.kt +++ b/radar-commons-kotlin/src/main/kotlin/org/radarbase/ktor/auth/OAuthClientProvider.kt @@ -1,15 +1,25 @@ package org.radarbase.ktor.auth -import io.ktor.client.* -import io.ktor.client.call.* -import io.ktor.client.plugins.auth.* -import io.ktor.client.plugins.auth.providers.* -import io.ktor.client.request.* -import io.ktor.client.request.forms.* -import io.ktor.client.statement.* -import io.ktor.http.* -import io.ktor.http.auth.* -import io.ktor.util.* +import io.ktor.client.HttpClient +import io.ktor.client.call.body +import io.ktor.client.plugins.auth.Auth +import io.ktor.client.plugins.auth.AuthProvider +import io.ktor.client.plugins.auth.providers.BearerAuthConfig +import io.ktor.client.plugins.auth.providers.BearerAuthProvider +import io.ktor.client.request.HttpRequestBuilder +import io.ktor.client.request.accept +import io.ktor.client.request.forms.submitForm +import io.ktor.client.request.headers +import io.ktor.client.statement.HttpResponse +import io.ktor.client.statement.bodyAsText +import io.ktor.http.ContentType +import io.ktor.http.HttpHeaders +import io.ktor.http.HttpStatusCode +import io.ktor.http.Parameters +import io.ktor.http.auth.AuthScheme +import io.ktor.http.auth.HttpAuthHeader +import io.ktor.http.isSuccess +import io.ktor.util.KtorDsl import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import org.slf4j.LoggerFactory diff --git a/radar-commons-kotlin/src/test/kotlin/org/radarbase/kotlin/coroutines/CachedValueTest.kt b/radar-commons-kotlin/src/test/kotlin/org/radarbase/kotlin/coroutines/CachedValueTest.kt index bbb90c14..bfc91508 100644 --- a/radar-commons-kotlin/src/test/kotlin/org/radarbase/kotlin/coroutines/CachedValueTest.kt +++ b/radar-commons-kotlin/src/test/kotlin/org/radarbase/kotlin/coroutines/CachedValueTest.kt @@ -5,7 +5,9 @@ import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.hamcrest.MatcherAssert.assertThat -import org.hamcrest.Matchers.* +import org.hamcrest.Matchers.equalTo +import org.hamcrest.Matchers.`is` +import org.hamcrest.Matchers.lessThan import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows diff --git a/radar-commons-kotlin/src/test/kotlin/org/radarbase/kotlin/coroutines/ExtensionsKtTest.kt b/radar-commons-kotlin/src/test/kotlin/org/radarbase/kotlin/coroutines/ExtensionsKtTest.kt index bb709753..cf563dba 100644 --- a/radar-commons-kotlin/src/test/kotlin/org/radarbase/kotlin/coroutines/ExtensionsKtTest.kt +++ b/radar-commons-kotlin/src/test/kotlin/org/radarbase/kotlin/coroutines/ExtensionsKtTest.kt @@ -1,19 +1,25 @@ package org.radarbase.kotlin.coroutines -import kotlinx.coroutines.* +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking import org.hamcrest.MatcherAssert.assertThat import org.hamcrest.Matchers.greaterThan import org.hamcrest.Matchers.lessThan -import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test import org.junit.jupiter.api.fail import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds -import kotlin.time.ExperimentalTime import kotlin.time.measureTime -@OptIn(ExperimentalTime::class) class ExtensionsKtTest { companion object { @BeforeAll diff --git a/radar-commons-server/build.gradle.kts b/radar-commons-server/build.gradle.kts index 010ae498..b8b03ea6 100644 --- a/radar-commons-server/build.gradle.kts +++ b/radar-commons-server/build.gradle.kts @@ -16,6 +16,7 @@ plugins { id("com.github.davidmc24.gradle.plugin.avro") + kotlin("plugin.allopen") } description = "RADAR Common server library utilities." @@ -40,10 +41,16 @@ dependencies { api("org.apache.avro:avro:${Versions.avro}") - implementation("org.apache.kafka:kafka-clients:${Versions.kafka}") + implementation("org.apache.kafka:kafka-clients:${Versions.kafka}") { + implementation("org.xerial.snappy:snappy-java:${Versions.snappy}") + } testImplementation("org.mockito:mockito-core:${Versions.mockito}") // Direct producer uses KafkaAvroSerializer if initialized testImplementation("io.confluent:kafka-avro-serializer:${Versions.confluent}") testImplementation("org.radarbase:radar-schemas-commons:${Versions.radarSchemas}") } + +allOpen { + annotation("org.radarbase.config.OpenConfig") +} diff --git a/radar-commons-server/src/main/java/org/radarbase/config/AvroTopicConfig.kt b/radar-commons-server/src/main/java/org/radarbase/config/AvroTopicConfig.kt index 0983d4a5..7314002b 100644 --- a/radar-commons-server/src/main/java/org/radarbase/config/AvroTopicConfig.kt +++ b/radar-commons-server/src/main/java/org/radarbase/config/AvroTopicConfig.kt @@ -23,15 +23,13 @@ import org.radarbase.topic.AvroTopic.Companion.parse /** * Specifies an Avro topic. */ -open class AvroTopicConfig { - @JvmField +@OpenConfig +class AvroTopicConfig { var topic: String? = null - @JvmField @JsonProperty("key_schema") var keySchema: String? = null - @JvmField @JsonProperty("value_schema") var valueSchema: String? = null var tags: List? = null @@ -42,7 +40,7 @@ open class AvroTopicConfig { * @throws IllegalStateException if the key_schema or value_schema properties are not valid * Avro SpecificRecord classes */ - open fun parseAvroTopic(): AvroTopic { + fun parseAvroTopic(): AvroTopic { return try { parse( checkNotNull(topic) { "Topic is not specified" }, diff --git a/radar-commons-server/src/main/java/org/radarbase/stream/collector/NumericAggregateCollector.kt b/radar-commons-server/src/main/java/org/radarbase/stream/collector/NumericAggregateCollector.kt index 904b3007..2062a41a 100644 --- a/radar-commons-server/src/main/java/org/radarbase/stream/collector/NumericAggregateCollector.kt +++ b/radar-commons-server/src/main/java/org/radarbase/stream/collector/NumericAggregateCollector.kt @@ -23,7 +23,7 @@ import java.math.BigDecimal import java.math.BigDecimal.valueOf import java.math.BigInteger import java.nio.ByteBuffer -import java.util.* +import java.util.Objects /** * Java class to aggregate data using Kafka Streams. Double is the base type. diff --git a/radar-commons-testing/build.gradle.kts b/radar-commons-testing/build.gradle.kts index a1513bcc..4c14246e 100644 --- a/radar-commons-testing/build.gradle.kts +++ b/radar-commons-testing/build.gradle.kts @@ -47,7 +47,9 @@ dependencies { implementation(platform("com.fasterxml.jackson:jackson-bom:${Versions.jackson}")) implementation("com.fasterxml.jackson.core:jackson-databind") - implementation("org.apache.kafka:kafka-clients:${Versions.kafka}") + implementation("org.apache.kafka:kafka-clients:${Versions.kafka}") { + implementation("org.xerial.snappy:snappy-java:${Versions.snappy}") + } implementation("io.confluent:kafka-avro-serializer:${Versions.confluent}") implementation(platform("io.ktor:ktor-bom:${Versions.ktor}")) diff --git a/radar-commons-testing/src/main/java/org/radarbase/mock/MockProducer.kt b/radar-commons-testing/src/main/java/org/radarbase/mock/MockProducer.kt index 193fd371..78eebe14 100644 --- a/radar-commons-testing/src/main/java/org/radarbase/mock/MockProducer.kt +++ b/radar-commons-testing/src/main/java/org/radarbase/mock/MockProducer.kt @@ -16,13 +16,15 @@ package org.radarbase.mock import com.opencsv.exceptions.CsvValidationException -import io.ktor.client.call.* -import io.ktor.client.plugins.* -import io.ktor.client.plugins.auth.* -import io.ktor.client.plugins.auth.providers.* -import io.ktor.client.request.forms.* -import io.ktor.http.* -import kotlinx.coroutines.* +import io.ktor.client.plugins.auth.Auth +import io.ktor.client.plugins.defaultRequest +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext import org.apache.avro.SchemaValidationException import org.radarbase.config.ServerConfig import org.radarbase.config.YamlConfigLoader @@ -41,13 +43,17 @@ import org.radarbase.producer.rest.RestKafkaSender.Companion.restKafkaSender import org.radarbase.producer.schema.SchemaRetriever import org.radarbase.producer.schema.SchemaRetriever.Companion.schemaRetriever import org.radarcns.kafka.ObservationKey -import org.radarcns.passive.empatica.* +import org.radarcns.passive.empatica.EmpaticaE4Acceleration +import org.radarcns.passive.empatica.EmpaticaE4BatteryLevel +import org.radarcns.passive.empatica.EmpaticaE4BloodVolumePulse +import org.radarcns.passive.empatica.EmpaticaE4ElectroDermalActivity +import org.radarcns.passive.empatica.EmpaticaE4InterBeatInterval +import org.radarcns.passive.empatica.EmpaticaE4Temperature import org.slf4j.LoggerFactory import java.io.IOException import java.nio.file.Path import java.nio.file.Paths import java.time.Instant -import java.util.* import java.util.concurrent.atomic.AtomicBoolean import kotlin.system.exitProcess import kotlin.time.Duration.Companion.seconds diff --git a/radar-commons-testing/src/main/java/org/radarbase/mock/config/MockDataConfig.kt b/radar-commons-testing/src/main/java/org/radarbase/mock/config/MockDataConfig.kt index 979e866b..71cb6dbc 100644 --- a/radar-commons-testing/src/main/java/org/radarbase/mock/config/MockDataConfig.kt +++ b/radar-commons-testing/src/main/java/org/radarbase/mock/config/MockDataConfig.kt @@ -24,15 +24,12 @@ import java.nio.file.Path import java.nio.file.Paths class MockDataConfig : AvroTopicConfig() { - @JvmField @JsonProperty("file") var dataFile: String? = null - @JvmField var frequency = 1 var sensor: String? = null - @JvmField @JsonProperty("value_fields") var valueFields: List? = null var absoluteDataFile: String? = null diff --git a/radar-commons-testing/src/main/java/org/radarbase/mock/data/CsvGenerator.java b/radar-commons-testing/src/main/java/org/radarbase/mock/data/CsvGenerator.kt similarity index 52% rename from radar-commons-testing/src/main/java/org/radarbase/mock/data/CsvGenerator.java rename to radar-commons-testing/src/main/java/org/radarbase/mock/data/CsvGenerator.kt index 0ce4c0ab..1e958694 100644 --- a/radar-commons-testing/src/main/java/org/radarbase/mock/data/CsvGenerator.java +++ b/radar-commons-testing/src/main/java/org/radarbase/mock/data/CsvGenerator.kt @@ -13,34 +13,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.radarbase.mock.data -package org.radarbase.mock.data; - -import com.opencsv.CSVWriter; -import java.io.IOException; -import java.io.Writer; -import java.nio.file.Files; -import java.nio.file.Path; -import org.radarbase.mock.config.MockDataConfig; -import org.radarcns.kafka.ObservationKey; +import com.opencsv.CSVWriter +import org.radarbase.mock.config.MockDataConfig +import org.radarcns.kafka.ObservationKey +import java.io.IOException +import java.nio.file.Path +import kotlin.io.path.bufferedWriter /** * It generates a CVS file that can be used to stream data and * to compute the expected results. + * @param key record key, project test, user UserID_0 and source SourceID_0 by default. */ -public final class CsvGenerator { - private final ObservationKey key; - - /** CsvGenerator sending data as project test, user UserID_0 and source SourceID_0. */ - public CsvGenerator() { - this(new ObservationKey("test", "UserID_0", "SourceID_0")); - } - - /** CsvGenerator sending data with given key. */ - public CsvGenerator(ObservationKey key) { - this.key = key; - } - +class CsvGenerator( + private val key: ObservationKey = ObservationKey("test", "UserID_0", "SourceID_0"), +) { + /** CsvGenerator sending data with given key. */ /** * Generates new CSV file to simulation a single user with a single device. * @@ -49,11 +39,10 @@ public CsvGenerator(ObservationKey key) { * @param root directory relative to which the output csv file is generated * @throws IOException if the CSV file cannot be written to */ - public void generate(MockDataConfig config, long duration, Path root) - throws IOException { - Path file = config.getDataFile(root); - - generate(new RecordGenerator<>(config, ObservationKey.class), duration, file); + @Throws(IOException::class) + fun generate(config: MockDataConfig, duration: Long, root: Path) { + val file = config.getDataFile(root) + generate(RecordGenerator(config, ObservationKey::class.java), duration, file) } /** @@ -64,12 +53,13 @@ public void generate(MockDataConfig config, long duration, Path root) * @param csvFile CSV file to write data to * @throws IOException if the CSV file cannot be written to */ - public void generate(RecordGenerator generator, long duration, Path csvFile) - throws IOException { - try (Writer writer = Files.newBufferedWriter(csvFile); - CSVWriter csvWriter = new CSVWriter(writer)) { - csvWriter.writeNext(generator.getHeader().toArray(new String[0])); - csvWriter.writeAll(generator.iteratableRawValues(key, duration)); + @Throws(IOException::class) + fun generate(generator: RecordGenerator, duration: Long, csvFile: Path) { + csvFile.bufferedWriter().use { writer -> + CSVWriter(writer).use { csvWriter -> + csvWriter.writeNext(generator.headerArray) + csvWriter.writeAll(generator.iteratableRawValues(key, duration)) + } } } } diff --git a/radar-commons-testing/src/main/java/org/radarbase/mock/data/MockCsvParser.kt b/radar-commons-testing/src/main/java/org/radarbase/mock/data/MockCsvParser.kt index 2c68a6a0..457bbc2a 100644 --- a/radar-commons-testing/src/main/java/org/radarbase/mock/data/MockCsvParser.kt +++ b/radar-commons-testing/src/main/java/org/radarbase/mock/data/MockCsvParser.kt @@ -35,7 +35,8 @@ import java.nio.charset.StandardCharsets import java.nio.file.Path import java.time.Duration import java.time.Instant -import java.util.* +import kotlin.io.encoding.Base64 +import kotlin.io.encoding.ExperimentalEncodingApi import kotlin.io.path.bufferedReader /** @@ -277,9 +278,9 @@ class MockCsvParser( } } + @OptIn(ExperimentalEncodingApi::class) private fun parseBytes(fieldString: String?): ByteBuffer { - val result = Base64.getDecoder() - .decode(fieldString!!.toByteArray(StandardCharsets.UTF_8)) + val result = Base64.decode(fieldString!!.toByteArray(StandardCharsets.UTF_8)) return ByteBuffer.wrap(result) } diff --git a/radar-commons-testing/src/main/java/org/radarbase/mock/data/RecordGenerator.java b/radar-commons-testing/src/main/java/org/radarbase/mock/data/RecordGenerator.java deleted file mode 100644 index 36129b94..00000000 --- a/radar-commons-testing/src/main/java/org/radarbase/mock/data/RecordGenerator.java +++ /dev/null @@ -1,300 +0,0 @@ -/* - * Copyright 2017 The Hyve and King's College London - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.radarbase.mock.data; - -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Type; -import org.apache.avro.specific.SpecificRecord; -import org.radarbase.data.Record; -import org.radarbase.mock.config.MockDataConfig; -import org.radarbase.topic.AvroTopic; -import org.radarbase.util.Metronome; - -/** - * Generates records according to the specification in a {@link MockDataConfig}. - * - * @param type of key to generate - */ -public class RecordGenerator { - private static final Set ACCEPTABLE_VALUE_TYPES = new HashSet<>(Arrays.asList(Type.DOUBLE, - Type.FLOAT, Type.INT, Type.LONG, Type.ENUM)); - private final AvroTopic topic; - private final Field timeField; - private final Field timeReceivedField; - private final List valueFields; - private final MockDataConfig config; - private final List unknownFields; - private final List header; - - /** - * Generates records according to config. Given key class must match the one specified in the - * config. - * @param config configuration to use - */ - public RecordGenerator(MockDataConfig config, Class keyClass) { - this.config = config; - - // doing type checking below. - topic = config.parseAvroTopic(); - if (!topic.getKeyClass().equals(keyClass)) { - throw new IllegalArgumentException( - "RecordGenerator only generates ObservationKey keys, not " - + topic.getKeyClass() + " in topic " + topic); - } - if (!SpecificRecord.class.isAssignableFrom(topic.getValueClass())) { - throw new IllegalArgumentException( - "RecordGenerator only generates SpecificRecord values, not " - + topic.getValueClass() + " in topic " + topic); - } - header = new ArrayList<>(); - header.addAll(Arrays.asList("projectId", "userId", "sourceId")); - - // cache key and value fields - Schema valueSchema = topic.getValueSchema(); - - timeField = forceGetField(valueSchema, "time"); - timeReceivedField = valueSchema.getField("timeReceived"); - - List valueFieldNames = config.valueFields; - if (valueFieldNames == null) { - valueFieldNames = Collections.emptyList(); - } - valueFields = new ArrayList<>(valueFieldNames.size()); - for (String fieldName : valueFieldNames) { - Field field = forceGetField(valueSchema, fieldName); - valueFields.add(field); - Schema.Type type = field.schema().getType(); - if (!ACCEPTABLE_VALUE_TYPES.contains(type)) { - throw new IllegalArgumentException("Cannot generate data for type " + type - + " in field " + fieldName + " in topic " + topic); - } - } - - unknownFields = new ArrayList<>(valueSchema.getFields().size() - valueFields.size() - 2); - for (Field field : valueSchema.getFields()) { - header.add(field.name()); - if (field.name().equals("time") || field.name().equals("timeReceived") - || valueFieldNames.contains(field.name())) { - continue; - } - unknownFields.add(field); - } - } - - /** - * Get the header with correct prefixes. - * @return generated header - */ - public List getHeader() { - List withPrefix = new ArrayList<>(header); - for (int i = 0; i < 3; i++) { - withPrefix.set(i, "key." + withPrefix.get(i)); - } - for (int i = 3; i < header.size(); i++) { - withPrefix.set(i, "value." + withPrefix.get(i)); - } - return withPrefix; - } - - /** Get given schema field, and throw an IllegalArgumentException if it does not exists. */ - private Field forceGetField(Schema schema, String name) { - Field field = schema.getField(name); - if (field == null) { - throw new IllegalArgumentException("Schema for topic " + topic + " does not contain " - + "required field " + name); - } - return field; - } - - /** - * Simulates data of a sensor with the given frequency for a time interval specified by - * duration. The data is converted to lists of strings. - * @param duration in milliseconds for the simulation - * @param key key to generate data with - * @return list containing simulated values - */ - public Iterable iteratableRawValues(K key, long duration) { - return () -> { - final Iterator> baseIterator = iterateValues(key, duration); - return new RecordArrayIterator<>(baseIterator); - }; - } - - /** - * Simulates data of a sensor with the given frequency for a time interval specified by - * duration. - * @param duration in milliseconds for the simulation - * @param key key to generate data with - * @return list containing simulated values - */ - public Iterator> iterateValues(final K key, final long duration) { - return new RecordIterator(duration, key); - } - - /** - * Get a random double. - * @return random {@code Double} using {@code ThreadLocalRandom}. - **/ - private double getRandomDouble() { - return ThreadLocalRandom.current().nextDouble(config.minimum, config.maximum); - } - - /** - * It returns the time a message is received. - * @param time time at which the message has been sent - * @return random {@code Double} representing the Round Trip Time for the given timestamp - * using {@code ThreadLocalRandom} - **/ - private long getTimeReceived(long time) { - return time + ThreadLocalRandom.current().nextLong(1 , 10); - } - - private static class RecordArrayIterator - implements Iterator { - - private final Iterator> baseIterator; - - private RecordArrayIterator(Iterator> baseIterator) { - this.baseIterator = baseIterator; - } - - @Override - public boolean hasNext() { - return baseIterator.hasNext(); - } - - @Override - public String[] next() { - Record record = baseIterator.next(); - - int keyFieldsSize = record.getKey().getSchema().getFields().size(); - int valueFieldsSize = record.getValue().getSchema().getFields().size(); - - String[] result = new String[keyFieldsSize + valueFieldsSize]; - for (int i = 0; i < keyFieldsSize; i++) { - result[i] = record.getKey().get(i).toString(); - } - for (int i = 0; i < valueFieldsSize; i++) { - result[i + keyFieldsSize] = record.getValue().get(i).toString(); - } - return result; - } - - @Override - public void remove() { - throw new UnsupportedOperationException("remove"); - } - } - - public MockDataConfig getConfig() { - return config; - } - - public AvroTopic getTopic() { - return topic; - } - - private class RecordIterator implements Iterator> { - private final Metronome timestamps; - private final K key; - - public RecordIterator(long duration, K key) { - this.key = key; - timestamps = new Metronome(duration * config.frequency / 1000L, - config.frequency); - } - - @Override - public boolean hasNext() { - return timestamps.hasNext(); - } - - @Override - public Record next() { - if (!hasNext()) { - throw new IllegalStateException("Iterator done"); - } - SpecificRecord value = topic.newValueInstance(); - long time = timestamps.next(); - - value.put(timeField.pos(), time / 1000d); - if (timeReceivedField != null) { - value.put(timeReceivedField.pos(), getTimeReceived(time) / 1000d); - } - - for (Field f : valueFields) { - Type type = f.schema().getType(); - Object fieldValue; - switch (type) { - case DOUBLE: - fieldValue = getRandomDouble(); - break; - case FLOAT: - fieldValue = (float)getRandomDouble(); - break; - case LONG: - fieldValue = (long)getRandomDouble(); - break; - case INT: - fieldValue = (int)getRandomDouble(); - break; - case ENUM: - fieldValue = getRandomEnum(f.schema()); - break; - default: - throw new IllegalStateException("Cannot parse type " + type); - - } - value.put(f.pos(), fieldValue); - } - - for (Field f : unknownFields) { - value.put(f.pos(), f.defaultVal()); - } - - return new Record<>(key, value); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - } - - private static Object getRandomEnum(Schema schema) { - try { - Class cls = Class.forName(schema.getFullName()); - Method values = cls.getMethod("values"); - Object[] symbols = (Object[]) values.invoke(null); - int symbolIndex = ThreadLocalRandom.current().nextInt(symbols.length); - return symbols[symbolIndex]; - } catch (ReflectiveOperationException | ClassCastException e) { - throw new IllegalArgumentException( - "Cannot generate random enum class " + schema.getFullName(), e); - } - } -} diff --git a/radar-commons-testing/src/main/java/org/radarbase/mock/data/RecordGenerator.kt b/radar-commons-testing/src/main/java/org/radarbase/mock/data/RecordGenerator.kt new file mode 100644 index 00000000..5e0b8cea --- /dev/null +++ b/radar-commons-testing/src/main/java/org/radarbase/mock/data/RecordGenerator.kt @@ -0,0 +1,212 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.radarbase.mock.data + +import org.apache.avro.Schema +import org.apache.avro.Schema.Field +import org.apache.avro.Schema.Type +import org.apache.avro.Schema.Type.DOUBLE +import org.apache.avro.Schema.Type.ENUM +import org.apache.avro.Schema.Type.FLOAT +import org.apache.avro.Schema.Type.INT +import org.apache.avro.Schema.Type.LONG +import org.apache.avro.specific.SpecificRecord +import org.radarbase.data.Record +import org.radarbase.mock.config.MockDataConfig +import org.radarbase.topic.AvroTopic +import org.radarbase.util.Metronome +import java.util.EnumSet +import java.util.concurrent.ThreadLocalRandom +import kotlin.random.Random + +/** + * Generates records according to the specification in a [MockDataConfig]. + * Given key class must match the one specified in the config. + * + * @param K type of key to generate + * @param config configuration to use + */ +open class RecordGenerator( + val config: MockDataConfig, + keyClass: Class, +) { + val topic: AvroTopic = config.parseAvroTopic() + private val timeField: Field + private val timeReceivedField: Field? + private val valueFields: List + private val unknownFields: List + private val header: List + + init { + // doing type checking below. + require(topic.keyClass == keyClass) { + "RecordGenerator only generates ObservationKey keys, not ${topic.keyClass} in topic $topic" + } + require(SpecificRecord::class.java.isAssignableFrom(topic.valueClass)) { + "RecordGenerator only generates SpecificRecord values, not ${topic.valueClass} in topic $topic" + } + header = ArrayList() + header.addAll(mutableListOf("projectId", "userId", "sourceId")) + + // cache key and value fields + val valueSchema = topic.valueSchema + timeField = forceGetField(valueSchema, "time") + timeReceivedField = valueSchema.getField("timeReceived") + + val valueFieldNames = config.valueFields ?: emptyList() + valueFields = valueFieldNames + .map { fieldName -> forceGetField(valueSchema, fieldName) } + .onEach { field -> + val type = field.schema().type + require(ACCEPTABLE_VALUE_TYPES.contains(type)) { + "Cannot generate data for type $type in field ${field.name()} in topic $topic" + } + } + + unknownFields = ArrayList(valueSchema.fields.size - valueFields.size - 2) + val existingNames = buildSet(valueFieldNames.size + 2) { + add("time") + add("timeReceived") + addAll(valueFieldNames) + } + for (field in valueSchema.fields) { + header.add(field.name()) + if (field.name() !in existingNames) { + unknownFields.add(field) + } + } + } + + /** + * Get the header with correct prefixes. + * @return generated header + */ + val headerArray: Array = Array(header.size) { idx -> + val name = header[idx] + if (idx <= 2) { + "key.$name" + } else { + "value.$name" + } + } + + /** Get given schema field, and throw an IllegalArgumentException if it does not exists. */ + private fun forceGetField( + schema: Schema, + name: String, + ): Field = requireNotNull(schema.getField(name)) { + "Schema for topic $topic does not contain required field $name" + } + + /** + * Simulates data of a sensor with the given frequency for a time interval specified by + * duration. The data is converted to lists of strings. + * @param duration in milliseconds for the simulation + * @param key key to generate data with + * @return list containing simulated values + */ + open fun iteratableRawValues(key: K, duration: Long): Iterable> = iterateValues(key, duration) + .asSequence() + .map { record -> + val keyFieldsSize = record.key.schema.fields.size + val valueFieldsSize = record.value.schema.fields.size + Array(keyFieldsSize + valueFieldsSize) { idx -> + if (idx < keyFieldsSize) { + record.key[idx] + } else { + record.value[idx - keyFieldsSize] + }.toString() + } + } + .asIterable() + + /** + * Simulates data of a sensor with the given frequency for a time interval specified by + * duration. + * @param duration in milliseconds for the simulation + * @param key key to generate data with + * @return list containing simulated values + */ + fun iterateValues(key: K, duration: Long): Iterator> = Metronome( + duration * config.frequency / 1000L, + config.frequency, + ) + .asSequence() + .map { time -> Record(key, createValue(time)) } + .iterator() + + private fun createValue(time: Long) = topic.newValueInstance().apply { + put(timeField.pos(), time / 1000.0) + if (timeReceivedField != null) { + put(timeReceivedField.pos(), getTimeReceived(time) / 1000.0) + } + for (f in valueFields) { + val fieldValue: Any = when (val type = f.schema().type) { + DOUBLE -> randomDouble + FLOAT -> randomDouble.toFloat() + LONG -> randomDouble.toLong() + INT -> randomDouble.toInt() + ENUM -> getRandomEnum(f.schema()) + else -> throw IllegalStateException("Cannot parse type $type") + } + put(f.pos(), fieldValue) + } + for (f in unknownFields) { + put(f.pos(), f.defaultVal()) + } + } + + /** + * Get a random double. + * @return random `Double` using `ThreadLocalRandom`. + */ + private val randomDouble: Double + get() = Random.nextDouble(config.minimum, config.maximum) + + /** + * It returns the time a message is received. + * @param time time at which the message has been sent + * @return random `Double` representing the Round Trip Time for the given timestamp + * using `ThreadLocalRandom` + */ + private fun getTimeReceived(time: Long): Long = time + Random.nextLong(1, 10) + + companion object { + private val ACCEPTABLE_VALUE_TYPES: Set = EnumSet.of(DOUBLE, FLOAT, INT, LONG, ENUM) + + private fun getRandomEnum(schema: Schema): Any { + return try { + val cls = Class.forName(schema.fullName) + val values = cls.getMethod("values") + + @Suppress("UNCHECKED_CAST") + val symbols = values.invoke(null) as Array + val symbolIndex = ThreadLocalRandom.current().nextInt(symbols.size) + symbols[symbolIndex] + } catch (e: ReflectiveOperationException) { + throw IllegalArgumentException( + "Cannot generate random enum class " + schema.fullName, + e, + ) + } catch (e: ClassCastException) { + throw IllegalArgumentException( + "Cannot generate random enum class " + schema.fullName, + e, + ) + } + } + } +} diff --git a/radar-commons-testing/src/main/java/org/radarbase/mock/model/MockAggregator.java b/radar-commons-testing/src/main/java/org/radarbase/mock/model/MockAggregator.java deleted file mode 100644 index eb2a3a77..00000000 --- a/radar-commons-testing/src/main/java/org/radarbase/mock/model/MockAggregator.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright 2017 The Hyve and King's College London - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.radarbase.mock.model; - -import com.opencsv.exceptions.CsvValidationException; -import java.io.IOException; -import java.nio.file.Path; -import java.time.Instant; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.avro.Schema; -import org.radarbase.mock.config.MockDataConfig; -import org.radarbase.mock.data.MockCsvParser; -import org.radarbase.producer.schema.SchemaRetriever; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The MockAggregator simulates the behaviour of a Kafka Streams application based on time window. - * It supported accumulators are
    - *
  • array of {@code Double} - *
  • singleton {@code Double} - *
- */ -public class MockAggregator { - private static final Logger logger = LoggerFactory.getLogger(MockAggregator.class); - private final List mockDataConfigs; - private final Path root; - private final SchemaRetriever retriever; - - /** - * Default constructor. - */ - public MockAggregator(List mockDataConfigs, Path root, - SchemaRetriever retriever) { - this.mockDataConfigs = mockDataConfigs; - this.root = root; - this.retriever = retriever; - } - - /** - * Simulates all possible test case scenarios configured in mock-configuration. - * - * @return {@code Map} of key {@code MockDataConfig} and value {@code ExpectedValue}. {@link - * ExpectedDoubleValue}. - **/ - @SuppressWarnings({"unused", "rawtypes"}) - public Map simulate() throws IOException { - - Map expectedValue = new HashMap<>(); - - for (MockDataConfig config : mockDataConfigs) { - if (config.valueFields == null || config.valueFields.isEmpty()) { - logger.warn("No value fields specified for {}. Skipping.", config.topic); - continue; - } - - Instant now = Instant.now(); - try (MockCsvParser parser = new MockCsvParser(config, root, now, - retriever)) { - Schema valueSchema = config.parseAvroTopic().getValueSchema(); - List valueFields = config.valueFields; - - ExpectedValue value; - if (config.valueFields.size() == 1) { - value = new ExpectedDoubleValue(valueSchema, valueFields); - } else { - value = new ExpectedArrayValue(valueSchema, valueFields); - } - - while (parser.hasNext()) { - value.add(parser.next()); - } - - expectedValue.put(config, value); - } catch (CsvValidationException ex) { - throw new IOException(ex); - } - } - - return expectedValue; - } -} diff --git a/radar-commons-testing/src/main/java/org/radarbase/mock/model/MockAggregator.kt b/radar-commons-testing/src/main/java/org/radarbase/mock/model/MockAggregator.kt new file mode 100644 index 00000000..e2546872 --- /dev/null +++ b/radar-commons-testing/src/main/java/org/radarbase/mock/model/MockAggregator.kt @@ -0,0 +1,77 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.radarbase.mock.model + +import com.opencsv.exceptions.CsvValidationException +import org.apache.avro.specific.SpecificRecord +import org.radarbase.mock.config.MockDataConfig +import org.radarbase.mock.data.MockCsvParser +import org.radarbase.producer.schema.SchemaRetriever +import org.slf4j.LoggerFactory +import java.io.IOException +import java.nio.file.Path +import java.time.Instant + +/** + * The MockAggregator simulates the behaviour of a Kafka Streams application based on time window. + * It supported accumulators are + * * array of `Double` + * * singleton `Double` + * + */ +class MockAggregator( + private val mockDataConfigs: List, + private val root: Path, + private val retriever: SchemaRetriever, +) { + /** + * Simulates all possible test case scenarios configured in mock-configuration. + * + * @return `Map` of key `MockDataConfig` and value `ExpectedValue`. [ ]. + */ + @Suppress("unused") + @Throws(IOException::class) + fun simulate(): Map> = buildMap { + for (config in mockDataConfigs) { + val valueFields = config.valueFields + if (valueFields.isNullOrEmpty()) { + logger.warn("No value fields specified for {}. Skipping.", config.topic) + continue + } + val now = Instant.now() + try { + MockCsvParser(config, root, now, retriever).use { parser -> + val valueSchema = config.parseAvroTopic().valueSchema + val value: ExpectedValue<*> = if (valueFields.size == 1) { + ExpectedDoubleValue(valueSchema, valueFields) + } else { + ExpectedArrayValue(valueSchema, valueFields) + } + while (parser.hasNext()) { + value.add(parser.next()) + } + put(config, value) + } + } catch (ex: CsvValidationException) { + throw IOException(ex) + } + } + } + + companion object { + private val logger = LoggerFactory.getLogger(MockAggregator::class.java) + } +} diff --git a/radar-commons-testing/src/main/java/org/radarbase/util/Metronome.kt b/radar-commons-testing/src/main/java/org/radarbase/util/Metronome.kt index 3765d54d..0e832290 100644 --- a/radar-commons-testing/src/main/java/org/radarbase/util/Metronome.kt +++ b/radar-commons-testing/src/main/java/org/radarbase/util/Metronome.kt @@ -27,7 +27,7 @@ package org.radarbase.util class Metronome( private val samples: Long, private val frequency: Int, -) { +) : AbstractIterator() { private val baseTime: Long private var iteration: Long = 0 @@ -41,14 +41,12 @@ class Metronome( } } - /** Whether the metronome will generate another sample. */ - operator fun hasNext(): Boolean = samples == 0L || iteration < samples - - /** Generate the next sample. */ - operator fun next(): Long { - check(hasNext()) { "Iterator finished" } - - return baseTime + (iteration++).toTimeOffset() + override fun computeNext() { + if (samples != 0L && iteration >= samples) { + done() + } else { + setNext(baseTime + (iteration++).toTimeOffset()) + } } private fun Long.toTimeOffset(): Long = this * 1000 / frequency diff --git a/radar-commons-testing/src/test/java/org/radarbase/mock/CsvGeneratorTest.java b/radar-commons-testing/src/test/java/org/radarbase/mock/CsvGeneratorTest.java deleted file mode 100644 index c28e0171..00000000 --- a/radar-commons-testing/src/test/java/org/radarbase/mock/CsvGeneratorTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2017 The Hyve and King's College London - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.radarbase.mock; - -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; - -import com.opencsv.CSVReader; -import com.opencsv.exceptions.CsvValidationException; -import java.io.IOException; -import java.io.Reader; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.List; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; -import org.radarbase.mock.config.MockDataConfig; -import org.radarbase.mock.data.CsvGenerator; -import org.radarbase.mock.data.MockRecordValidatorTest; -import org.radarbase.mock.data.RecordGenerator; -import org.radarcns.kafka.ObservationKey; - -public class CsvGeneratorTest { - private MockDataConfig makeConfig(Path folder) throws IOException { - return MockRecordValidatorTest.makeConfig(folder); - } - - @Test - public void generateMockConfig(@TempDir Path folder) throws IOException, CsvValidationException { - CsvGenerator generator = new CsvGenerator(); - - MockDataConfig config = makeConfig(folder); - generator.generate(config, 100_000L, folder.getRoot()); - - Path p = Paths.get(config.dataFile); - try (Reader reader = Files.newBufferedReader(p); - CSVReader parser = new CSVReader(reader)) { - String[] headers = {"key.projectId", "key.userId", "key.sourceId", "value.time", "value.timeReceived", "value.light"}; - assertArrayEquals(headers, parser.readNext()); - - int n = 0; - String[] line; - while ((line = parser.readNext()) != null) { - String value = line[5]; - assertNotEquals("NaN", value); - assertNotEquals("Infinity", value); - assertNotEquals("-Infinity", value); - // no decimals lost or appended - assertEquals(value, Float.valueOf(value).toString()); - n++; - } - assertEquals(100, n); - } - } - - @Test - public void generateGenerator(@TempDir Path folder) throws IOException, CsvValidationException { - CsvGenerator generator = new CsvGenerator(); - - MockDataConfig config = makeConfig(folder); - - final String time = Double.toString(System.currentTimeMillis() / 1000d); - - RecordGenerator recordGenerator = new RecordGenerator( - config, ObservationKey.class) { - - @Override - public Iterable iteratableRawValues(ObservationKey key, long duration) { - return List.of(new String[] { - "test", "UserID_0", "SourceID_0", time, time, - Float.valueOf((float)0.123112412410423518).toString() - }); - } - }; - - generator.generate(recordGenerator, 1000L, Paths.get(config.dataFile)); - - Path p = Paths.get(config.dataFile); - - try (Reader reader = Files.newBufferedReader(p); - CSVReader parser = new CSVReader(reader)) { - assertArrayEquals( - recordGenerator.getHeader().toArray(new String[0]), - parser.readNext()); - // float will cut off a lot of decimals - assertArrayEquals( - new String[] { "test", "UserID_0", "SourceID_0", time, time, "0.12311241" }, - parser.readNext()); - } - } -} diff --git a/radar-commons-testing/src/test/java/org/radarbase/mock/CsvGeneratorTest.kt b/radar-commons-testing/src/test/java/org/radarbase/mock/CsvGeneratorTest.kt new file mode 100644 index 00000000..660cc497 --- /dev/null +++ b/radar-commons-testing/src/test/java/org/radarbase/mock/CsvGeneratorTest.kt @@ -0,0 +1,109 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.radarbase.mock + +import com.opencsv.CSVReader +import com.opencsv.exceptions.CsvValidationException +import org.junit.jupiter.api.Assertions.assertArrayEquals +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.io.TempDir +import org.radarbase.mock.config.MockDataConfig +import org.radarbase.mock.data.CsvGenerator +import org.radarbase.mock.data.MockRecordValidatorTest +import org.radarbase.mock.data.RecordGenerator +import org.radarcns.kafka.ObservationKey +import java.io.IOException +import java.nio.file.Path +import java.nio.file.Paths +import kotlin.io.path.bufferedReader + +class CsvGeneratorTest { + @Throws(IOException::class) + private fun makeConfig(folder: Path): MockDataConfig { + return MockRecordValidatorTest.makeConfig(folder) + } + + @Test + @Throws(IOException::class, CsvValidationException::class) + fun generateMockConfig(@TempDir folder: Path) { + val generator = CsvGenerator() + val config = makeConfig(folder) + generator.generate(config, 100000L, folder.root) + val p = Paths.get(checkNotNull(config.dataFile)) + p.bufferedReader().use { reader -> + CSVReader(reader).use { parser -> + val headers = arrayOf( + "key.projectId", + "key.userId", + "key.sourceId", + "value.time", + "value.timeReceived", + "value.light", + ) + assertArrayEquals(headers, parser.readNext()) + val n = generateSequence { parser.readNext() } + .map { line -> + val value = line[5] + assertNotEquals("NaN", value) + assertNotEquals("Infinity", value) + assertNotEquals("-Infinity", value) + // no decimals lost or appended + assertEquals(value, value.toFloat().toString()) + } + .count() + + assertEquals(100, n) + } + } + } + + @Test + @Throws(IOException::class, CsvValidationException::class) + fun generateGenerator(@TempDir folder: Path) { + val generator = CsvGenerator() + val config = makeConfig(folder) + val time = (System.currentTimeMillis() / 1000.0).toString() + val recordGenerator: RecordGenerator = + object : RecordGenerator( + config, + ObservationKey::class.java, + ) { + override fun iteratableRawValues( + key: ObservationKey, + duration: Long, + ): Iterable> = listOf( + arrayOf("test", "UserID_0", "SourceID_0", time, time, 0.12311241241042352.toFloat().toString()), + ) + } + val p = Paths.get(checkNotNull(config.dataFile)) + generator.generate(recordGenerator, 1000L, p) + p.bufferedReader().use { reader -> + CSVReader(reader).use { parser -> + assertArrayEquals( + recordGenerator.headerArray, + parser.readNext(), + ) + // float will cut off a lot of decimals + assertArrayEquals( + arrayOf("test", "UserID_0", "SourceID_0", time, time, "0.12311241"), + parser.readNext(), + ) + } + } + } +} diff --git a/radar-commons-testing/src/test/java/org/radarbase/mock/RecordGeneratorTest.java b/radar-commons-testing/src/test/java/org/radarbase/mock/RecordGeneratorTest.java deleted file mode 100644 index fa6856fc..00000000 --- a/radar-commons-testing/src/test/java/org/radarbase/mock/RecordGeneratorTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2017 The Hyve and King's College London - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.radarbase.mock; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.Arrays; -import java.util.Iterator; -import org.apache.avro.specific.SpecificRecord; -import org.junit.jupiter.api.Test; -import org.radarbase.data.Record; -import org.radarbase.mock.config.MockDataConfig; -import org.radarbase.mock.data.RecordGenerator; -import org.radarcns.kafka.ObservationKey; -import org.radarcns.passive.empatica.EmpaticaE4Acceleration; - -/** - * Created by joris on 17/05/2017. - */ -public class RecordGeneratorTest { - - @Test - public void generate() { - MockDataConfig config = new MockDataConfig(); - config.topic = "test"; - config.frequency = 10; - config.minimum = 0.1; - config.maximum = 9.9; - config.valueFields = Arrays.asList("x", "y", "z"); - config.valueSchema = EmpaticaE4Acceleration.class.getName(); - - RecordGenerator generator = new RecordGenerator<>(config, - ObservationKey.class); - Iterator> iter = generator - .iterateValues(new ObservationKey("test", "a", "b"), 0); - Record record = iter.next(); - assertEquals(new ObservationKey("test", "a", "b"), record.getKey()); - float x = ((EmpaticaE4Acceleration)record.getValue()).getX(); - assertTrue(x >= 0.1f && x < 9.9f); - float y = ((EmpaticaE4Acceleration)record.getValue()).getX(); - assertTrue(y >= 0.1f && y < 9.9f); - float z = ((EmpaticaE4Acceleration)record.getValue()).getX(); - assertTrue(z >= 0.1f && z < 9.9f); - double time = ((EmpaticaE4Acceleration)record.getValue()).getTime(); - long now = System.currentTimeMillis(); - assertThat(time, greaterThan(now / 1000d - 1d)); - assertThat(time, lessThanOrEqualTo(now / 1000d)); - - Record nextRecord = iter.next(); - assertEquals(time + 0.1d, (Double)nextRecord.getValue().get(0), 1e-6); - } - - @Test - public void getHeaders() { - MockDataConfig config = new MockDataConfig(); - config.topic = "test"; - config.valueSchema = EmpaticaE4Acceleration.class.getName(); - - RecordGenerator generator = new RecordGenerator<>(config, - ObservationKey.class); - assertEquals( - Arrays.asList("key.projectId", "key.userId", "key.sourceId", - "value.time", "value.timeReceived", "value.x", "value.y", "value.z"), - generator.getHeader()); - } -} diff --git a/radar-commons-testing/src/test/java/org/radarbase/mock/RecordGeneratorTest.kt b/radar-commons-testing/src/test/java/org/radarbase/mock/RecordGeneratorTest.kt new file mode 100644 index 00000000..b32cc814 --- /dev/null +++ b/radar-commons-testing/src/test/java/org/radarbase/mock/RecordGeneratorTest.kt @@ -0,0 +1,79 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.radarbase.mock + +import org.hamcrest.MatcherAssert.assertThat +import org.hamcrest.Matchers.greaterThan +import org.hamcrest.Matchers.lessThanOrEqualTo +import org.junit.jupiter.api.Assertions.assertArrayEquals +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.radarbase.mock.config.MockDataConfig +import org.radarbase.mock.data.RecordGenerator +import org.radarcns.kafka.ObservationKey +import org.radarcns.passive.empatica.EmpaticaE4Acceleration + +/** + * Created by joris on 17/05/2017. + */ +class RecordGeneratorTest { + @Test + fun generate() { + val config = MockDataConfig() + config.topic = "test" + config.frequency = 10 + config.minimum = 0.1 + config.maximum = 9.9 + config.valueFields = mutableListOf("x", "y", "z") + config.valueSchema = EmpaticaE4Acceleration::class.java.getName() + val generator = RecordGenerator( + config, + ObservationKey::class.java, + ) + val iter = generator + .iterateValues(ObservationKey("test", "a", "b"), 0) + val record = iter.next() + assertEquals(ObservationKey("test", "a", "b"), record.key) + val x = (record.value as EmpaticaE4Acceleration).x + assertTrue(x >= 0.1f && x < 9.9f) + val y = (record.value as EmpaticaE4Acceleration).x + assertTrue(y >= 0.1f && y < 9.9f) + val z = (record.value as EmpaticaE4Acceleration).x + assertTrue(z >= 0.1f && z < 9.9f) + val time = (record.value as EmpaticaE4Acceleration).time + val now = System.currentTimeMillis() + assertThat(time, greaterThan(now / 1000.0 - 1.0)) + assertThat(time, lessThanOrEqualTo(now / 1000.0)) + val nextRecord = iter.next() + assertEquals(time + 0.1, (nextRecord.value[0] as Double), 1e-6) + } + + @Test + fun getHeaders() { + val config = MockDataConfig() + config.topic = "test" + config.valueSchema = EmpaticaE4Acceleration::class.java.getName() + val generator = RecordGenerator( + config, + ObservationKey::class.java, + ) + assertArrayEquals( + arrayOf("key.projectId", "key.userId", "key.sourceId", "value.time", "value.timeReceived", "value.x", "value.y", "value.z"), + generator.headerArray, + ) + } +} diff --git a/radar-commons-testing/src/test/java/org/radarbase/mock/data/MockRecordValidatorTest.kt b/radar-commons-testing/src/test/java/org/radarbase/mock/data/MockRecordValidatorTest.kt index ce7b1cfa..ec1d68d8 100644 --- a/radar-commons-testing/src/test/java/org/radarbase/mock/data/MockRecordValidatorTest.kt +++ b/radar-commons-testing/src/test/java/org/radarbase/mock/data/MockRecordValidatorTest.kt @@ -16,7 +16,6 @@ package org.radarbase.mock.data import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withContext import org.junit.jupiter.api.BeforeEach @@ -33,10 +32,8 @@ import java.io.IOException import java.io.Writer import java.nio.file.Files import java.nio.file.Path -import java.util.* import kotlin.io.path.bufferedWriter -@OptIn(ExperimentalCoroutinesApi::class) class MockRecordValidatorTest { @TempDir lateinit var folder: Path diff --git a/radar-commons-testing/src/test/java/org/radarbase/util/MetronomeTest.java b/radar-commons-testing/src/test/java/org/radarbase/util/MetronomeTest.java index ac66ea1b..60960445 100644 --- a/radar-commons-testing/src/test/java/org/radarbase/util/MetronomeTest.java +++ b/radar-commons-testing/src/test/java/org/radarbase/util/MetronomeTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertThrows; +import java.util.NoSuchElementException; import org.junit.jupiter.api.Test; public class MetronomeTest { @@ -42,7 +43,7 @@ public void timestamps() { check(it, base - 500L); check(it, base); assertThat(it.hasNext(), is(false)); - assertThrows(IllegalStateException.class, it::next); + assertThrows(NoSuchElementException.class, it::next); } @Test diff --git a/radar-commons-testing/src/test/java/org/radarbase/util/OscilloscopeTest.kt b/radar-commons-testing/src/test/java/org/radarbase/util/OscilloscopeTest.kt index 5fe6ba08..0945237b 100644 --- a/radar-commons-testing/src/test/java/org/radarbase/util/OscilloscopeTest.kt +++ b/radar-commons-testing/src/test/java/org/radarbase/util/OscilloscopeTest.kt @@ -17,7 +17,10 @@ package org.radarbase.util import kotlinx.coroutines.runBlocking import org.hamcrest.MatcherAssert.assertThat -import org.hamcrest.Matchers.* +import org.hamcrest.Matchers.greaterThanOrEqualTo +import org.hamcrest.Matchers.`is` +import org.hamcrest.Matchers.lessThan +import org.hamcrest.Matchers.lessThanOrEqualTo import org.junit.jupiter.api.Test class OscilloscopeTest { diff --git a/radar-commons/build.gradle.kts b/radar-commons/build.gradle.kts index 02d3db94..da61630e 100644 --- a/radar-commons/build.gradle.kts +++ b/radar-commons/build.gradle.kts @@ -1,5 +1,6 @@ plugins { kotlin("plugin.serialization") + kotlin("plugin.allopen") } description = "RADAR Common utilities library." @@ -15,7 +16,9 @@ repositories { // In this section you declare the dependencies for your production and test code dependencies { - api("org.apache.avro:avro:${Versions.avro}") + api("org.apache.avro:avro:${Versions.avro}") { + implementation("org.apache.commons:commons-compress:${Versions.commonsCompress}") + } api(kotlin("reflect")) implementation(project(":radar-commons-kotlin")) @@ -36,3 +39,7 @@ dependencies { testImplementation("org.mockito.kotlin:mockito-kotlin:${Versions.mockitoKotlin}") testImplementation("com.squareup.okhttp3:mockwebserver:${Versions.okhttp}") } + +allOpen { + annotation("org.radarbase.config.OpenConfig") +} diff --git a/radar-commons/src/main/java/org/radarbase/config/OpenConfig.kt b/radar-commons/src/main/java/org/radarbase/config/OpenConfig.kt new file mode 100644 index 00000000..0bf1fb82 --- /dev/null +++ b/radar-commons/src/main/java/org/radarbase/config/OpenConfig.kt @@ -0,0 +1,4 @@ +package org.radarbase.config + +/** Annotation for configuration classes, so they can be extended. */ +annotation class OpenConfig diff --git a/radar-commons/src/main/java/org/radarbase/config/ServerConfig.kt b/radar-commons/src/main/java/org/radarbase/config/ServerConfig.kt index feac3e54..70e51620 100644 --- a/radar-commons/src/main/java/org/radarbase/config/ServerConfig.kt +++ b/radar-commons/src/main/java/org/radarbase/config/ServerConfig.kt @@ -19,11 +19,12 @@ import java.net.InetSocketAddress import java.net.MalformedURLException import java.net.Proxy import java.net.URL -import java.util.* +import java.util.Objects /** * POJO representing a ServerConfig configuration. */ +@OpenConfig class ServerConfig { /** Server host name or IP address. */ /** Set server host name or IP address. */ diff --git a/radar-commons/src/main/java/org/radarbase/producer/KafkaSender.kt b/radar-commons/src/main/java/org/radarbase/producer/KafkaSender.kt index 8b816c11..e8337da8 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/KafkaSender.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/KafkaSender.kt @@ -15,10 +15,6 @@ */ package org.radarbase.producer -import io.ktor.client.* -import io.ktor.client.engine.* -import io.ktor.client.engine.cio.* -import io.ktor.client.plugins.* import kotlinx.coroutines.flow.Flow import org.apache.avro.SchemaValidationException import org.radarbase.producer.rest.ConnectionState diff --git a/radar-commons/src/main/java/org/radarbase/producer/avro/AvroDataMapperFactory.kt b/radar-commons/src/main/java/org/radarbase/producer/avro/AvroDataMapperFactory.kt index 3b839860..ef68d6cd 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/avro/AvroDataMapperFactory.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/avro/AvroDataMapperFactory.kt @@ -3,12 +3,16 @@ package org.radarbase.producer.avro import org.apache.avro.JsonProperties import org.apache.avro.Schema import org.apache.avro.SchemaValidationException -import org.apache.avro.generic.* +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericEnumSymbol +import org.apache.avro.generic.GenericRecord +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.avro.generic.IndexedRecord import org.radarbase.util.Base64Encoder import org.slf4j.Logger import org.slf4j.LoggerFactory import java.nio.ByteBuffer -import java.util.* +import java.util.EnumSet object AvroDataMapperFactory { /** diff --git a/radar-commons/src/main/java/org/radarbase/producer/io/DirectBinaryEncoder.kt b/radar-commons/src/main/java/org/radarbase/producer/io/DirectBinaryEncoder.kt index 0b51276b..9107e7d4 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/io/DirectBinaryEncoder.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/io/DirectBinaryEncoder.kt @@ -17,34 +17,18 @@ */ package org.radarbase.producer.io -import io.ktor.utils.io.* +import io.ktor.utils.io.ByteWriteChannel +import io.ktor.utils.io.close +import io.ktor.utils.io.writeByte import org.apache.avro.io.BinaryData import java.io.IOException -import java.util.* /** * An [Encoder] for Avro's binary encoding that does not buffer output. - * - * - * This encoder does not buffer writes, and as a result is slower than - * [BufferedBinaryEncoder]. However, it is lighter-weight and useful when - * the buffering in BufferedBinaryEncoder is not desired and/or the Encoder is - * very short lived. - * - * - * To construct, use - * [EncoderFactory.directBinaryEncoder] - * - * * DirectBinaryEncoder is not thread-safe * * @see BinaryEncoder - * - * @see EncoderFactory - * * @see Encoder - * - * @see Decoder */ class DirectBinaryEncoder( var out: ByteWriteChannel, @@ -95,10 +79,10 @@ class DirectBinaryEncoder( if (`val` and 0x7FFFFFFFL.inv() == 0L) { var i = `val`.toInt() while (i and 0x7F.inv() != 0) { - out.writeByte((0x80 or i and 0xFF).toByte().toInt()) + out.writeByte((0x80 or i and 0xFF).toByte()) i = i ushr 7 } - out.writeByte(i.toByte().toInt()) + out.writeByte(i.toByte()) return } val len = BinaryData.encodeLong(n, buf, 0) diff --git a/radar-commons/src/main/java/org/radarbase/producer/io/FunctionalWriteChannelContent.kt b/radar-commons/src/main/java/org/radarbase/producer/io/FunctionalWriteChannelContent.kt index 2bf60d01..2b94d22d 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/io/FunctionalWriteChannelContent.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/io/FunctionalWriteChannelContent.kt @@ -1,9 +1,11 @@ package org.radarbase.producer.io -import io.ktor.http.content.* -import io.ktor.utils.io.* +import io.ktor.http.ContentType +import io.ktor.http.content.OutgoingContent +import io.ktor.utils.io.ByteWriteChannel class FunctionalWriteChannelContent( + override val contentType: ContentType, private val writeAction: suspend (ByteWriteChannel) -> Unit, ) : OutgoingContent.WriteChannelContent() { override suspend fun writeTo(channel: ByteWriteChannel) = writeAction(channel) diff --git a/radar-commons/src/main/java/org/radarbase/producer/io/GzipContentEncoding.kt b/radar-commons/src/main/java/org/radarbase/producer/io/GzipContentEncoding.kt index 3df18ccb..6d04d120 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/io/GzipContentEncoding.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/io/GzipContentEncoding.kt @@ -1,13 +1,21 @@ package org.radarbase.producer.io -import io.ktor.client.* -import io.ktor.client.plugins.* -import io.ktor.client.request.* -import io.ktor.http.* -import io.ktor.http.content.* -import io.ktor.util.* -import io.ktor.util.cio.* -import io.ktor.utils.io.* +import io.ktor.client.HttpClient +import io.ktor.client.plugins.HttpClientPlugin +import io.ktor.client.request.HttpRequestPipeline +import io.ktor.http.ContentType +import io.ktor.http.Headers +import io.ktor.http.HeadersBuilder +import io.ktor.http.HttpHeaders +import io.ktor.http.HttpMethod +import io.ktor.http.content.OutgoingContent +import io.ktor.http.contentLength +import io.ktor.util.AttributeKey +import io.ktor.util.KtorDsl +import io.ktor.util.cio.use +import io.ktor.util.deflated +import io.ktor.utils.io.ByteReadChannel +import io.ktor.utils.io.ByteWriteChannel import kotlinx.coroutines.coroutineScope /** @@ -30,9 +38,9 @@ class GzipContentEncoding private constructor() { return when (content) { is OutgoingContent.ProtocolUpgrade, is OutgoingContent.NoContent -> content - is OutgoingContent.ReadChannelContent -> GzipReadChannel(content.readFrom()) - is OutgoingContent.ByteArrayContent -> GzipReadChannel(ByteReadChannel(content.bytes())) - is OutgoingContent.WriteChannelContent -> GzipWriteChannel(content) + is OutgoingContent.ReadChannelContent -> GzipReadChannel(content.readFrom(), content.contentType) + is OutgoingContent.ByteArrayContent -> GzipReadChannel(ByteReadChannel(content.bytes()), content.contentType) + is OutgoingContent.WriteChannelContent -> GzipWriteChannel(content, content.contentType) } } @@ -74,6 +82,7 @@ class GzipContentEncoding private constructor() { private class GzipReadChannel( private val original: ByteReadChannel, + override val contentType: ContentType?, ) : OutgoingContent.ReadChannelContent() { override fun readFrom(): ByteReadChannel = original.deflated(gzip = true) @@ -81,6 +90,7 @@ class GzipContentEncoding private constructor() { private class GzipWriteChannel( private val content: WriteChannelContent, + override val contentType: ContentType?, ) : OutgoingContent.WriteChannelContent() { override suspend fun writeTo(channel: ByteWriteChannel) { coroutineScope { diff --git a/radar-commons/src/main/java/org/radarbase/producer/io/HttpClientExtensions.kt b/radar-commons/src/main/java/org/radarbase/producer/io/HttpClientExtensions.kt index 99e1ece4..bb8412e8 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/io/HttpClientExtensions.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/io/HttpClientExtensions.kt @@ -1,8 +1,8 @@ package org.radarbase.producer.io -import io.ktor.client.* -import io.ktor.client.engine.cio.* -import io.ktor.client.plugins.* +import io.ktor.client.HttpClientConfig +import io.ktor.client.engine.cio.CIOEngineConfig +import io.ktor.client.plugins.HttpTimeout import java.security.cert.X509Certificate import javax.net.ssl.X509TrustManager import kotlin.time.Duration diff --git a/radar-commons/src/main/java/org/radarbase/producer/io/UnsupportedMediaTypeException.kt b/radar-commons/src/main/java/org/radarbase/producer/io/UnsupportedMediaTypeException.kt index 34de28a1..69a24a1a 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/io/UnsupportedMediaTypeException.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/io/UnsupportedMediaTypeException.kt @@ -1,6 +1,6 @@ package org.radarbase.producer.io -import io.ktor.http.* +import io.ktor.http.ContentType import java.io.IOException class UnsupportedMediaTypeException( diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt index b8634e33..9b3a184f 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt @@ -1,11 +1,11 @@ package org.radarbase.producer.rest -import io.ktor.http.* -import io.ktor.http.content.* -import io.ktor.serialization.* -import io.ktor.util.reflect.* -import io.ktor.utils.io.* -import io.ktor.utils.io.charsets.* +import io.ktor.http.ContentType +import io.ktor.http.content.OutgoingContent +import io.ktor.serialization.ContentConverter +import io.ktor.util.reflect.TypeInfo +import io.ktor.utils.io.ByteReadChannel +import io.ktor.utils.io.charsets.Charset import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope import org.radarbase.data.RecordData @@ -51,7 +51,7 @@ class AvroContentConverter( valueSchemaMetadata = valueSchema.await(), ) } - maker.createContent() + maker.createContent(contentType) } } diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroRecordContent.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroRecordContent.kt index 5bcea266..2fc734e6 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroRecordContent.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroRecordContent.kt @@ -1,7 +1,8 @@ package org.radarbase.producer.rest -import io.ktor.http.content.* +import io.ktor.http.ContentType +import io.ktor.http.content.OutgoingContent interface AvroRecordContent { - fun createContent(): OutgoingContent + fun createContent(contentType: ContentType): OutgoingContent } diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordContent.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordContent.kt index 42785c50..5fa62de9 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordContent.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordContent.kt @@ -1,6 +1,7 @@ package org.radarbase.producer.rest -import io.ktor.http.content.* +import io.ktor.http.ContentType +import io.ktor.http.content.OutgoingContent import org.radarbase.data.RecordData import org.radarbase.data.RemoteSchemaEncoder import org.radarbase.producer.avro.AvroDataMapperFactory @@ -35,11 +36,12 @@ class BinaryRecordContent( "missing key schema version" } - override fun createContent(): OutgoingContent = FunctionalWriteChannelContent { channel -> - DirectBinaryEncoder(channel).use { - it.writeRecords() + override fun createContent(contentType: ContentType): OutgoingContent = + FunctionalWriteChannelContent(contentType) { channel -> + DirectBinaryEncoder(channel).use { + it.writeRecords() + } } - } private suspend fun BinaryEncoder.writeRecords() { startItem() diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/ConnectionState.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/ConnectionState.kt index 78440880..6dd44e6c 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/ConnectionState.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/ConnectionState.kt @@ -15,8 +15,19 @@ */ package org.radarbase.producer.rest -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.BufferOverflow.DROP_OLDEST +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.shareIn +import kotlinx.coroutines.flow.transformLatest +import kotlinx.coroutines.plus import kotlin.coroutines.EmptyCoroutineContext import kotlin.time.Duration @@ -47,7 +58,10 @@ class ConnectionState( val scope = scope + Job() - private val mutableState = MutableStateFlow(State.UNKNOWN) + private val mutableState = MutableSharedFlow( + extraBufferCapacity = 1, + onBufferOverflow = DROP_OLDEST, + ) @OptIn(ExperimentalCoroutinesApi::class) val state: Flow = mutableState @@ -58,27 +72,28 @@ class ConnectionState( emit(State.UNKNOWN) } } + .distinctUntilChanged() .shareIn(this.scope + Dispatchers.Unconfined, SharingStarted.Eagerly, replay = 1) init { - mutableState.value = State.UNKNOWN + mutableState.tryEmit(State.UNKNOWN) } /** For a sender to indicate that a connection attempt succeeded. */ - fun didConnect() { - mutableState.value = State.CONNECTED + suspend fun didConnect() { + mutableState.emit(State.CONNECTED) } /** For a sender to indicate that a connection attempt failed. */ - fun didDisconnect() { - mutableState.value = State.DISCONNECTED + suspend fun didDisconnect() { + mutableState.emit(State.DISCONNECTED) } - fun wasUnauthorized() { - mutableState.value = State.UNAUTHORIZED + suspend fun wasUnauthorized() { + mutableState.emit(State.UNAUTHORIZED) } - fun reset() { - mutableState.value = State.UNKNOWN + suspend fun reset() { + mutableState.emit(State.UNKNOWN) } } diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/JsonRecordContent.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/JsonRecordContent.kt index 7fa481b9..2415abf3 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/JsonRecordContent.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/JsonRecordContent.kt @@ -1,12 +1,14 @@ package org.radarbase.producer.rest -import io.ktor.http.content.* -import io.ktor.utils.io.* +import io.ktor.http.ContentType +import io.ktor.http.content.OutgoingContent +import io.ktor.utils.io.ByteWriteChannel +import io.ktor.utils.io.writeByte +import io.ktor.utils.io.writeFully import org.radarbase.data.RecordData import org.radarbase.data.RemoteSchemaEncoder import org.radarbase.producer.io.FunctionalWriteChannelContent import org.radarbase.producer.schema.ParsedSchemaMetadata -import org.slf4j.LoggerFactory class JsonRecordContent( private val records: RecordData, @@ -26,8 +28,8 @@ class JsonRecordContent( readerSchema = valueSchemaMetadata.schema, ) - override fun createContent(): OutgoingContent = - FunctionalWriteChannelContent { it.writeRecords() } + override fun createContent(contentType: ContentType): OutgoingContent = + FunctionalWriteChannelContent(contentType) { it.writeRecords() } private suspend fun ByteWriteChannel.writeRecords() { writeByte('{'.code) @@ -60,7 +62,5 @@ class JsonRecordContent( val KEY = "{\"key\":".toByteArray() val VALUE = ",\"value\":".toByteArray() val END = "]}".toByteArray() - - private val logger = LoggerFactory.getLogger(JsonRecordContent::class.java) } } diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/RestException.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/RestException.kt index 0cc307ca..84160fff 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/RestException.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/RestException.kt @@ -15,7 +15,11 @@ */ package org.radarbase.producer.rest -import io.ktor.http.* +import io.ktor.client.statement.HttpResponse +import io.ktor.client.statement.bodyAsText +import io.ktor.client.statement.request +import io.ktor.http.HttpStatusCode +import io.ktor.http.Url import java.io.IOException /** @@ -25,17 +29,35 @@ import java.io.IOException */ class RestException( val status: HttpStatusCode, + url: Url? = null, body: String? = null, cause: Throwable? = null, ) : IOException( buildString(150) { - append("REST call failed (HTTP code ") + append("REST call ") + if (url != null) { + append("to <") + append(url) + append("> ") + } + append("failed (HTTP code ") append(status) if (body == null) { append(')') } else { - append(body.substring(0, body.length.coerceAtMost(512))) + append("): ") + append( + if (body.length <= 512) { + body + } else { + body.substring(0, 512) + }, + ) } }, cause, -) +) { + companion object { + suspend fun HttpResponse.toRestException() = RestException(status, request.url, bodyAsText()) + } +} diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt index 13fd50b4..f5a6974d 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt @@ -15,18 +15,36 @@ */ package org.radarbase.producer.rest -import io.ktor.client.* -import io.ktor.client.call.* -import io.ktor.client.engine.cio.* -import io.ktor.client.plugins.* -import io.ktor.client.plugins.contentnegotiation.* -import io.ktor.client.request.* -import io.ktor.client.statement.* -import io.ktor.http.* -import io.ktor.util.reflect.* -import kotlinx.coroutines.* +import io.ktor.client.HttpClient +import io.ktor.client.HttpClientConfig +import io.ktor.client.engine.cio.CIO +import io.ktor.client.plugins.contentnegotiation.ContentNegotiation +import io.ktor.client.plugins.defaultRequest +import io.ktor.client.request.accept +import io.ktor.client.request.head +import io.ktor.client.request.headers +import io.ktor.client.request.post +import io.ktor.client.request.setBody +import io.ktor.client.request.url +import io.ktor.client.statement.HttpResponse +import io.ktor.client.statement.bodyAsText +import io.ktor.client.statement.request +import io.ktor.http.ContentType +import io.ktor.http.Headers +import io.ktor.http.HeadersBuilder +import io.ktor.http.HttpHeaders +import io.ktor.http.HttpStatusCode +import io.ktor.http.contentType +import io.ktor.http.isSuccess +import io.ktor.serialization.kotlinx.serialization +import io.ktor.util.reflect.TypeInfo +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.first +import kotlinx.coroutines.withContext +import kotlinx.serialization.json.Json import org.apache.avro.SchemaValidationException import org.radarbase.data.RecordData import org.radarbase.producer.AuthenticationException @@ -36,12 +54,12 @@ import org.radarbase.producer.io.GzipContentEncoding import org.radarbase.producer.io.UnsupportedMediaTypeException import org.radarbase.producer.io.timeout import org.radarbase.producer.io.unsafeSsl +import org.radarbase.producer.rest.RestException.Companion.toRestException import org.radarbase.producer.schema.SchemaRetriever import org.radarbase.topic.AvroTopic import org.radarbase.util.RadarProducerDsl import org.slf4j.LoggerFactory import java.io.IOException -import java.util.* import kotlin.reflect.javaType import kotlin.reflect.typeOf import kotlin.time.Duration @@ -69,7 +87,7 @@ class RestKafkaSender(config: Config) : KafkaSender { override val connectionState: Flow get() = _connectionState.state - private val baseUrl: String = requireNotNull(config.baseUrl) + private val baseUrl: String = requireNotNull(config.baseUrl).trimEnd('/') private val headers: Headers = config.headers.build() private val connectionTimeout: Duration = config.connectionTimeout private val contentEncoding = config.contentEncoding @@ -89,14 +107,20 @@ class RestKafkaSender(config: Config) : KafkaSender { private fun HttpClientConfig<*>.configure() { timeout(connectionTimeout) install(ContentNegotiation) { - this.register( + register( KAFKA_REST_BINARY_ENCODING, AvroContentConverter(schemaRetriever, binary = true), ) - this.register( + register( KAFKA_REST_JSON_ENCODING, AvroContentConverter(schemaRetriever, binary = false), ) + serialization( + KAFKA_REST_ACCEPT, + Json { + ignoreUnknownKeys = true + }, + ) } when (contentEncoding) { GZIP_CONTENT_ENCODING -> install(GzipContentEncoding) @@ -106,7 +130,7 @@ class RestKafkaSender(config: Config) : KafkaSender { unsafeSsl() } defaultRequest { - url(baseUrl) + url("$baseUrl/") contentType(contentType) accept(ContentType.Application.Json) headers { @@ -118,14 +142,11 @@ class RestKafkaSender(config: Config) : KafkaSender { inner class RestKafkaTopicSender( override val topic: AvroTopic, ) : KafkaTopicSender { - @OptIn(ExperimentalStdlibApi::class) - override suspend fun send(records: RecordData) = scope.async { + override suspend fun send(records: RecordData) = withContext(scope.coroutineContext) { try { val response: HttpResponse = restClient.post { url("topics/${topic.name}") - val kType = typeOf>() - val reifiedType = kType.javaType - setBody(records, TypeInfo(RecordData::class, reifiedType, kType)) + setBody(records, recordDataTypeInfo) } if (response.status.isSuccess()) { _connectionState.didConnect() @@ -135,18 +156,18 @@ class RestKafkaSender(config: Config) : KafkaSender { throw AuthenticationException("Request unauthorized") } else if (response.status == HttpStatusCode.UnsupportedMediaType) { throw UnsupportedMediaTypeException( - response.request.contentType(), + response.request.contentType() ?: response.request.content.contentType, response.request.headers[HttpHeaders.ContentEncoding], ) } else { _connectionState.didDisconnect() - throw RestException(response.status, response.bodyAsText()) + throw response.toRestException() } } catch (ex: IOException) { _connectionState.didDisconnect() throw ex } - }.await() + } } @Throws(SchemaValidationException::class) @@ -253,11 +274,22 @@ class RestKafkaSender(config: Config) : KafkaSender { companion object { private val logger = LoggerFactory.getLogger(RestKafkaSender::class.java) + private val recordDataTypeInfo: TypeInfo + val DEFAULT_TIMEOUT: Duration = 20.seconds val KAFKA_REST_BINARY_ENCODING = ContentType("application", "vnd.radarbase.avro.v1+binary") - val KAFKA_REST_JSON_ENCODING = ContentType("application", "vnd.kafka+json") + val KAFKA_REST_JSON_ENCODING = ContentType("application", "vnd.kafka.avro.v2+json") + val KAFKA_REST_ACCEPT = ContentType("application", "vnd.kafka.v2+json") const val GZIP_CONTENT_ENCODING = "gzip" + init { + val kType = typeOf>() + + @OptIn(ExperimentalStdlibApi::class) + val reifiedType = kType.javaType + recordDataTypeInfo = TypeInfo(RecordData::class, reifiedType, kType) + } + fun restKafkaSender(builder: Config.() -> Unit): RestKafkaSender = RestKafkaSender(Config().apply(builder)) } diff --git a/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRestClient.kt b/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRestClient.kt index 50746c2b..59fa1d11 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRestClient.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRestClient.kt @@ -1,19 +1,26 @@ package org.radarbase.producer.schema -import io.ktor.client.* -import io.ktor.client.call.* -import io.ktor.client.plugins.* -import io.ktor.client.plugins.contentnegotiation.* -import io.ktor.client.request.* -import io.ktor.client.statement.* -import io.ktor.http.* -import io.ktor.serialization.kotlinx.json.* -import io.ktor.util.reflect.* +import io.ktor.client.HttpClient +import io.ktor.client.call.body +import io.ktor.client.plugins.contentnegotiation.ContentNegotiation +import io.ktor.client.plugins.defaultRequest +import io.ktor.client.request.HttpRequestBuilder +import io.ktor.client.request.accept +import io.ktor.client.request.request +import io.ktor.client.request.setBody +import io.ktor.client.request.url +import io.ktor.http.ContentType +import io.ktor.http.HttpMethod +import io.ktor.http.contentType +import io.ktor.http.isSuccess +import io.ktor.serialization.kotlinx.json.json +import io.ktor.util.reflect.TypeInfo +import io.ktor.util.reflect.typeInfo import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import kotlinx.serialization.json.Json import org.apache.avro.Schema -import org.radarbase.producer.rest.RestException +import org.radarbase.producer.rest.RestException.Companion.toRestException import java.io.IOException import kotlin.coroutines.CoroutineContext @@ -50,7 +57,7 @@ class SchemaRestClient( requestBuilder() } if (!response.status.isSuccess()) { - throw RestException(response.status, response.bodyAsText()) + throw response.toRestException() } response.body(typeInfo) } @@ -62,7 +69,7 @@ class SchemaRestClient( requestBuilder() } if (!response.status.isSuccess()) { - throw RestException(response.status, response.bodyAsText()) + throw response.toRestException() } } diff --git a/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRetriever.kt b/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRetriever.kt index b3c0de8d..29d792b2 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRetriever.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRetriever.kt @@ -15,11 +15,9 @@ */ package org.radarbase.producer.schema -import io.ktor.client.* -import io.ktor.client.engine.cio.* -import io.ktor.client.plugins.* -import io.ktor.client.request.* -import io.ktor.http.* +import io.ktor.client.HttpClient +import io.ktor.client.HttpClientConfig +import io.ktor.client.engine.cio.CIO import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -29,7 +27,6 @@ import org.radarbase.kotlin.coroutines.CachedValue import org.radarbase.util.RadarProducerDsl import java.io.IOException import java.lang.ref.SoftReference -import java.util.* import java.util.Objects.hash import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap @@ -64,10 +61,10 @@ open class SchemaRetriever(config: Config) { val subject = subject(topic, ofValue) val metadata = restClient.addSchema(subject, schema) - launch { - cachedMetadata(subject, metadata.schema).set(metadata) - } if (metadata.version != null) { + launch { + cachedMetadata(subject, metadata.schema).set(metadata) + } launch { cachedVersion(subject, metadata.version).set(metadata) } diff --git a/radar-commons/src/main/java/org/radarbase/topic/AvroTopic.kt b/radar-commons/src/main/java/org/radarbase/topic/AvroTopic.kt index b678fc8e..86148f90 100644 --- a/radar-commons/src/main/java/org/radarbase/topic/AvroTopic.kt +++ b/radar-commons/src/main/java/org/radarbase/topic/AvroTopic.kt @@ -18,7 +18,6 @@ package org.radarbase.topic import org.apache.avro.Schema import org.apache.avro.specific.SpecificData import org.apache.avro.specific.SpecificRecord -import java.util.* /** * Kafka topic with Avro schema. diff --git a/radar-commons/src/test/java/org/radarbase/producer/rest/BinaryRecordContentTest.kt b/radar-commons/src/test/java/org/radarbase/producer/rest/BinaryRecordContentTest.kt index f932a3cc..dc13b515 100644 --- a/radar-commons/src/test/java/org/radarbase/producer/rest/BinaryRecordContentTest.kt +++ b/radar-commons/src/test/java/org/radarbase/producer/rest/BinaryRecordContentTest.kt @@ -1,9 +1,9 @@ package org.radarbase.producer.rest -import io.ktor.http.content.* -import io.ktor.util.* -import io.ktor.utils.io.* -import kotlinx.coroutines.ExperimentalCoroutinesApi +import io.ktor.http.content.OutgoingContent +import io.ktor.util.toByteArray +import io.ktor.utils.io.ByteChannel +import io.ktor.utils.io.close import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import org.apache.avro.SchemaValidationException @@ -26,7 +26,6 @@ import java.io.InputStreamReader import java.nio.ByteBuffer import java.util.zip.GZIPOutputStream -@OptIn(ExperimentalCoroutinesApi::class) class BinaryRecordContentTest { @Test @Throws(SchemaValidationException::class, IOException::class) @@ -52,7 +51,7 @@ class BinaryRecordContentTest { val channel = ByteChannel() launch { - val content = request.createContent() as OutgoingContent.WriteChannelContent + val content = request.createContent(RestKafkaSender.KAFKA_REST_BINARY_ENCODING) as OutgoingContent.WriteChannelContent content.writeTo(channel) channel.close() } diff --git a/radar-commons/src/test/java/org/radarbase/producer/rest/RestKafkaSenderTest.kt b/radar-commons/src/test/java/org/radarbase/producer/rest/RestKafkaSenderTest.kt index 9fc2890d..2194ba9a 100644 --- a/radar-commons/src/test/java/org/radarbase/producer/rest/RestKafkaSenderTest.kt +++ b/radar-commons/src/test/java/org/radarbase/producer/rest/RestKafkaSenderTest.kt @@ -19,8 +19,7 @@ import com.fasterxml.jackson.core.JsonFactory import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.node.JsonNodeType -import io.ktor.util.* -import kotlinx.coroutines.ExperimentalCoroutinesApi +import io.ktor.util.moveToByteArray import kotlinx.coroutines.test.runTest import okhttp3.mockwebserver.MockResponse import okhttp3.mockwebserver.MockWebServer @@ -28,10 +27,18 @@ import org.apache.avro.SchemaValidationException import org.apache.avro.io.DecoderFactory import org.apache.avro.specific.SpecificDatumReader import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Assertions.fail import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.mockito.kotlin.* +import org.mockito.Mockito.mock +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.stub +import org.mockito.kotlin.times +import org.mockito.kotlin.verify import org.radarbase.data.AvroRecordData import org.radarbase.producer.AuthenticationException import org.radarbase.producer.rest.RestKafkaSender.Companion.restKafkaSender @@ -44,10 +51,8 @@ import org.radarcns.passive.phone.PhoneLight import org.slf4j.LoggerFactory import java.io.IOException import java.nio.charset.StandardCharsets -import java.util.* import java.util.zip.GZIPInputStream -@OptIn(ExperimentalCoroutinesApi::class) class RestKafkaSenderTest { private lateinit var retriever: SchemaRetriever private lateinit var sender: RestKafkaSender diff --git a/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRestClientTest.kt b/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRestClientTest.kt index c58ed1fe..fa11b132 100644 --- a/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRestClientTest.kt +++ b/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRestClientTest.kt @@ -15,10 +15,8 @@ */ package org.radarbase.producer.schema -import io.ktor.client.* -import io.ktor.client.engine.cio.* -import io.ktor.client.plugins.* -import kotlinx.coroutines.ExperimentalCoroutinesApi +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO import kotlinx.coroutines.test.runTest import okhttp3.mockwebserver.MockWebServer import org.apache.avro.Schema @@ -29,10 +27,8 @@ import org.junit.jupiter.api.Test import org.radarbase.producer.io.timeout import org.radarbase.producer.rest.RestKafkaSenderTest.Companion.enqueueJson import java.io.IOException -import java.util.* import kotlin.time.Duration.Companion.seconds -@OptIn(ExperimentalCoroutinesApi::class) class SchemaRestClientTest { private lateinit var mockServer: MockWebServer private lateinit var retriever: SchemaRestClient diff --git a/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRetrieverTest.kt b/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRetrieverTest.kt index 962e9556..8b77de94 100644 --- a/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRetrieverTest.kt +++ b/radar-commons/src/test/java/org/radarbase/producer/schema/SchemaRetrieverTest.kt @@ -15,8 +15,7 @@ */ package org.radarbase.producer.schema -import io.ktor.client.plugins.* -import kotlinx.coroutines.ExperimentalCoroutinesApi +import io.ktor.client.plugins.defaultRequest import kotlinx.coroutines.test.runTest import okhttp3.mockwebserver.MockResponse import okhttp3.mockwebserver.MockWebServer @@ -33,7 +32,6 @@ import org.radarbase.producer.schema.SchemaRetriever.Companion.subject import java.io.IOException import kotlin.time.Duration.Companion.seconds -@OptIn(ExperimentalCoroutinesApi::class) class SchemaRetrieverTest { private lateinit var mockServer: MockWebServer private lateinit var retriever: SchemaRetriever diff --git a/radar-commons/src/test/java/org/radarbase/topic/KafkaTopicTest.kt b/radar-commons/src/test/java/org/radarbase/topic/KafkaTopicTest.kt index 26cbefde..fdcfc4e0 100644 --- a/radar-commons/src/test/java/org/radarbase/topic/KafkaTopicTest.kt +++ b/radar-commons/src/test/java/org/radarbase/topic/KafkaTopicTest.kt @@ -3,7 +3,7 @@ package org.radarbase.topic import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import java.util.* +import java.util.UUID class KafkaTopicTest { @Test @@ -23,17 +23,19 @@ class KafkaTopicTest { @Throws(Exception::class) fun compare() { val randomSize = 100 - val randomString: MutableList = ArrayList(randomSize) - val randomTopic: MutableList = ArrayList(randomSize) - for (i in 0 until randomSize) { - val str = 'a'.toString() + UUID.randomUUID().toString().replace('-', '_') - randomString.add(str) - randomTopic.add(KafkaTopic(str)) - } + val randomString: MutableList = (0 until randomSize) + .mapTo(ArrayList(randomSize)) { 'a'.toString() + UUID.randomUUID().toString().replace('-', '_') } + val randomTopic: MutableList = randomString + .mapTo(ArrayList(randomSize)) { KafkaTopic(it) } + + randomString.shuffle() + randomTopic.shuffle() randomString.sort() randomTopic.sort() - for (i in 0 until randomSize) { - assertEquals(randomString[i], randomTopic[i].name) - } + + randomString.zip(randomTopic) + .forEach { (s, t) -> + assertEquals(s, t.name) + } } } diff --git a/radar-commons/src/test/java/org/radarbase/util/Base64Test.kt b/radar-commons/src/test/java/org/radarbase/util/Base64Test.kt index 66fd0bbc..d24decd6 100644 --- a/radar-commons/src/test/java/org/radarbase/util/Base64Test.kt +++ b/radar-commons/src/test/java/org/radarbase/util/Base64Test.kt @@ -2,7 +2,7 @@ package org.radarbase.util import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test -import java.util.* +import java.util.Base64 import java.util.concurrent.ThreadLocalRandom import kotlin.text.Charsets.UTF_8