Skip to content

Commit

Permalink
refactor: replace word transaction to saga (#120)
Browse files Browse the repository at this point in the history
* refactor: replace word transaction to saga

* build: netx version 0.3.8 to 0.3.9
  • Loading branch information
devxb authored Mar 30, 2024
1 parent 3134d93 commit 9088a25
Show file tree
Hide file tree
Showing 81 changed files with 1,721 additions and 1,709 deletions.
214 changes: 108 additions & 106 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ kotlin.code.style=official

### Project ###
group=org.rooftop.netx
version=0.3.8
version=0.3.9
compatibility=17

### Sonarcloud ###
Expand Down
8 changes: 4 additions & 4 deletions src/main/kotlin/org/rooftop/netx/api/Exceptions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ class EncodeException(message: String, throwable: Throwable) : RuntimeException(

class DecodeException(message: String, throwable: Throwable) : RuntimeException(message, throwable)

open class TransactionException(message: String) : RuntimeException(message)
open class SagaException(message: String) : RuntimeException(message)

class AlreadyCommittedTransactionException(transactionId: String, state: String) :
TransactionException("Cannot join transaction cause, transaction \"$transactionId\" already \"$state\"")
class AlreadyCommittedSagaException(id: String, state: String) :
SagaException("Cannot join saga cause, saga \"$id\" already \"$state\"")

class NotFoundDispatchFunctionException(message: String) : RuntimeException(message)

class FailedAckTransactionException(message: String) : RuntimeException(message)
class FailedAckSagaException(message: String) : RuntimeException(message)

class ResultTimeoutException(message: String, throwable: Throwable) :
RuntimeException(message, throwable)
Expand Down
16 changes: 8 additions & 8 deletions src/main/kotlin/org/rooftop/netx/api/Orchestrator.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ import reactor.core.publisher.Mono

interface Orchestrator<T : Any, V : Any> {

fun transaction(request: T): Mono<Result<V>>
fun saga(request: T): Mono<Result<V>>

fun transaction(timeoutMillis: Long, request: T): Mono<Result<V>>
fun saga(timeoutMillis: Long, request: T): Mono<Result<V>>

fun transaction(request: T, context: MutableMap<String, Any>): Mono<Result<V>>
fun saga(request: T, context: MutableMap<String, Any>): Mono<Result<V>>

fun transaction(timeoutMillis: Long, request: T, context: MutableMap<String, Any>): Mono<Result<V>>
fun saga(timeoutMillis: Long, request: T, context: MutableMap<String, Any>): Mono<Result<V>>

fun transactionSync(request: T): Result<V>
fun sagaSync(request: T): Result<V>

fun transactionSync(timeoutMillis: Long, request: T): Result<V>
fun sagaSync(timeoutMillis: Long, request: T): Result<V>

fun transactionSync(request: T, context: MutableMap<String, Any>): Result<V>
fun sagaSync(request: T, context: MutableMap<String, Any>): Result<V>

fun transactionSync(timeoutMillis: Long, request: T, context: MutableMap<String, Any>): Result<V>
fun sagaSync(timeoutMillis: Long, request: T, context: MutableMap<String, Any>): Result<V>
}
13 changes: 13 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/SagaCommitEvent.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.rooftop.netx.api

class SagaCommitEvent internal constructor(
id: String,
nodeName: String,
group: String,
event: String?,
codec: Codec,
) : SagaEvent(id, nodeName, group, event, codec) {

override fun copy(): SagaCommitEvent =
SagaCommitEvent(id, nodeName, group, event, codec)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import kotlin.reflect.KClass

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionCommitListener(
annotation class SagaCommitListener(
val event: KClass<*> = Any::class,
val noRollbackFor: Array<KClass<out Throwable>> = [],
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package org.rooftop.netx.api

import kotlin.reflect.KClass

sealed class TransactionEvent(
val transactionId: String,
sealed class SagaEvent(
val id: String,
val nodeName: String,
val group: String,
internal val event: String?,
Expand All @@ -24,5 +24,5 @@ sealed class TransactionEvent(
type
)

internal abstract fun copy(): TransactionEvent
internal abstract fun copy(): SagaEvent
}
13 changes: 13 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/SagaJoinEvent.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.rooftop.netx.api

class SagaJoinEvent internal constructor(
id: String,
nodeName: String,
group: String,
event: String?,
codec: Codec,
) : SagaEvent(id, nodeName, group, event, codec) {

override fun copy(): SagaJoinEvent =
SagaJoinEvent(id, nodeName, group, event, codec)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import kotlin.reflect.KClass

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionJoinListener(
annotation class SagaJoinListener(
val event: KClass<*> = Any::class,
val noRollbackFor: Array<KClass<out Throwable>> = [],
val successWith: SuccessWith = SuccessWith.PUBLISH_JOIN,
Expand Down
43 changes: 43 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/SagaManager.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.rooftop.netx.api

import reactor.core.publisher.Mono

interface SagaManager {

fun start(): Mono<String>

fun <T : Any> start(event: T): Mono<String>

fun syncStart(): String

fun <T : Any> syncStart(event: T): String

fun join(id: String): Mono<String>

fun <T : Any> join(id: String, event: T): Mono<String>

fun syncJoin(id: String): String

fun <T : Any> syncJoin(id: String, event: T): String

fun exists(id: String): Mono<String>

fun syncExists(id: String): String

fun commit(id: String): Mono<String>

fun <T : Any> commit(id: String, event: T): Mono<String>

fun syncCommit(id: String): String

fun <T : Any> syncCommit(id: String, event: T): String

fun rollback(id: String, cause: String): Mono<String>

fun <T : Any> rollback(id: String, cause: String, event: T): Mono<String>

fun syncRollback(id: String, cause: String): String

fun <T : Any> syncRollback(id: String, cause: String, event: T): String

}
14 changes: 14 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/SagaRollbackEvent.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.rooftop.netx.api

class SagaRollbackEvent internal constructor(
id: String,
nodeName: String,
group: String,
event: String?,
val cause: String,
codec: Codec,
) : SagaEvent(id, nodeName, group, event, codec) {

override fun copy(): SagaRollbackEvent =
SagaRollbackEvent(id, nodeName, group, event, cause, codec)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import kotlin.reflect.KClass

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionRollbackListener(
annotation class SagaRollbackListener(
val event: KClass<*> = Any::class,
)
13 changes: 13 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/SagaStartEvent.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.rooftop.netx.api

class SagaStartEvent internal constructor(
id: String,
nodeName: String,
group: String,
event: String?,
codec: Codec,
) : SagaEvent(id, nodeName, group, event, codec) {

override fun copy(): SagaStartEvent =
SagaStartEvent(id, nodeName, group, event, codec)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import kotlin.reflect.KClass

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionStartListener(
annotation class SagaStartListener(
val event: KClass<*> = Any::class,
val noRollbackFor: Array<KClass<out Throwable>> = [],
val successWith: SuccessWith = SuccessWith.PUBLISH_JOIN,
Expand Down
13 changes: 0 additions & 13 deletions src/main/kotlin/org/rooftop/netx/api/TransactionCommitEvent.kt

This file was deleted.

13 changes: 0 additions & 13 deletions src/main/kotlin/org/rooftop/netx/api/TransactionJoinEvent.kt

This file was deleted.

43 changes: 0 additions & 43 deletions src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt

This file was deleted.

14 changes: 0 additions & 14 deletions src/main/kotlin/org/rooftop/netx/api/TransactionRollbackEvent.kt

This file was deleted.

13 changes: 0 additions & 13 deletions src/main/kotlin/org/rooftop/netx/api/TransactionStartEvent.kt

This file was deleted.

40 changes: 20 additions & 20 deletions src/main/kotlin/org/rooftop/netx/engine/AbstractDispatchFunction.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.rooftop.netx.engine

import org.rooftop.netx.api.TransactionEvent
import org.rooftop.netx.api.TransactionManager
import org.rooftop.netx.api.SagaEvent
import org.rooftop.netx.api.SagaManager
import reactor.core.scheduler.Schedulers
import kotlin.reflect.KClass
import kotlin.reflect.KFunction
Expand All @@ -11,48 +11,48 @@ internal sealed class AbstractDispatchFunction<T>(
protected val function: KFunction<T>,
protected val handler: Any,
private val noRollbackFor: Array<KClass<out Throwable>>,
private val nextState: NextTransactionState,
private val transactionManager: TransactionManager,
private val nextState: NextSagaState,
private val sagaManager: SagaManager,
) {
fun name(): String = function.name

abstract fun call(transactionEvent: TransactionEvent): T
abstract fun call(sagaEvent: SagaEvent): T

protected fun isNoRollbackFor(throwable: Throwable): Boolean {
return noRollbackFor.isNotEmpty() && throwable.cause != null && noRollbackFor.contains(
throwable.cause!!::class
)
}

protected fun isProcessable(transactionEvent: TransactionEvent): Boolean {
protected fun isProcessable(sagaEvent: SagaEvent): Boolean {
return runCatching {
transactionEvent.decodeEvent(eventType)
sagaEvent.decodeEvent(eventType)
}.onFailure {
return it is NullPointerException && eventType == Any::class
}.isSuccess
}

protected fun rollback(transactionEvent: TransactionEvent, throwable: Throwable) {
transactionEvent.nextEvent?.let {
transactionManager.rollback(transactionEvent.transactionId, throwable.getCause(), it)
protected fun rollback(sagaEvent: SagaEvent, throwable: Throwable) {
sagaEvent.nextEvent?.let {
sagaManager.rollback(sagaEvent.id, throwable.getCause(), it)
.subscribeOn(Schedulers.parallel())
.subscribe()
} ?: transactionManager.rollback(transactionEvent.transactionId, throwable.getCause())
} ?: sagaManager.rollback(sagaEvent.id, throwable.getCause())
.subscribeOn(Schedulers.parallel())
.subscribe()
}

protected fun publishNextTransaction(transactionEvent: TransactionEvent) {
protected fun publishNextSaga(sagaEvent: SagaEvent) {
when (nextState) {
NextTransactionState.JOIN -> transactionEvent.nextEvent?.let {
transactionManager.join(transactionEvent.transactionId, it)
} ?: transactionManager.join(transactionEvent.transactionId)
NextSagaState.JOIN -> sagaEvent.nextEvent?.let {
sagaManager.join(sagaEvent.id, it)
} ?: sagaManager.join(sagaEvent.id)

NextTransactionState.COMMIT -> transactionEvent.nextEvent?.let {
transactionManager.commit(transactionEvent.transactionId, it)
} ?: transactionManager.commit(transactionEvent.transactionId)
NextSagaState.COMMIT -> sagaEvent.nextEvent?.let {
sagaManager.commit(sagaEvent.id, it)
} ?: sagaManager.commit(sagaEvent.id)

NextTransactionState.END -> return
NextSagaState.END -> return
}.subscribeOn(Schedulers.parallel())
.subscribe()
}
Expand All @@ -61,7 +61,7 @@ internal sealed class AbstractDispatchFunction<T>(
return this.message ?: this.cause?.message ?: this::class.java.name
}

internal enum class NextTransactionState {
internal enum class NextSagaState {
JOIN,
COMMIT,
END
Expand Down
Loading

0 comments on commit 9088a25

Please sign in to comment.