[LIVY-1007]: Livy should not spawn one thread per job to track the job on Kubernetes #453

Merged 1 commit on Aug 12, 2024
4 changes: 4 additions & 0 deletions conf/livy.conf.template
@@ -216,6 +216,10 @@

# If Livy can't find the Kubernetes app within this time, consider it lost.
# livy.server.kubernetes.app-lookup-timeout = 600s
# If Livy fails to look up the Kubernetes app more than this many times, consider it lost.
# livy.server.kubernetes.app-lookup.max-failed.times = 120
# The size of the thread pool that monitors all Kubernetes apps.
# livy.server.kubernetes.app-lookup.thread-pool.size = 4
# When the cluster is busy, the app may fail to launch within app-lookup-timeout, which
# would leak the session, so Livy needs to check for leaked sessions.
# How long to check for Livy session leakage
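Taken together with the poll interval (15s by default, per the LivyConf change below), the new max-failed-times setting bounds how long a session can wait for its app to become visible before being declared lost. A rough sketch of the arithmetic, assuming the defaults shown in this diff:

```scala
// Worst-case time before an app is declared lost, under the defaults in this diff.
// Failed lookups are spaced one poll interval apart, so:
val pollIntervalSeconds = 15 // livy.server.kubernetes.poll-interval
val maxFailedTimes = 120     // livy.server.kubernetes.app-lookup.max-failed.times
val lostAfterSeconds = pollIntervalSeconds * maxFailedTimes // 1800s, i.e. 30 minutes
```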
7 changes: 7 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -271,6 +271,13 @@ object LivyConf {

// If Livy can't find the Kubernetes app within this time, consider it lost.
val KUBERNETES_APP_LOOKUP_TIMEOUT = Entry("livy.server.kubernetes.app-lookup-timeout", "600s")
// If Livy fails to look up the Kubernetes app more than this many times, consider it lost.
val KUBERNETES_APP_LOOKUP_MAX_FAILED_TIMES =
Entry("livy.server.kubernetes.app-lookup.max-failed.times", 120)
// The size of the thread pool that monitors all Kubernetes apps.
val KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE =
Entry("livy.server.kubernetes.app-lookup.thread-pool.size", 4)

// How often Livy polls Kubernetes to refresh Kubernetes app state.
val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s")

219 changes: 184 additions & 35 deletions server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
@@ -18,7 +18,7 @@ package org.apache.livy.utils

import java.net.URLEncoder
import java.util.Collections
import java.util.concurrent.TimeoutException
import java.util.concurrent._

import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
@@ -33,12 +33,16 @@ import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressBuilder}
import io.fabric8.kubernetes.client.{Config, ConfigBuilder, _}
import org.apache.commons.lang.StringUtils

import org.apache.livy.{LivyConf, Logging, Utils}
import org.apache.livy.{LivyConf, Logging}

object SparkKubernetesApp extends Logging {

private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()

private val monitorAppThreadMap = new java.util.concurrent.ConcurrentHashMap[Thread, Long]()

private val appQueue = new ConcurrentLinkedQueue[SparkKubernetesApp]()

private val leakedAppsGCThread = new Thread() {
override def run(): Unit = {
import KubernetesExtensions._
@@ -97,6 +101,33 @@
}
}

private val checkMonitorAppTimeoutThread = new Thread() {
override def run(): Unit = {
while (true) {
try {
val iter = monitorAppThreadMap.entrySet().iterator()
val now = System.currentTimeMillis()

while (iter.hasNext) {
val entry = iter.next()
val thread = entry.getKey
val updatedTime = entry.getValue

// How long this worker has been silent beyond one poll interval.
val overdueMillis: Long = now - updatedTime - pollInterval.toMillis
if (overdueMillis > appLookupTimeout.toMillis) {
thread.interrupt()
}
}

Thread.sleep(pollInterval.toMillis)
} catch {
case e: InterruptedException =>
error("Apps timeout monitoring thread was interrupted.", e)
}
}
}
}
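In short, each pool worker records a heartbeat before each app it monitors, and this watchdog interrupts any worker whose heartbeat is older than one poll interval plus the lookup timeout. A minimal, self-contained sketch of the pattern (names are illustrative, not from the PR):

```scala
import java.util.concurrent.ConcurrentHashMap

object HeartbeatWatchdog {
  private val heartbeats = new ConcurrentHashMap[Thread, Long]()

  // Workers call this before each unit of work.
  def beat(): Unit =
    heartbeats.put(Thread.currentThread(), System.currentTimeMillis())

  // The watchdog interrupts workers that stay silent longer than pollMs + timeoutMs.
  def sweep(pollMs: Long, timeoutMs: Long): Unit = {
    val it = heartbeats.entrySet().iterator()
    val now = System.currentTimeMillis()
    while (it.hasNext) {
      val entry = it.next()
      if (now - entry.getValue - pollMs > timeoutMs) entry.getKey.interrupt()
    }
  }
}
```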

private var livyConf: LivyConf = _

private var cacheLogSize: Int = _
@@ -108,7 +139,10 @@

var kubernetesClient: DefaultKubernetesClient = _

def init(livyConf: LivyConf): Unit = {
private var appLookupThreadPoolSize: Long = _
private var appLookupMaxFailedTimes: Long = _

def init(livyConf: LivyConf, client: Option[KubernetesClient] = None): Unit = {
this.livyConf = livyConf

// KubernetesClient is thread safe. Create once, share it across threads.
@@ -119,6 +153,9 @@
appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds
pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds

appLookupThreadPoolSize = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE)
appLookupMaxFailedTimes = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_MAX_FAILED_TIMES)

sessionLeakageCheckInterval =
livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL)
sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT)
@@ -131,6 +168,12 @@
setName("RefreshServiceAccountTokenThread")
RefreshServiceAccountTokenThread.setDaemon(true)
RefreshServiceAccountTokenThread.start()

checkMonitorAppTimeoutThread.setDaemon(true)
checkMonitorAppTimeoutThread.setName("CheckMonitorAppTimeoutThread")
checkMonitorAppTimeoutThread.start()

initKubernetesAppMonitorThreadPool(livyConf)
}

// Returning T, throwing the exception on failure
@@ -147,6 +190,53 @@
}
}

class KubernetesAppMonitorRunnable extends Runnable {
override def run(): Unit = {
while (true) {
try {
val poolSize = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE)
var numberOfAppsToProcess = appQueue.size() / poolSize
if (numberOfAppsToProcess < 1) {
numberOfAppsToProcess = 1
} else if (numberOfAppsToProcess > 20) {
numberOfAppsToProcess = 20
}
for (_ <- 0 until numberOfAppsToProcess) {
// Record a heartbeat before monitoring each app so that
// checkMonitorAppTimeoutThread can detect a worker blocked on monitoring.
monitorAppThreadMap.put(Thread.currentThread(), System.currentTimeMillis())
val app = appQueue.poll()
if (app != null) {
app.monitorSparkKubernetesApp()
if (app.isRunning) {
appQueue.add(app)
}
}
}
Thread.sleep(pollInterval.toMillis)
} catch {
case e: InterruptedException =>
error(s"Kubernetes app monitoring was interrupted.", e)
}
}
}
}
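The batch each worker takes per cycle is the queue's equal share per thread, clamped to the range [1, 20], and still-running apps are re-enqueued, so monitoring round-robins across the pool rather than pinning one thread per app. The clamp, extracted as a sketch:

```scala
// Apps processed per worker per cycle: an equal share of the queue, clamped to [1, 20].
def appsPerCycle(queueSize: Int, poolSize: Int): Int =
  math.min(20, math.max(1, queueSize / poolSize))

assert(appsPerCycle(queueSize = 100, poolSize = 4) == 20) // capped at 20
assert(appsPerCycle(queueSize = 6, poolSize = 4) == 1)    // floor of 1
```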

private def initKubernetesAppMonitorThreadPool(livyConf: LivyConf): Unit = {
val poolSize = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE)
val kubernetesAppMonitorThreadPool: ExecutorService =
Executors.newFixedThreadPool(poolSize)

val runnable = new KubernetesAppMonitorRunnable()

for (_ <- 0 until poolSize) {
kubernetesAppMonitorThreadPool.execute(runnable)
}
}
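Note that `Executors.newFixedThreadPool(poolSize)` uses the default thread factory, so these workers are non-daemon threads, unlike the named daemon threads started elsewhere in this file. If daemon workers were desired, a custom factory would do it; a hypothetical sketch, not part of this PR:

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}

// Hypothetical: a factory producing named daemon threads for the monitor pool.
val daemonFactory: ThreadFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "KubernetesAppMonitorWorker")
    t.setDaemon(true)
    t
  }
}
val pool: ExecutorService = Executors.newFixedThreadPool(4, daemonFactory)
```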

def getAppSize: Int = appQueue.size()

def clearApps(): Unit = appQueue.clear()
}

class SparkKubernetesApp private[utils] (
@@ -162,26 +252,59 @@
import KubernetesExtensions._
import SparkKubernetesApp._

appQueue.add(this)
private var killed = false
private val appPromise: Promise[KubernetesApplication] = Promise()
private[utils] var state: SparkApp.State = SparkApp.State.STARTING
private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
private var kubernetesAppLog: IndexedSeq[String] = IndexedSeq.empty[String]

// Exposed for unit test.
// TODO Instead of spawning a thread for every session, create a centralized thread and
// batch Kubernetes queries.
private[utils] val kubernetesAppMonitorThread = Utils
.startDaemonThread(s"kubernetesAppMonitorThread-$this") {
private var kubernetesTagToAppIdFailedTimes: Int = _
private var kubernetesAppMonitorFailedTimes: Int = _

private def failToMonitor(): Unit = {
changeState(SparkApp.State.FAILED)
process.foreach(_.destroy())
leakedAppTags.put(appTag, System.currentTimeMillis())
}

private def failToGetAppId(): Unit = {
kubernetesTagToAppIdFailedTimes += 1
if (kubernetesTagToAppIdFailedTimes > appLookupMaxFailedTimes) {
val msg = "No KUBERNETES application is found with tag " +
s"${appTag.toLowerCase}. This may be because " +
"1) spark-submit fail to submit application to KUBERNETES; " +
"or 2) KUBERNETES cluster doesn't have enough resource to start the application in time. " +
"Please check Livy log and KUBERNETES log to know the details."

error(s"Failed monitoring the app $appTag: $msg")
kubernetesDiagnostics = ArrayBuffer(msg)
failToMonitor()
}
}

private def monitorSparkKubernetesApp(): Unit = {
try {
if (killed) {
changeState(SparkApp.State.KILLED)
} else if (isProcessErrExit) {
changeState(SparkApp.State.FAILED)
}
// Get KubernetesApplication by appTag.
val app: KubernetesApplication = try {
val appOption: Option[KubernetesApplication] = try {
getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow)
} catch {
case e: Exception =>
failToGetAppId()
appPromise.failure(e)
throw e
return
}
if (appOption.isEmpty) {
failToGetAppId()
return
}
appPromise.success(app)
val app: KubernetesApplication = appOption.get
appPromise.trySuccess(app)
val appId = app.getApplicationId

Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appId")
Expand All @@ -192,7 +315,9 @@ class SparkKubernetesApp private[utils] (
}

var appInfo = AppInfo()
while (isRunning) {

// The while loop is replaced with an "if" so that the app is re-enqueued and any
// pool thread can continue monitoring it on the next cycle.
if (isRunning) {
try {
Clock.sleep(pollInterval.toMillis)

@@ -222,14 +347,22 @@
} catch {
// TODO analyse available exceptions
case e: Throwable =>
throw e
error(s"Failed to refresh application state for $appTag.", e)
}
}

kubernetesTagToAppIdFailedTimes = 0
kubernetesAppMonitorFailedTimes = 0
debug(s"$appId $state ${kubernetesDiagnostics.mkString(" ")}")
Thread.currentThread().setName(s"appMonitorCommonThreadPool")
} catch {
case _: InterruptedException =>
kubernetesDiagnostics = ArrayBuffer("Application stopped by user.")
changeState(SparkApp.State.KILLED)
case e: InterruptedException =>
kubernetesAppMonitorFailedTimes += 1
if (kubernetesAppMonitorFailedTimes > appLookupMaxFailedTimes) {
error(s"Monitoring of the app $appTag was interrupted.", e)
kubernetesDiagnostics = ArrayBuffer(e.getMessage)
failToMonitor()
}
case NonFatal(e) =>
error(s"Error while refreshing Kubernetes state", e)
kubernetesDiagnostics = ArrayBuffer(e.getMessage)
@@ -250,18 +383,38 @@
("\nKubernetes Diagnostics: " +: kubernetesDiagnostics)

override def kill(): Unit = synchronized {
try {
withRetry(kubernetesClient.killApplication(Await.result(appPromise.future, appLookupTimeout)))
} catch {
// We cannot kill the Kubernetes app without the appTag.
// There's a chance the Kubernetes app hasn't been submitted during a livy-server failure.
// We don't want a stuck session that can't be deleted. Emit a warning and move on.
case _: TimeoutException | _: InterruptedException =>
warn("Deleting a session while its Kubernetes application is not found.")
kubernetesAppMonitorThread.interrupt()
} finally {
process.foreach(_.destroy())
killed = true

if (!isRunning) {
return
}

process.foreach(_.destroy())

def applicationDetails: Option[Try[KubernetesApplication]] = appPromise.future.value
if (applicationDetails.isEmpty) {
leakedAppTags.put(appTag, System.currentTimeMillis())
return
}
def kubernetesApplication: KubernetesApplication = applicationDetails.get.get
if (kubernetesApplication != null && kubernetesApplication.getApplicationId != null) {
try {
withRetry(kubernetesClient.killApplication(
Await.result(appPromise.future, appLookupTimeout)))
} catch {
// We cannot kill the Kubernetes app without the appTag.
// There's a chance the Kubernetes app hasn't been submitted during a livy-server failure.
// We don't want a stuck session that can't be deleted. Emit a warning and move on.
case _: TimeoutException | _: InterruptedException =>
warn("Deleting a session while its Kubernetes application is not found.")
}
} else {
leakedAppTags.put(appTag, System.currentTimeMillis())
}
}
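The rewritten `kill()` relies on `appPromise.future.value`, which is a non-blocking peek: it returns `None` until the promise completes, so an app that was never resolved is recorded as leaked instead of blocking in `Await.result`. A small sketch of that `Promise` behavior:

```scala
import scala.concurrent.Promise
import scala.util.Success

val p = Promise[Int]()
assert(p.future.value.isEmpty)              // not completed yet: non-blocking peek
p.trySuccess(42)                            // idempotent completion, as used above
assert(p.future.value == Some(Success(42))) // completed value is now visible
```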

private def isProcessErrExit: Boolean = {
process.isDefined && !process.get.isAlive && process.get.exitValue() != 0
}

private def isRunning: Boolean = {
@@ -282,18 +435,17 @@
*
* @param appTag The application tag tagged on the target application.
* If the tag is not unique, it returns the first application it found.
* @return KubernetesApplication or the failure.
* @return Option[KubernetesApplication] or the failure.
*/
private def getAppFromTag(
appTag: String,
pollInterval: duration.Duration,
deadline: Deadline): KubernetesApplication = {
deadline: Deadline): Option[KubernetesApplication] = {
import KubernetesExtensions._

withRetry(kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag)))
match {
case Some(app) => app
case Some(app) => Some(app)
case None =>
if (deadline.isOverdue) {
process.foreach(_.destroy())
@@ -307,10 +459,7 @@
throw new IllegalStateException(s"Failed to submit Kubernetes application with tag" +
s" $appTag. 'spark-submit' exited with non-zero status. " +
s"Please check Livy log and Kubernetes log to know the details.")
} else {
Clock.sleep(pollInterval.toMillis)
getAppFromTag(appTag, pollInterval, deadline)
}
} else None
}
}

@@ -20,14 +20,32 @@ import java.util.Objects._

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressRule, IngressSpec}
import io.fabric8.kubernetes.client.KubernetesClient
import org.mockito.Mockito.when
import org.scalatest.FunSpec
import org.scalatest.{BeforeAndAfterAll, FunSpec}
import org.scalatestplus.mockito.MockitoSugar._

import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
import org.apache.livy.utils.KubernetesConstants.SPARK_APP_TAG_LABEL

class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite {
class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite with BeforeAndAfterAll {

override def beforeAll(): Unit = {
super.beforeAll()
val livyConf = new LivyConf()
livyConf.set(LivyConf.KUBERNETES_POLL_INTERVAL, "500ms")
livyConf.set(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL, "100ms")
livyConf.set(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT, "1000ms")

val client = mock[KubernetesClient]
SparkKubernetesApp.init(livyConf, Some(client))
SparkKubernetesApp.clearApps()
}

override def afterAll(): Unit = {
super.afterAll()
assert(SparkKubernetesApp.getAppSize === 0)
}
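With `init` mocked this way, tests share the companion object's app queue, which is why `beforeAll` clears it and `afterAll` asserts it has drained. A hypothetical test in the same style:

```scala
// Hypothetical: the shared queue stays empty while no SparkKubernetesApp is constructed.
it("should keep the shared monitor queue empty when no apps are registered") {
  assert(SparkKubernetesApp.getAppSize === 0)
}
```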

describe("KubernetesAppReport") {
import scala.collection.JavaConverters._