Skip to content

Commit

Permalink
feat: CogRPC Client (#43)
Browse files Browse the repository at this point in the history
* CogRPC client

* CogRPC code review

* CogRPC Client error handling and testing

* CargoCollection further testing

* Don't skip other public gateways when cargo collection fails
  • Loading branch information
sdsantos authored May 8, 2020
1 parent 646da8e commit 09f0a28
Show file tree
Hide file tree
Showing 38 changed files with 975 additions and 342 deletions.
8 changes: 0 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,3 @@ Requires an Android device or emulator
```
./gradlew jacocoAndroidTestReport
```

## TODO

### Remove Netty Conscrypt fix

Once this Netty commit (https://github.com/netty/netty/commit/79ef0c4706b64bd0b6c3ce24516beb587a0c5f4a)
gets released, we can remove the overload class `io.netty.handler.ssl.Conscrypt` in this module,
and its netty-handler dependency on `build.gradle`.
27 changes: 10 additions & 17 deletions app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ android {
}
}
compileOptions {
sourceCompatibility = "1.8"
targetCompatibility = "1.8"
coreLibraryDesugaringEnabled true
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}
kotlinOptions {
jvmTarget = '1.8'
jvmTarget = JavaVersion.VERSION_1_8
}
packagingOptions {
exclude 'META-INF/*'
Expand All @@ -73,6 +74,9 @@ dependencies {
implementation project(':relaynet-jvm') // TODO: remove when real version is finished
implementation project(':relaynet-cogrpc-jvm')

// Java 8
coreLibraryDesugaring 'com.android.tools:desugar_jdk_libs:1.0.5'

// Kotlin
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlinVersion"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion"
Expand All @@ -89,7 +93,7 @@ dependencies {
kapt 'com.google.dagger:dagger-compiler:2.27'

// Relaynet
implementation 'tech.relaycorp:relaynet:1.11.2'
implementation 'tech.relaycorp:relaynet:1.12.4'
implementation 'tech.relaycorp:relaynet-cogrpc:1.0.1'

// ORM
Expand All @@ -103,23 +107,12 @@ dependencies {
implementation 'com.github.tfcporciuncula:flow-preferences:1.1.0'

// gRPC
implementation("io.grpc:grpc-netty:${grpcVersion}") {
// TODO: remove exclude when there's a new netty release
exclude module: 'netty-handler'
}
implementation "io.grpc:grpc-netty:${grpcVersion}"
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "io.grpc:grpc-stub:${grpcVersion}"

// Android TLS support for Netty
// TODO: remove netty-handler when there's a new netty release
implementation('com.github.netty.netty:netty-handler:79ef0c4706') {
exclude module: 'netty-common'
exclude module: 'netty-resolver'
exclude module: 'netty-buffer'
exclude module: 'netty-transport'
exclude module: 'netty-codec'
}
// implementation 'io.netty:netty-handler:4.1.48.Final'
implementation "io.netty:netty-handler:$nettyVersion"
implementation 'org.conscrypt:conscrypt-android:2.4.0'

// Base64 encoding
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@ import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor
import org.apache.commons.codec.binary.Base64

internal object Authorization {
internal val metadataKey =
Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER)
import tech.relaycorp.relaynet.cogrpc.AuthorizationMetadata

internal object AuthorizationContext {
// Context values are bound to the current thread
internal val contextKey = Context.key<String>("Authorization")
internal val contextKey = Context.key<ByteArray>("Authorization")

internal val interceptor by lazy {
object : ServerInterceptor {
Expand All @@ -21,7 +18,7 @@ internal object Authorization {
headers: Metadata,
next: ServerCallHandler<ReqT, RespT>
): ServerCall.Listener<ReqT> {
val auth = headers[metadataKey]
val auth = AuthorizationMetadata.getCCASerialized(headers)
val context = Context.current().withValue(contextKey, auth)
val previousContext = context.attach()
return try {
Expand All @@ -33,13 +30,5 @@ internal object Authorization {
}
}

internal fun getCCA(): ByteArray? {
val auth = contextKey.get()
if (auth?.startsWith(AUTHORIZATION_TYPE) != true) return null

val ccaBase64 = auth.substring(AUTHORIZATION_TYPE.length)
return Base64().decode(ccaBase64.toByteArray())
}

internal const val AUTHORIZATION_TYPE = "Relaynet-CCA "
internal fun getCCA(): ByteArray? = contextKey.get()
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package tech.relaycorp.cogrpc.server

import com.google.protobuf.ByteString
import io.grpc.stub.StreamObserver
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import tech.relaycorp.courier.common.Logging.logger
import tech.relaycorp.relaynet.CargoDeliveryRequest
import tech.relaycorp.relaynet.cogrpc.CargoDelivery
import tech.relaycorp.relaynet.cogrpc.CargoDeliveryAck
import tech.relaycorp.relaynet.cogrpc.CargoRelayGrpc
import tech.relaycorp.relaynet.cogrpc.CogRPC
import tech.relaycorp.relaynet.cogrpc.toAck
import tech.relaycorp.relaynet.cogrpc.toCargoDelivery
import java.util.logging.Level

class CogRPCConnectionService(
Expand All @@ -20,10 +21,10 @@ class CogRPCConnectionService(
object : StreamObserver<CargoDelivery> {
override fun onNext(cargoDelivery: CargoDelivery) {
coroutineScope.launch {
logger.info("deliverCargo next")
val result = serverService.deliverCargo(cargoDelivery.toMessageReceived())
logger.info("deliverCargo next ${cargoDelivery.id}")
val result = serverService.deliverCargo(cargoDelivery.cargo.newInput())
if (result) {
logger.info("deliverCargo next ack")
logger.info("deliverCargo next ack ${cargoDelivery.id}")
responseObserver.onNext(cargoDelivery.toAck())
}
}
Expand All @@ -42,7 +43,7 @@ class CogRPCConnectionService(
}

override fun collectCargo(responseObserver: StreamObserver<CargoDelivery>): StreamObserver<CargoDeliveryAck> {
val cca = Authorization.getCCA()
val cca = AuthorizationContext.getCCA()
if (cca == null) {
logger.info("collectCargo completed due to missing CCA")
responseObserver.onCompleted()
Expand Down Expand Up @@ -71,10 +72,10 @@ class CogRPCConnectionService(

return object : StreamObserver<CargoDeliveryAck> {
override fun onNext(ack: CargoDeliveryAck) {
logger.info("collectCargo ack next")
logger.info("collectCargo ack next ${ack.id}")
coroutineScope.launch {
try {
serverService.processCargoCollectionAck(ack.toMessageDeliveryAck())
serverService.processCargoCollectionAck(ack.id)
deliveriesToAck.remove(ack.id)
if (deliveriesToAck.isEmpty()) {
logger.info("collectCargo completed")
Expand All @@ -91,30 +92,11 @@ class CogRPCConnectionService(
}

override fun onCompleted() {
logger.info("collectCargo ack complete")
logger.info("collectCargo ack closed")
}
}
}

private suspend fun getDeliveriesForCCA(cca: ByteArray): Iterable<CogRPC.MessageDelivery> {
val messageReceived = CogRPC.MessageReceived(cca.inputStream())
return serverService.collectCargo(messageReceived)
}

internal fun CargoDelivery.toMessageReceived() =
CogRPC.MessageReceived(data = cargo.newInput())

private fun CogRPC.MessageDelivery.toCargoDelivery() =
CargoDelivery.newBuilder()
.setId(localId)
.setCargo(ByteString.copyFrom(data.readBytes()))
.build()

internal fun CargoDelivery.toAck(): CargoDeliveryAck =
CargoDeliveryAck.newBuilder()
.setId(id)
.build()

internal fun CargoDeliveryAck.toMessageDeliveryAck() =
CogRPC.MessageDeliveryAck(id)
private suspend fun getDeliveriesForCCA(cca: ByteArray): Iterable<CargoDeliveryRequest> =
serverService.collectCargo(cca)
}
11 changes: 6 additions & 5 deletions app/src/main/java/tech/relaycorp/cogrpc/server/CogRPCServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.withContext
import org.conscrypt.Conscrypt
import tech.relaycorp.courier.common.Logging.logger
import tech.relaycorp.relaynet.cogrpc.CogRPC
import tech.relaycorp.relaynet.CargoDeliveryRequest
import java.io.IOException
import java.io.InputStream
import java.net.InetSocketAddress
import java.security.Security
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -54,7 +55,7 @@ internal constructor(
.maxConnectionIdle(MAX_CONNECTION_IDLE.inSeconds.roundToLong(), TimeUnit.SECONDS)
.useTransportSecurity(certificateInputStream, keyInputStream)
.addService(CogRPCConnectionService(coroutineScope, service))
.intercept(Authorization.interceptor)
.intercept(AuthorizationContext.interceptor)
.addTransportFilter(clientsInterceptor)
.build()

Expand Down Expand Up @@ -101,8 +102,8 @@ internal constructor(
}

interface Service {
suspend fun collectCargo(cca: CogRPC.MessageReceived): Iterable<CogRPC.MessageDelivery>
suspend fun processCargoCollectionAck(ack: CogRPC.MessageDeliveryAck)
suspend fun deliverCargo(cargo: CogRPC.MessageReceived): Boolean
suspend fun collectCargo(ccaSerialized: ByteArray): Iterable<CargoDeliveryRequest>
suspend fun processCargoCollectionAck(localId: String)
suspend fun deliverCargo(cargoSerialized: InputStream): Boolean
}
}
6 changes: 3 additions & 3 deletions app/src/main/java/tech/relaycorp/courier/data/DataModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import tech.relaycorp.courier.data.database.AppDatabase
import tech.relaycorp.relaynet.Cargo
import tech.relaycorp.relaynet.CargoCollectionAuthorization
import tech.relaycorp.relaynet.cogrpc.client.CogRPCClient
import tech.relaycorp.relaynet.cogrpc.client.MockCogRPCClient
import javax.inject.Named
import javax.inject.Singleton

Expand Down Expand Up @@ -55,13 +54,14 @@ class DataModule {
FlowSharedPreferences(sharedPreferences)

@Provides
fun cogRPCClientBuilder(): CogRPCClient.Builder = MockCogRPCClient.Builder
fun cogRPCClientBuilder(): CogRPCClient.Builder = CogRPCClient.Builder

@Provides
fun cogRPCServer() = CogRPCServer.Builder.build("0.0.0.0", 21473)

@Provides
fun cargoDeserializer(): ((@JvmSuppressWildcards ByteArray) -> Cargo) = Cargo.Companion::deserialize
fun cargoDeserializer(): ((@JvmSuppressWildcards ByteArray) -> Cargo) =
Cargo.Companion::deserialize

@Provides
fun ccaDeserializer(): ((@JvmSuppressWildcards ByteArray) -> CargoCollectionAuthorization) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package tech.relaycorp.courier.data.model

import androidx.room.ColumnInfo
import androidx.room.Entity
import tech.relaycorp.courier.domain.client.UniqueMessageId
import java.util.Date
import java.util.UUID

@Entity(
tableName = "Message",
Expand All @@ -24,6 +24,7 @@ data class StoredMessage(
val storagePath: String,
val size: StorageSize
) {

val uniqueMessageId get() = UniqueMessageId(senderAddress, messageId)
companion object {
fun generateLocalId() = UUID.randomUUID().toString()
}
}
6 changes: 4 additions & 2 deletions app/src/main/java/tech/relaycorp/courier/domain/PublicSync.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package tech.relaycorp.courier.domain
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import tech.relaycorp.courier.common.BehaviorChannel
import tech.relaycorp.courier.common.Logging.logger
import tech.relaycorp.courier.domain.client.CargoCollection
import tech.relaycorp.courier.domain.client.CargoDelivery
import tech.relaycorp.relaynet.cogrpc.client.CogRPCClient
import java.util.logging.Level
import javax.inject.Inject
import kotlin.time.seconds

Expand All @@ -21,7 +22,8 @@ class PublicSync
suspend fun sync() {
try {
syncUnhandled()
} catch (e: CogRPCClient.Exception) {
} catch (e: Exception) {
logger.log(Level.WARNING, "Public sync error", e)
state.send(State.Error)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ class StoreMessage
return storeMessage(MessageType.Cargo, cargo, cargoBytes)
}

suspend fun storeCCA(ccaInputStream: InputStream): StoredMessage? {
val ccaBytes = ccaInputStream.readBytes()
suspend fun storeCCA(ccaSerialized: ByteArray): StoredMessage? {
val cca = try {
ccaDeserializer.invoke(ccaBytes)
ccaDeserializer.invoke(ccaSerialized)
} catch (e: RAMFMessageMalformedException) {
logger.warning("Malformed CCA received")
return null
Expand All @@ -55,7 +54,7 @@ class StoreMessage
return null
}

return storeMessage(MessageType.CCA, cca, ccaBytes)
return storeMessage(MessageType.CCA, cca, ccaSerialized)
}

private suspend fun storeMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import tech.relaycorp.courier.data.model.MessageType
import tech.relaycorp.courier.data.model.StoredMessage
import tech.relaycorp.courier.domain.DeleteMessage
import tech.relaycorp.courier.domain.StoreMessage
import tech.relaycorp.relaynet.cogrpc.CogRPC
import tech.relaycorp.relaynet.cogrpc.client.CogRPCClient
import java.io.InputStream
import java.util.logging.Level
Expand All @@ -28,25 +27,34 @@ class CargoCollection
suspend fun collect() {
getCCAs()
.forEach { cca ->
collectAndStoreCargoForCCA(cca)
deleteCCA(cca)
try {
collectAndStoreCargoForCCA(cca)
deleteCCA(cca)
} catch (e: CogRPCClient.CogRPCException) {
logger.log(Level.WARNING, "Cargo collection error", e)
}
}
}

private suspend fun getCCAs() =
storedMessageDao.getByRecipientTypeAndMessageType(
MessageAddress.Type.Public,
MessageType.Cargo
MessageType.CCA
)

@Throws(CogRPCClient.CogRPCException::class)
private suspend fun collectAndStoreCargoForCCA(cca: StoredMessage) {
val client = clientBuilder.build(cca.recipientAddress.value)
try {
clientBuilder
.build(cca.recipientAddress.value)
.collectCargo(cca.toCogRPCMessage())
.collect { storeCargo(it.data) }
client
.collectCargo(cca.getSerializedInputStream())
.collect { storeCargo(it) }
} catch (e: MessageDataNotFoundException) {
logger.log(Level.WARNING, "CCA data could not found on disk", e)
} catch (e: CogRPCClient.CCARefusedException) {
logger.log(Level.WARNING, "CCA refused")
} finally {
client.close()
}
}

Expand All @@ -57,9 +65,6 @@ class CargoCollection
deleteMessage.delete(cca)

@Throws(MessageDataNotFoundException::class)
private suspend fun StoredMessage.toCogRPCMessage() =
CogRPC.MessageDelivery(
localId = uniqueMessageId.value,
data = diskRepository.readMessage(storagePath)
)
private suspend fun StoredMessage.getSerializedInputStream() =
diskRepository.readMessage(storagePath)
}
Loading

0 comments on commit 09f0a28

Please sign in to comment.