Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Rollback wouldn't occured if there was no associated rollback function #101

Merged
merged 1 commit into from
Mar 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ internal abstract class AbstractOrchestrateListener<T : Any, V : Any> internal c
var isLast: Boolean = true
var isRollbackable: Boolean = false
var beforeRollbackOrchestrateSequence: Int = 0
var rollbackSequence: Int = orchestrateSequence

private var nextOrchestrateListener: AbstractOrchestrateListener<V, Any>? = null
private var nextRollbackOrchestrateListener: AbstractOrchestrateListener<V, Any>? = null
Expand Down Expand Up @@ -103,10 +104,12 @@ internal abstract class AbstractOrchestrateListener<T : Any, V : Any> internal c
throwable: Throwable,
orchestrateEvent: OrchestrateEvent,
) {
val rollbackOrchestrateEvent =
OrchestrateEvent(orchestrateEvent.orchestratorId, rollbackSequence, "")
transactionManager.rollback(
transactionId = transactionId,
cause = throwable.message ?: throwable.localizedMessage,
event = orchestrateEvent
event = rollbackOrchestrateEvent
).subscribeOn(Schedulers.parallel()).subscribe()
}

Expand Down
7 changes: 6 additions & 1 deletion src/main/kotlin/org/rooftop/netx/engine/OrchestrateChain.kt
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,18 @@ class OrchestrateChain<OriginReq : Any, T : Any, V : Any> private constructor(
}

private fun chainOrchestrateListeners(orchestrateListeners: List<Pair<AbstractOrchestrateListener<out Any, out Any>, AbstractOrchestrateListener<out Any, out Any>?>>) {
var rollbackSequence = 0
for (listenerWithIdx in orchestrateListeners.withIndex()) {
val isFirst = listenerWithIdx.index == 0
val isLast =
listenerWithIdx.index == (orchestrateListeners.size - 1 - COMMIT_LISTENER_PREFIX)

val listener = listenerWithIdx.value.first
listenerWithIdx.value.second?.let { listener.isRollbackable = true }
listenerWithIdx.value.second?.let {
listener.isRollbackable = true
rollbackSequence = it.orchestrateSequence
}
listener.rollbackSequence = rollbackSequence

listener.isFirst = isFirst
listener.isLast = isLast
Expand Down
13 changes: 13 additions & 0 deletions src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.rooftop.netx.engine

import org.rooftop.netx.api.Orchestrator
import org.rooftop.netx.engine.OrchestratorTest.Companion.rollbackOrchestratorResult
import org.rooftop.netx.engine.OrchestratorTest.Companion.upChainResult
import org.rooftop.netx.factory.OrchestratorFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
Expand Down Expand Up @@ -92,4 +93,16 @@ class OrchestratorConfigurer(
}
)
}

@Bean(name = ["upChainRollbackOrchestrator"])
fun upChainRollbackOrchestrator(): Orchestrator<String, String> {
return orchestratorFactory.create<String>("upChainRollbackOrchestrator")
.start({ upChainResult.add("1") }, { upChainResult.add("-1") })
.join({ upChainResult.add("2") })
.join({ upChainResult.add("3") }, { upChainResult.add("-3") })
.commit({
upChainResult.add("4")
throw IllegalArgumentException("Rollback for test")
})
}
}
21 changes: 20 additions & 1 deletion src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.kotest.matchers.equals.shouldBeEqual
import org.rooftop.netx.api.Orchestrator
import org.rooftop.netx.meta.EnableDistributedTransaction
import org.rooftop.netx.redis.RedisContainer
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
import java.time.Instant
Expand All @@ -28,7 +29,8 @@ class OrchestratorTest(
private val homeOrchestrator: Orchestrator<Home, Home>,
private val instantOrchestrator: Orchestrator<InstantWrapper, InstantWrapper>,
private val manyTypeOrchestrator: Orchestrator<Int, Home>,
private val rollbackOrchestrator: Orchestrator<String, String>,
@Qualifier("rollbackOrchestrator") private val rollbackOrchestrator: Orchestrator<String, String>,
@Qualifier("upChainRollbackOrchestrator") private val upChainRollbackOrchestrator: Orchestrator<String, String>,
) : DescribeSpec({

describe("numberOrchestrator 구현채는") {
Expand Down Expand Up @@ -103,6 +105,22 @@ class OrchestratorTest(
}
}
}

describe("upStreamRollbackOrchestrator 구현채는") {
val expected = listOf("1", "2", "3", "4", "-3", "-1")

it("호출할 rollback function이 없으면, 가장 가까운 상단의 rollback을 호출한다.") {
val result = upChainRollbackOrchestrator.transactionSync("")

result.isSuccess shouldBeEqual false
shouldThrowWithMessage<IllegalArgumentException>("Rollback for test") {
result.throwError()
}
eventually(5.seconds) {
upChainResult shouldBeEqual expected
}
}
}
}) {
data class Home(
val address: String,
Expand All @@ -121,5 +139,6 @@ class OrchestratorTest(

companion object {
val rollbackOrchestratorResult = mutableListOf<String>()
val upChainResult = mutableListOf<String>()
}
}
Loading