Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: orchestrator's rollback start previous sequence #138

Merged
merged 4 commits into from
Apr 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<br>

![version 0.4.2](https://img.shields.io/badge/version-0.4.2-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![version 0.4.3](https://img.shields.io/badge/version-0.4.3-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)

**TPS(6,000)** on my Macbook air m2(default options). _[link](#Test1-TPS)_
Expand Down Expand Up @@ -120,12 +120,8 @@ class OrchestratorConfigurer(
)
.commit(
orchestrate = { request ->
// When an error occurs, all rollbacks are called from the bottom up,
// starting from the location where the error occurred.
// If a rollback occurs here, all the above rollback functions will be executed sequentially.
throw IllegalArgumentException("Oops! Something went wrong..")
},
rollback = { request ->
// ...
}
)
}
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ kotlin.code.style=official

### Project ###
group=org.rooftopmsa
version=0.4.2
version=0.4.3
compatibility=17

### Sonarcloud ###
Expand Down
8 changes: 0 additions & 8 deletions src/main/kotlin/org/rooftop/netx/api/OrchestrateChain.kt
Original file line number Diff line number Diff line change
Expand Up @@ -63,43 +63,35 @@ interface OrchestrateChain<OriginReq : Any, T : Any, V : Any> {
* Commits the saga with the operation.
*
* @param orchestrate Operation to be executed along with the commit.
* @param rollback Rollback function to be executed if an exception is thrown in the current orchestrate.
* @param S The final return value of Orchestrator.
* @return Orchestrator
* @see Orchestrate
* @see Rollback*
*/
fun <S : Any> commit(
orchestrate: Orchestrate<V, S>,
rollback: Rollback<V, *>? = null,
): Orchestrator<OriginReq, S>

/**
* @see commit
*/
fun <S : Any> commitReactive(
orchestrate: Orchestrate<V, Mono<S>>,
rollback: Rollback<V, Mono<*>>? = null,
): Orchestrator<OriginReq, S>

/**
* @param contextOrchestrate Allows using Context maintained in each Saga.
* @param contextRollback Allows using Context maintained in each Saga.
* @see commit
* @see contextOrchestrate
* @see contextRollback
*/
fun <S : Any> commitWithContext(
contextOrchestrate: ContextOrchestrate<V, S>,
contextRollback: ContextRollback<V, *>? = null,
): Orchestrator<OriginReq, S>

/**
* @see commitWithContext
*/
fun <S : Any> commitReactiveWithContext(
contextOrchestrate: ContextOrchestrate<V, Mono<S>>,
contextRollback: ContextRollback<V, Mono<*>>? = null,
): Orchestrator<OriginReq, S>

interface Pre<T : Any> {
Expand Down
30 changes: 10 additions & 20 deletions src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt
Original file line number Diff line number Diff line change
Expand Up @@ -146,26 +146,20 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat

override fun <S : Any> commit(
orchestrate: Orchestrate<V, S>,
rollback: Rollback<V, *>?,
): Orchestrator<OriginReq, S> {
val nextCommitOrchestrateListener =
getCommitOrchestrateListener<V, S>(CommandType.DEFAULT, orchestrate)
val nextRollbackOrchestrateListener =
getRollbackOrchestrateListener<V, S>(CommandType.DEFAULT, rollback)

return createOrchestrator(nextCommitOrchestrateListener, nextRollbackOrchestrateListener)
return createOrchestrator(nextCommitOrchestrateListener)
}

override fun <S : Any> commitWithContext(
contextOrchestrate: ContextOrchestrate<V, S>,
contextRollback: ContextRollback<V, *>?
): Orchestrator<OriginReq, S> {
val nextCommitOrchestrateListener =
getCommitOrchestrateListener<V, S>(CommandType.CONTEXT, contextOrchestrate)
val nextRollbackOrchestrateListener =
getRollbackOrchestrateListener<V, S>(CommandType.CONTEXT, contextRollback)

return createOrchestrator(nextCommitOrchestrateListener, nextRollbackOrchestrateListener)
return createOrchestrator(nextCommitOrchestrateListener)
}

private fun <T : Any, V : Any> getCommitOrchestrateListener(
Expand Down Expand Up @@ -200,15 +194,14 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat

private fun <S : Any> createOrchestrator(
nextCommitOrchestrateListener: CommitOrchestrateListener<V, S>,
nextRollbackOrchestrateListener: RollbackOrchestrateListener<V, S>?
): Orchestrator<OriginReq, S> {
return chainContainer.orchestratorCache.cache(orchestratorId) {
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
orchestratorId,
orchestrateSequence + 1,
chainContainer,
nextCommitOrchestrateListener,
nextRollbackOrchestrateListener,
null,
this,
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
Expand All @@ -227,39 +220,32 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat

override fun <S : Any> commitReactive(
orchestrate: Orchestrate<V, Mono<S>>,
rollback: Rollback<V, Mono<*>>?,
): Orchestrator<OriginReq, S> {
val nextJoinOrchestrateListener =
getMonoCommitOrchestrateListener<V, S>(CommandType.DEFAULT, orchestrate)
val nextRollbackOrchestrateListener =
getMonoRollbackOrchestrateListener<V, S>(CommandType.DEFAULT, rollback)

return createOrchestrator(nextJoinOrchestrateListener, nextRollbackOrchestrateListener)
return createOrchestrator(nextJoinOrchestrateListener)
}

override fun <S : Any> commitReactiveWithContext(
contextOrchestrate: ContextOrchestrate<V, Mono<S>>,
contextRollback: ContextRollback<V, Mono<*>>?
): Orchestrator<OriginReq, S> {
val nextJoinOrchestrateListener =
getMonoCommitOrchestrateListener<V, S>(CommandType.CONTEXT, contextOrchestrate)
val nextRollbackOrchestrateListener =
getMonoRollbackOrchestrateListener<V, S>(CommandType.CONTEXT, contextRollback)

return createOrchestrator(nextJoinOrchestrateListener, nextRollbackOrchestrateListener)
return createOrchestrator(nextJoinOrchestrateListener)
}

private fun <S : Any> createOrchestrator(
nextJoinOrchestrateListener: MonoCommitOrchestrateListener<V, S>,
nextRollbackOrchestrateListener: MonoRollbackOrchestrateListener<V, S>?
): Orchestrator<OriginReq, S> {
return chainContainer.orchestratorCache.cache(orchestratorId) {
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
orchestratorId,
orchestrateSequence + 1,
chainContainer,
nextJoinOrchestrateListener,
nextRollbackOrchestrateListener,
null,
this,
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
Expand Down Expand Up @@ -330,6 +316,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat

private fun chainOrchestrateListeners(orchestrateListeners: List<Pair<AbstractOrchestrateListener<out Any, out Any>, AbstractOrchestrateListener<out Any, out Any>?>>) {
var rollbackSequence = 0
var beforeRollbackSequence = -1
for (listenerWithIdx in orchestrateListeners.withIndex()) {
val isFirst = listenerWithIdx.index == 0
val isLast =
Expand All @@ -341,6 +328,9 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
rollbackSequence = it.orchestrateSequence
}
listener.rollbackSequence = rollbackSequence
listener.beforeRollbackOrchestrateSequence = beforeRollbackSequence

beforeRollbackSequence = rollbackSequence

listener.isFirst = isFirst
listener.isLast = isLast
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ internal abstract class AbstractOrchestrateListener<T : Any, V : Any> internal c
val rollbackOrchestrateEvent =
OrchestrateEvent(
orchestrateEvent.orchestratorId,
rollbackSequence,
beforeRollbackOrchestrateSequence,
"",
orchestrateEvent.context,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ public Orchestrator<Integer, Integer> intOrchestrator() {
request -> request - 1
)
.commit(
request -> request + 1,
request -> request - 1
request -> request + 1
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal class OrchestratorConfigurer {
Mono.fromCallable {
request + 1
}
}, contextRollback = { _, request -> Mono.fromCallable { request - 1 } })
})
}

object IntOrchestrator : Orchestrate<Int, Int> {
Expand Down
38 changes: 30 additions & 8 deletions src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ internal class OrchestratorConfigurer(
rollbackOrchestratorResult.add("4")
throw IllegalArgumentException("Rollback")
},
rollback = {
rollbackOrchestratorResult.add("-4")
}
)
}

Expand Down Expand Up @@ -141,10 +138,9 @@ internal class OrchestratorConfigurer(
val start1 = context.decodeContext("start-1", String::class)
val join2 = context.decodeContext("join-2", String::class)
val join3 = context.decodeContext("join-3", String::class)
val rCommit4 = context.decodeContext("r-commit-4", String::class)
val rJoin3 = context.decodeContext("r-join-3", String::class)

contextResult.addAll(listOf(start1, join2, join3, rCommit4, rJoin3))
contextResult.addAll(listOf(start1, join2, join3, rJoin3))
}
)
.joinWithContext(
Expand All @@ -171,9 +167,6 @@ internal class OrchestratorConfigurer(
context.set("commit-4", request)
throw IllegalArgumentException("Rollback")
},
contextRollback = { context, request ->
context.set("r-commit-4", "r$request")
}
)
}

Expand Down Expand Up @@ -237,6 +230,35 @@ internal class OrchestratorConfigurer(
.commit({ it })
}

@Bean(name = ["throwOnStartOrchestrator"])
fun throwOnStartOrchestrator(): Orchestrator<String, String> {
return OrchestratorFactory.instance()
.create<String>("throwOnStartOrchestrator")
.start(
orchestrate = {
throw IllegalArgumentException("Throw error for test.")
}
)
.commit({
"Never reach this line."
})
}

@Bean(name = ["throwOnJoinOrchestrator"])
fun throwOnJoinOrchestrator(): Orchestrator<String, String> {
return OrchestratorFactory.instance()
.create<String>("throwOnJoinOrchestrator")
.start({
"start success"
})
.join({
throw IllegalArgumentException("Throw error for test.")
})
.commit({
"Never reach this line."
})
}

object PairOrchestrate :
Orchestrate<Pair<OrchestratorTest.Foo, OrchestratorTest.Foo>, Pair<OrchestratorTest.Foo, OrchestratorTest.Foo>> {
override fun orchestrate(request: Pair<OrchestratorTest.Foo, OrchestratorTest.Foo>): Pair<OrchestratorTest.Foo, OrchestratorTest.Foo> {
Expand Down
33 changes: 28 additions & 5 deletions src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import io.kotest.core.annotation.DisplayName
import io.kotest.core.spec.style.DescribeSpec
import io.kotest.matchers.equality.shouldBeEqualToComparingFields
import io.kotest.matchers.equals.shouldBeEqual
import io.kotest.matchers.shouldBe
import org.rooftop.netx.api.Orchestrator
import org.rooftop.netx.api.TypeReference
import org.rooftop.netx.meta.EnableSaga
Expand Down Expand Up @@ -39,6 +38,8 @@ internal class OrchestratorTest(
@Qualifier("startWithContextOrchestrator") private val startWithContextOrchestrator: Orchestrator<String, String>,
@Qualifier("fooContextOrchestrator") private val fooContextOrchestrator: Orchestrator<String, List<Foo>>,
private val privateOrchestrator: Orchestrator<Private, Private>,
@Qualifier("throwOnStartOrchestrator") private val throwOnStartOrchestrator: Orchestrator<String, String>,
@Qualifier("throwOnJoinOrchestrator") private val throwOnJoinOrchestrator: Orchestrator<String, String>,
) : DescribeSpec({

describe("numberOrchestrator 구현채는") {
Expand Down Expand Up @@ -97,10 +98,10 @@ internal class OrchestratorTest(
}

describe("rollbackOrchestrator 구현채는") {
val expected = listOf("1", "2", "3", "4", "-4", "-3", "-1")
val expected = listOf("1", "2", "3", "4", "-3", "-1")

context("saga 메소드가 호출되면,") {
it("실패한 부분부터 위로 거슬러 올라가며 롤백한다") {
it("실패한 부분 위부터 위로 거슬러 올라가며 롤백한다") {
val result = rollbackOrchestrator.sagaSync("")

result.isSuccess shouldBeEqual false
Expand Down Expand Up @@ -134,7 +135,7 @@ internal class OrchestratorTest(
context("saga 메소드가 호출되면,") {
val expected = listOf("1", "2", "3", "4", "-3", "-1")

it("실패한 부분부터 위로 거슬러 올라가며 롤백한다.") {
it("실패한 부분위부터 위로 거슬러 올라가며 롤백한다.") {
val result = monoRollbackOrchestrator.sagaSync("")

result.isSuccess shouldBeEqual false
Expand All @@ -150,7 +151,7 @@ internal class OrchestratorTest(

describe("contextOrchestrator 구현채는") {
context("saga 메소드가 호출되면,") {
val expected = listOf("0", "1", "2", "r3", "r2")
val expected = listOf("0", "1", "2", "r2")

it("context 에서 아이템을 교환하며 Saga를 진행한다.") {
val result = contextOrchestrator.sagaSync("0")
Expand Down Expand Up @@ -225,6 +226,28 @@ internal class OrchestratorTest(
}
}
}

describe("throwOnStartOrchestrator 구현채는") {
context("start에서 예외가 던져지면,") {
it("해당 예외를 Result에서 throw한다.") {
shouldThrowWithMessage<IllegalArgumentException>("Throw error for test.") {
throwOnStartOrchestrator.sagaSync("throw error in start.")
.decodeResultOrThrow(String::class)
}
}
}
}

describe("throwOnJoinOrchestrator 구현채는") {
context("join에서 예외가 던져지면,") {
it("해당 예외를 Result에서 throw한다.") {
shouldThrowWithMessage<IllegalArgumentException>("Throw error for test.") {
throwOnJoinOrchestrator.sagaSync("throw error in join.")
.decodeResultOrThrow(String::class)
}
}
}
}
}) {
data class Home(
val address: String,
Expand Down
Loading