Skip to content

Commit

Permalink
feat: Let OrchestratorFactory find Orchestrator by orchestratorId (#103)
Browse files Browse the repository at this point in the history
* feat: OrchestratorFactory에서 orchestratorId로 Orchestrator를 찾을 수 있도록 한다

* fix: Rollback occurred even if no associated rollback found (#101)

* feat: OrchestratorFactory에서 orchestratorId로 Orchestrator를 찾을 수 있도록 한다

* fix: upChainResult import
  • Loading branch information
devxb authored Mar 23, 2024
1 parent 5e17548 commit 8a050fd
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 66 deletions.
40 changes: 40 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/OrchestrateChain.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.rooftop.netx.api

import org.rooftop.netx.engine.DefaultOrchestrateChain
import reactor.core.publisher.Mono

interface OrchestrateChain<OriginReq : Any, T : Any, V : Any> {

fun <S : Any> join(
function: OrchestrateFunction<V, S>,
rollback: RollbackFunction<V, *>? = null,
): DefaultOrchestrateChain<OriginReq, V, S>

fun <S : Any> joinReactive(
function: OrchestrateFunction<V, Mono<S>>,
rollback: RollbackFunction<V, Mono<*>>? = null,
): DefaultOrchestrateChain<OriginReq, V, S>

fun <S : Any> commit(
function: OrchestrateFunction<V, S>,
rollback: RollbackFunction<V, *>? = null,
): Orchestrator<OriginReq, S>

fun <S : Any> commitReactive(
function: OrchestrateFunction<V, Mono<S>>,
rollback: RollbackFunction<V, Mono<*>>? = null,
): Orchestrator<OriginReq, S>

interface Pre<T : Any> {
fun <V : Any> start(
function: OrchestrateFunction<T, V>,
rollback: RollbackFunction<T, *>? = null,
): DefaultOrchestrateChain<T, T, V>

fun <V : Any> startReactive(
function: OrchestrateFunction<T, Mono<V>>,
rollback: RollbackFunction<T, Mono<*>>? = null,
): DefaultOrchestrateChain<T, T, V>

}
}
9 changes: 9 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/OrchestratorFactory.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.rooftop.netx.api

interface OrchestratorFactory {

fun <T : Any, V : Any> get(orchestratorId: String): Orchestrator<T, V>

fun <T : Any> create(orchestratorId: String): OrchestrateChain.Pre<T>

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,35 @@ import org.rooftop.netx.api.*
import org.rooftop.netx.engine.listen.*
import reactor.core.publisher.Mono

class OrchestrateChain<OriginReq : Any, T : Any, V : Any> private constructor(
class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> private constructor(
private val orchestratorId: String,
private val orchestrateSequence: Int,
private val chainContainer: ChainContainer,
private val orchestrateListener: AbstractOrchestrateListener<T, V>,
private val rollbackOrchestrateListener: AbstractOrchestrateListener<T, *>?,
private val beforeOrchestrateChain: OrchestrateChain<OriginReq, out Any, T>? = null,
) {
private val beforeDefaultOrchestrateChain: DefaultOrchestrateChain<OriginReq, out Any, T>? = null,
): OrchestrateChain<OriginReq, T, V> {

private var nextOrchestrateChain: OrchestrateChain<OriginReq, V, out Any>? = null
private var nextDefaultOrchestrateChain: DefaultOrchestrateChain<OriginReq, V, out Any>? = null

fun <S : Any> join(
override fun <S : Any> join(
function: OrchestrateFunction<V, S>,
rollback: RollbackFunction<V, *>? = null,
): OrchestrateChain<OriginReq, V, S> {
rollback: RollbackFunction<V, *>?,
): DefaultOrchestrateChain<OriginReq, V, S> {
val nextJoinOrchestrateListener = getJoinOrchestrateListener(function)
val nextRollbackOrchestrateListener = getRollbackOrchestrateListener<V, S>(rollback)

val nextOrchestrateChain = OrchestrateChain(
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
orchestratorId,
orchestrateSequence + 1,
chainContainer,
nextJoinOrchestrateListener,
nextRollbackOrchestrateListener,
this,
)
this.nextOrchestrateChain = nextOrchestrateChain
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain

return nextOrchestrateChain
return nextDefaultOrchestrateChain
}

private fun <T : Any, V : Any> getJoinOrchestrateListener(function: OrchestrateFunction<T, V>) =
Expand All @@ -46,24 +46,24 @@ class OrchestrateChain<OriginReq : Any, T : Any, V : Any> private constructor(
resultHolder = chainContainer.resultHolder,
)

fun <S : Any> joinReactive(
override fun <S : Any> joinReactive(
function: OrchestrateFunction<V, Mono<S>>,
rollback: RollbackFunction<V, Mono<*>>? = null,
): OrchestrateChain<OriginReq, V, S> {
rollback: RollbackFunction<V, Mono<*>>?,
): DefaultOrchestrateChain<OriginReq, V, S> {
val nextJoinOrchestrateListener = getMonoJoinOrchestrateListener(function)
val nextRollbackOrchestrateListener = getMonoRollbackOrchestrateListener<V, S>(rollback)

val nextOrchestrateChain = OrchestrateChain(
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
orchestratorId,
orchestrateSequence + 1,
chainContainer,
nextJoinOrchestrateListener,
nextRollbackOrchestrateListener,
this,
)
this.nextOrchestrateChain = nextOrchestrateChain
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain

return nextOrchestrateChain
return nextDefaultOrchestrateChain
}

private fun <T : Any, V : Any> getMonoJoinOrchestrateListener(function: OrchestrateFunction<T, Mono<V>>) =
Expand All @@ -77,24 +77,24 @@ class OrchestrateChain<OriginReq : Any, T : Any, V : Any> private constructor(
resultHolder = chainContainer.resultHolder,
)

fun <S : Any> commit(
override fun <S : Any> commit(
function: OrchestrateFunction<V, S>,
rollback: RollbackFunction<V, *>? = null,
rollback: RollbackFunction<V, *>?,
): Orchestrator<OriginReq, S> {
val nextCommitOrchestrateListener = getCommitOrchestrateListener(function)
val nextRollbackOrchestrateListener = getRollbackOrchestrateListener<V, S>(rollback)

return OrchestratorCache.cache(orchestratorId) {
val nextOrchestrateChain = OrchestrateChain(
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
orchestratorId,
orchestrateSequence + 1,
chainContainer,
nextCommitOrchestrateListener,
nextRollbackOrchestrateListener,
this,
)
this.nextOrchestrateChain = nextOrchestrateChain
val firstOrchestrateChain = nextOrchestrateChain.initOrchestrateListeners()
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
val firstOrchestrateChain = nextDefaultOrchestrateChain.initOrchestrateListeners()

return@cache OrchestratorManager<OriginReq, S>(
transactionManager = chainContainer.transactionManager,
Expand Down Expand Up @@ -131,25 +131,25 @@ class OrchestrateChain<OriginReq : Any, T : Any, V : Any> private constructor(
)
}

fun <S : Any> commitReactive(
override fun <S : Any> commitReactive(
function: OrchestrateFunction<V, Mono<S>>,
rollback: RollbackFunction<V, Mono<*>>? = null,
rollback: RollbackFunction<V, Mono<*>>?,
): Orchestrator<OriginReq, S> {
val nextJoinOrchestrateListener = getMonoCommitOrchestrateListener(function)
val nextRollbackOrchestrateListener = getMonoRollbackOrchestrateListener<V, S>(rollback)

return OrchestratorCache.cache(orchestratorId) {
val nextOrchestrateChain = OrchestrateChain(
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
orchestratorId,
orchestrateSequence + 1,
chainContainer,
nextJoinOrchestrateListener,
nextRollbackOrchestrateListener,
this,
)
this.nextOrchestrateChain = nextOrchestrateChain
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain

val firstOrchestrateChain = nextOrchestrateChain.initOrchestrateListeners()
val firstOrchestrateChain = nextDefaultOrchestrateChain.initOrchestrateListeners()

return@cache OrchestratorManager<OriginReq, S>(
transactionManager = chainContainer.transactionManager,
Expand All @@ -163,36 +163,36 @@ class OrchestrateChain<OriginReq : Any, T : Any, V : Any> private constructor(
}

@Suppress("UNCHECKED_CAST")
private fun initOrchestrateListeners(): OrchestrateChain<OriginReq, OriginReq, out Any> {
private fun initOrchestrateListeners(): DefaultOrchestrateChain<OriginReq, OriginReq, out Any> {
val cursorAndOrchestrateListener = getAllOrchestrateListeners()

chainOrchestrateListeners(cursorAndOrchestrateListener.second)
chainRollbackListeners(cursorAndOrchestrateListener.second)

addDispatcher(cursorAndOrchestrateListener.second)

return cursorAndOrchestrateListener.first as OrchestrateChain<OriginReq, OriginReq, out Any>
return cursorAndOrchestrateListener.first as DefaultOrchestrateChain<OriginReq, OriginReq, out Any>
}

private fun getAllOrchestrateListeners(): Pair<OrchestrateChain<OriginReq, out Any, out Any>?, MutableList<Pair<AbstractOrchestrateListener<out Any, out Any>, AbstractOrchestrateListener<out Any, out Any>?>>> {
private fun getAllOrchestrateListeners(): Pair<DefaultOrchestrateChain<OriginReq, out Any, out Any>?, MutableList<Pair<AbstractOrchestrateListener<out Any, out Any>, AbstractOrchestrateListener<out Any, out Any>?>>> {
val orchestrateListeners = mutableListOf<
Pair<AbstractOrchestrateListener<out Any, out Any>, AbstractOrchestrateListener<out Any, out Any>?>>()

var orchestrateChainCursor: OrchestrateChain<OriginReq, out Any, out Any>? = this
while (orchestrateChainCursor != null) {
var defaultOrchestrateChainCursor: DefaultOrchestrateChain<OriginReq, out Any, out Any>? = this
while (defaultOrchestrateChainCursor != null) {
orchestrateListeners.add(
orchestrateChainCursor.orchestrateListener
to orchestrateChainCursor.rollbackOrchestrateListener
defaultOrchestrateChainCursor.orchestrateListener
to defaultOrchestrateChainCursor.rollbackOrchestrateListener
)
if (orchestrateChainCursor.beforeOrchestrateChain == null) {
if (defaultOrchestrateChainCursor.beforeDefaultOrchestrateChain == null) {
break
}
orchestrateChainCursor = orchestrateChainCursor.beforeOrchestrateChain
defaultOrchestrateChainCursor = defaultOrchestrateChainCursor.beforeDefaultOrchestrateChain
}

orchestrateListeners.reverse()

return orchestrateChainCursor to orchestrateListeners
return defaultOrchestrateChainCursor to orchestrateListeners
}

private fun chainOrchestrateListeners(orchestrateListeners: List<Pair<AbstractOrchestrateListener<out Any, out Any>, AbstractOrchestrateListener<out Any, out Any>?>>) {
Expand Down Expand Up @@ -270,23 +270,23 @@ class OrchestrateChain<OriginReq : Any, T : Any, V : Any> private constructor(
)
}

class Pre<T : Any> internal constructor(
internal class Pre<T : Any> internal constructor(
private val orchestratorId: String,
private val transactionManager: TransactionManager,
private val transactionDispatcher: AbstractTransactionDispatcher,
private val codec: Codec,
private val resultHolder: ResultHolder,
private val requestHolder: RequestHolder,
) {
): OrchestrateChain.Pre<T> {

fun <V : Any> start(
override fun <V : Any> start(
function: OrchestrateFunction<T, V>,
rollback: RollbackFunction<T, *>? = null,
): OrchestrateChain<T, T, V> {
rollback: RollbackFunction<T, *>?,
): DefaultOrchestrateChain<T, T, V> {
val startOrchestrateListener = getStartOrchestrateListener(function)
val rollbackOrchestrateListener = getRollbackOrchestrateListener<V>(rollback)

return OrchestrateChain(
return DefaultOrchestrateChain(
orchestratorId = orchestratorId,
orchestrateSequence = 0,
chainContainer = getStreamContainer(),
Expand Down Expand Up @@ -319,14 +319,14 @@ class OrchestrateChain<OriginReq : Any, T : Any, V : Any> private constructor(
)
}

fun <V : Any> startReactive(
override fun <V : Any> startReactive(
function: OrchestrateFunction<T, Mono<V>>,
rollback: RollbackFunction<T, Mono<*>>? = null,
): OrchestrateChain<T, T, V> {
rollback: RollbackFunction<T, Mono<*>>?,
): DefaultOrchestrateChain<T, T, V> {
val startOrchestrateListener = getMonoStartOrchestrateListener(function)
val rollbackOrchestrateListener = getMonoRollbackOrchestrateListener<V>(rollback)

return OrchestrateChain(
return DefaultOrchestrateChain(
orchestratorId = orchestratorId,
orchestrateSequence = 0,
chainContainer = getStreamContainer(),
Expand Down
8 changes: 7 additions & 1 deletion src/main/kotlin/org/rooftop/netx/engine/OrchestratorCache.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ package org.rooftop.netx.engine

import org.rooftop.netx.api.Orchestrator

@Suppress("UNCHECKED_CAST")
internal object OrchestratorCache {

private val cache: MutableMap<String, Orchestrator<*, *>> = mutableMapOf()

@Suppress("UNCHECKED_CAST")
internal fun <T : Any, V : Any> get(key: String): Orchestrator<T, V> {
return cache[key]?.let {
it as Orchestrator<T, V>
} ?: throw IllegalArgumentException("Cannot find orchestrator by orchestratorId \"$key\"")
}

internal fun <T : Any, V : Any> cache(
key: String,
behavior: () -> Orchestrator<*, *>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
package org.rooftop.netx.factory
package org.rooftop.netx.engine

import org.rooftop.netx.api.Codec
import org.rooftop.netx.api.TransactionManager
import org.rooftop.netx.engine.AbstractTransactionDispatcher
import org.rooftop.netx.engine.OrchestrateChain
import org.rooftop.netx.engine.RequestHolder
import org.rooftop.netx.engine.ResultHolder
import org.rooftop.netx.api.*
import org.rooftop.netx.api.OrchestratorFactory

class OrchestratorFactory internal constructor(
private val transactionManager: TransactionManager,
private val transactionDispatcher: AbstractTransactionDispatcher,
private val codec: Codec,
private val resultHolder: ResultHolder,
private val requestHolder: RequestHolder,
) {
) : OrchestratorFactory {

fun <T : Any> create(orchestratorId: String): OrchestrateChain.Pre<T> {
return OrchestrateChain.Pre(
override fun <T : Any, V : Any> get(orchestratorId: String): Orchestrator<T, V> =
OrchestratorCache.get(orchestratorId)

override fun <T : Any> create(orchestratorId: String): OrchestrateChain.Pre<T> {
return DefaultOrchestrateChain.Pre(
orchestratorId = orchestratorId,
transactionManager = transactionManager,
transactionDispatcher = transactionDispatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.rooftop.netx.engine.core.Transaction
import org.rooftop.netx.engine.logging.LoggerFactory
import org.rooftop.netx.engine.logging.info
import org.rooftop.netx.engine.logging.logger
import org.rooftop.netx.factory.OrchestratorFactory
import org.rooftop.netx.engine.OrchestratorFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.context.ApplicationContext
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.rooftop.netx.javasupports;

import org.rooftop.netx.api.Orchestrator;
import org.rooftop.netx.factory.OrchestratorFactory;
import org.rooftop.netx.engine.OrchestratorFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.rooftop.netx.client

import org.rooftop.netx.api.OrchestrateFunction
import org.rooftop.netx.api.Orchestrator
import org.rooftop.netx.factory.OrchestratorFactory
import org.rooftop.netx.engine.OrchestratorFactory
import org.springframework.context.annotation.Bean
import reactor.core.publisher.Mono

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package org.rooftop.netx.engine
import org.rooftop.netx.api.Orchestrator
import org.rooftop.netx.engine.OrchestratorTest.Companion.rollbackOrchestratorResult
import org.rooftop.netx.engine.OrchestratorTest.Companion.upChainResult
import org.rooftop.netx.factory.OrchestratorFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import reactor.core.publisher.Mono
Expand Down
Loading

0 comments on commit 8a050fd

Please sign in to comment.