Skip to content

Commit

Permalink
[LIVY-977][SERVER][CONF] Livy can not be started if HDFS is still in …
Browse files Browse the repository at this point in the history
…safe mode (#440)

## What changes were proposed in this pull request?

HDFS safe mode is checked when livy session is created. If safe mode is ON, then IllegalStateException is thrown after max retry attempts (configurable) with safe mode interval (configurable) checks are done. If safe mode is OFF, then livy will be able to create session.

https://issues.apache.org/jira/browse/LIVY-977

## How was this patch tested?

Added unit test cases to validate code changes. Also, done manual testing in CDP cluster by creating livy sessions with HDFS safe mode check ON/OFF.
  • Loading branch information
RajshekharMuchandi authored Mar 12, 2024
1 parent 96b1eb6 commit f615f27
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 1 deletion.
5 changes: 5 additions & 0 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,8 @@
# Enable to allow custom classpath by proxy user in cluster mode
# The below configuration parameter is disabled by default.
# livy.server.session.allow-custom-classpath = true

# value specifies interval to check safe mode in hdfs filesystem
# livy.server.hdfs.safe-mode.interval = 5
# value specifies max attempts to retry when safe mode is ON in hdfs filesystem
# livy.server.hdfs.safe-mode.max.retry.attempts = 10
6 changes: 6 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ object LivyConf {
// how often to check livy session leakage
val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s")

// value specifies interval to check safe mode in hdfs filesystem
val HDFS_SAFE_MODE_INTERVAL_IN_SECONDS = Entry("livy.server.hdfs.safe-mode.interval", 5)

// value specifies max attempts to retry when safe mode is ON in hdfs filesystem
val HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS = Entry("livy.server.hdfs.safe-mode.max.retry.attempts", 12)

// Whether session timeout should be checked, by default it will be checked, which means inactive
// session will be stopped after "livy.server.session.timeout"
val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.livy.server.recovery
import java.io.{FileNotFoundException, IOException}
import java.net.URI
import java.util
import java.util.concurrent.TimeUnit

import scala.reflect.ClassTag
import scala.util.control.NonFatal
Expand All @@ -28,6 +29,8 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants

import org.apache.livy.{LivyConf, Logging}
import org.apache.livy.Utils.usingResource
Expand All @@ -42,6 +45,8 @@ class FileSystemStateStore(
this(livyConf, None)
}

private val fs = FileSystem.newInstance(livyConf.hadoopConf)

private val fsUri = {
val fsPath = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
require(fsPath != null && !fsPath.isEmpty,
Expand All @@ -57,6 +62,8 @@ class FileSystemStateStore(
// Only Livy user should have access to state files.
fileContext.setUMask(new FsPermission("077"))

startSafeModeCheck()

// Create state store dir if it doesn't exist.
val stateStorePath = absPath(".")
try {
Expand Down Expand Up @@ -134,4 +141,42 @@ class FileSystemStateStore(
}

private def absPath(key: String): Path = new Path(fsUri.getPath(), key)

/**
* Checks whether HDFS is in safe mode.
*
* Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons
* makes it more public than not.
*/
def isFsInSafeMode(): Boolean = fs match {
case dfs: DistributedFileSystem =>
isFsInSafeMode(dfs)
case _ =>
false
}

def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
/* true to check only for Active NNs status */
dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET, true)
}

def startSafeModeCheck(): Unit = {
// Cannot probe anything while the FS is in safe mode,
// so wait for seconds which is configurable
val safeModeInterval = livyConf.getInt(LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS)
val safeModeMaxRetryAttempts = livyConf.getInt(LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS)
for (retryAttempts <- 0 to safeModeMaxRetryAttempts if isFsInSafeMode()) {
info("HDFS is still in safe mode. Waiting...")
Thread.sleep(TimeUnit.SECONDS.toMillis(safeModeInterval))
}

// if hdfs is still in safe mode
// even after max retry attempts
// then throw IllegalStateException
if (isFsInSafeMode()) {
throw new IllegalStateException("Reached max retry attempts for safe mode check " +
"in hdfs file system")
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import java.util
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.hamcrest.Description
import org.mockito.ArgumentMatcher
import org.mockito.Matchers.{any, anyInt, argThat, eq => equal}
import org.mockito.Mockito.{atLeastOnce, verify, when}
import org.mockito.Mockito.{atLeastOnce, spy, verify, when}
import org.mockito.internal.matchers.Equals
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
Expand All @@ -53,6 +54,14 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
conf
}

def makeConfWithTwoSeconds(): LivyConf = {
val conf = new LivyConf()
conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "file://tmp/")
conf.set(LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS, new Integer(2))
conf.set(LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS, new Integer(2))
conf
}

def mockFileContext(rootDirPermission: String): FileContext = {
val fileContext = mock[FileContext]
val rootDirStatus = mock[FileStatus]
Expand Down Expand Up @@ -188,5 +197,29 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {

verify(fileContext).delete(pathEq("/key"), equal(false))
}

it("set safe mode ON and wait") {
val fileContext = mockFileContext("700")
val provider = spy(new FileSystemStateStore(makeConf(), Some(fileContext)))
val dfs = mock[DistributedFileSystem]
provider.isFsInSafeMode()
assert(!provider.isFsInSafeMode(dfs))
}

it("provider throws IllegalStateException when reaches 'N' " +
"max attempts to access HDFS file system") {
val provider = new SafeModeTestProvider(makeConfWithTwoSeconds(),
Some(mockFileContext("700")))
provider.inSafeMode = true
intercept[IllegalStateException](provider.startSafeModeCheck())
}
}

private class SafeModeTestProvider(conf: LivyConf, context: Option[FileContext])
extends FileSystemStateStore(conf, context) {
var inSafeMode = true

override def isFsInSafeMode(): Boolean = inSafeMode
}

}

0 comments on commit f615f27

Please sign in to comment.