Skip to content

Commit

Permalink
chore: fix launcher rehydrate retry logic (#14029)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Sep 18, 2024
1 parent 20f6542 commit f75a0fa
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 26 deletions.
50 changes: 24 additions & 26 deletions airbyte-workload-launcher/src/main/kotlin/ClaimedProcessor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package io.airbyte.workload.launcher
import com.google.common.annotations.VisibleForTesting
import datadog.trace.api.Trace
import dev.failsafe.Failsafe
import dev.failsafe.FailsafeException
import dev.failsafe.RetryPolicy
import io.airbyte.metrics.lib.ApmTraceUtils
import io.airbyte.metrics.lib.MetricAttribute
Expand All @@ -27,13 +26,16 @@ import io.airbyte.workload.launcher.pipeline.consumer.LauncherInput
import io.airbyte.workload.launcher.pipeline.stages.model.LaunchStageIO
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Value
import jakarta.inject.Named
import jakarta.inject.Singleton
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.kotlin.core.publisher.toFlux
import java.net.ConnectException
import java.net.SocketException
import java.net.SocketTimeoutException
import java.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration

private val logger = KotlinLogging.logger {}

Expand All @@ -45,6 +47,8 @@ class ClaimedProcessor(
@Value("\${airbyte.data-plane-id}") private val dataplaneId: String,
@Value("\${airbyte.workload-launcher.temporal.default-queue.parallelism}") parallelism: Int,
private val claimProcessorTracker: ClaimProcessorTracker,
@Named("claimedProcessorBackoffDuration") private val backoffDuration: Duration = 5.seconds.toJavaDuration(),
@Named("claimedProcessorBackoffMaxDelay") private val backoffMaxDelay: Duration = 60.seconds.toJavaDuration(),
) {
private val scheduler = Schedulers.newParallel("process-claimed-scheduler", parallelism)

Expand Down Expand Up @@ -92,30 +96,24 @@ class ClaimedProcessor(
}

private fun getWorkloadList(workloadListRequest: WorkloadListRequest): WorkloadListResponse {
while (true) {
try {
// TODO: consider tuning the retry policy here, since we currently get the default 2 retries.
return Failsafe.with(
RetryPolicy.builder<Any>()
.withBackoff(Duration.ofSeconds(20), Duration.ofDays(365))
.onRetry { logger.error { "Retrying to fetch workloads for dataplane $dataplaneId" } }
.abortOn { exception ->
when (exception) {
// This makes us to retry only on 5XX errors
is ServerException -> exception.statusCode / 100 != 5
else -> true
}
}
.build(),
)
.get { -> apiClient.workloadApi.workloadList(workloadListRequest) }
} catch (e: FailsafeException) {
if (e.cause !is ConnectException && e.cause !is SocketTimeoutException) {
throw e; // Surface all other errors.
return Failsafe.with(
RetryPolicy.builder<Any>()
.withBackoff(backoffDuration, backoffMaxDelay)
.onRetry { logger.error { "Retrying to fetch workloads for dataplane $dataplaneId" } }
.withMaxAttempts(-1)
.abortOn { exception ->
when (exception) {
// This makes us to retry only on 5XX errors
is ServerException -> exception.statusCode / 100 != 5

// We want to retry on most network errors
is SocketException -> false
is SocketTimeoutException -> false
else -> true
}
}
// On a ConnectionException or SocketTimeoutException, we'll retry indefinitely.
logger.warn { "Failed to connect to workload API fetching workloads for dataplane $dataplaneId, retrying..." }
}
}
.build(),
)
.get { -> apiClient.workloadApi.workloadList(workloadListRequest) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import jakarta.inject.Singleton
import okhttp3.internal.http2.StreamResetException
import java.net.SocketTimeoutException
import java.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration

/**
* Micronaut bean factory for general application beans.
Expand Down Expand Up @@ -156,4 +158,12 @@ class ApplicationBeanFactory {
fun connectorApmSupportHelper(): ConnectorApmSupportHelper {
return ConnectorApmSupportHelper()
}

@Singleton
@Named("claimedProcessorBackoffDuration")
fun claimedProcessorBackoffDuration() = 5.seconds.toJavaDuration()

@Singleton
@Named("claimedProcessorBackoffMaxDelay")
fun claimedProcessorBackoffMaxDelay() = 60.seconds.toJavaDuration()
}
37 changes: 37 additions & 0 deletions airbyte-workload-launcher/src/test/kotlin/ClaimedProcessorTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.airbyte.workload.api.client.model.generated.WorkloadType
import io.airbyte.workload.launcher.ClaimProcessorTracker
import io.airbyte.workload.launcher.ClaimedProcessor
import io.airbyte.workload.launcher.pipeline.LaunchPipeline
import io.kotlintest.milliseconds
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
Expand All @@ -16,7 +17,12 @@ import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import reactor.core.publisher.Mono
import java.net.ConnectException
import java.net.SocketException
import java.net.SocketTimeoutException
import java.util.UUID
import kotlin.time.toJavaDuration
import kotlin.time.toKotlinDuration

class ClaimedProcessorTest {
private lateinit var workloadApi: WorkloadApi
Expand Down Expand Up @@ -45,6 +51,8 @@ class ClaimedProcessorTest {
dataplaneId = "dataplane1",
parallelism = 10,
claimProcessorTracker = claimProcessorTracker,
backoffDuration = 1.milliseconds.toKotlinDuration().toJavaDuration(),
backoffMaxDelay = 2.milliseconds.toKotlinDuration().toJavaDuration(),
)
}

Expand Down Expand Up @@ -81,4 +89,33 @@ class ClaimedProcessorTest {
}
verify(exactly = 0) { claimProcessorTracker.trackNumberOfClaimsToResume(any()) }
}

@Test
fun `test retrieve and process recovers after network issues`() {
every { workloadApi.workloadList(any()) }
.throwsMany(
(1..5).flatMap {
listOf(
ServerException("oops", 500),
ServerException("oops", 502),
SocketException(),
ConnectException(),
SocketTimeoutException(),
)
}.toList(),
)
.andThenAnswer {
WorkloadListResponse(
listOf(
Workload("1", listOf(), "payload", "logPath", "US", WorkloadType.SYNC, UUID.randomUUID()),
Workload("2", listOf(), "payload", "logPath", "US", WorkloadType.SYNC, UUID.randomUUID()),
Workload("3", listOf(), "payload", "logPath", "US", WorkloadType.SYNC, UUID.randomUUID()),
),
)
}
claimedProcessor.retrieveAndProcess()

verify { claimProcessorTracker.trackNumberOfClaimsToResume(3) }
verify(exactly = 3) { launchPipeline.buildPipeline(any()) }
}
}

0 comments on commit f75a0fa

Please sign in to comment.