From f615f272e9130d02170024832ea308516b907195 Mon Sep 17 00:00:00 2001 From: RajshekharMuchandi Date: Tue, 12 Mar 2024 20:57:17 +0530 Subject: [PATCH] [LIVY-977][SERVER][CONF] Livy can not be started if HDFS is still in safe mode (#440) ## What changes were proposed in this pull request? HDFS safe mode is checked when livy session is created. If safe mode is ON, then IllegalStateException is thrown after max retry attempts (configurable) with safe mode interval (configurable) checks are done. If safe mode is OFF, then livy will be able to create session. https://issues.apache.org/jira/browse/LIVY-977 ## How was this patch tested? Added unit test cases to validate code changes. Also, done manual testing in CDP cluster by creating livy sessions with HDFS safe mode check ON/OFF. --- conf/livy.conf.template | 5 +++ .../main/scala/org/apache/livy/LivyConf.scala | 6 +++ .../recovery/FileSystemStateStore.scala | 45 +++++++++++++++++++ .../recovery/FileSystemStateStoreSpec.scala | 35 ++++++++++++++- 4 files changed, 90 insertions(+), 1 deletion(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index 7566971c3..e99251d02 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -195,3 +195,8 @@ # Enable to allow custom classpath by proxy user in cluster mode # The below configuration parameter is disabled by default. # livy.server.session.allow-custom-classpath = true + +# value specifies interval to check safe mode in hdfs filesystem +# livy.server.hdfs.safe-mode.interval = 5 +# value specifies max attempts to retry when safe mode is ON in hdfs filesystem +# livy.server.hdfs.safe-mode.max.retry.attempts = 10 diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 31b687259..720aa4e15 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -252,6 +252,12 @@ object LivyConf { // how often to check livy session leakage val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s") + // value specifies interval to check safe mode in hdfs filesystem + val HDFS_SAFE_MODE_INTERVAL_IN_SECONDS = Entry("livy.server.hdfs.safe-mode.interval", 5) + + // value specifies max attempts to retry when safe mode is ON in hdfs filesystem + val HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS = Entry("livy.server.hdfs.safe-mode.max.retry.attempts", 12) + // Whether session timeout should be checked, by default it will be checked, which means inactive // session will be stopped after "livy.server.session.timeout" val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true) diff --git a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala index 826a2fbd7..6fee7f0e2 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala @@ -20,6 +20,7 @@ package org.apache.livy.server.recovery import java.io.{FileNotFoundException, IOException} import java.net.URI import java.util +import java.util.concurrent.TimeUnit import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -28,6 +29,8 @@ import org.apache.commons.io.IOUtils import org.apache.hadoop.fs._ import org.apache.hadoop.fs.Options.{CreateOpts, Rename} import org.apache.hadoop.fs.permission.{FsAction, FsPermission} +import org.apache.hadoop.hdfs.DistributedFileSystem +import org.apache.hadoop.hdfs.protocol.HdfsConstants import org.apache.livy.{LivyConf, Logging} import org.apache.livy.Utils.usingResource @@ -42,6 +45,8 @@ class FileSystemStateStore( this(livyConf, None) } + private val fs = FileSystem.newInstance(livyConf.hadoopConf) + private val fsUri = { val fsPath = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL) require(fsPath != null && !fsPath.isEmpty, @@ -57,6 +62,8 @@ class FileSystemStateStore( // Only Livy user should have access to state files. fileContext.setUMask(new FsPermission("077")) + startSafeModeCheck() + // Create state store dir if it doesn't exist. val stateStorePath = absPath(".") try { @@ -134,4 +141,42 @@ class FileSystemStateStore( } private def absPath(key: String): Path = new Path(fsUri.getPath(), key) + + /** + * Checks whether HDFS is in safe mode. + * + * Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons + * makes it more public than not. + */ + def isFsInSafeMode(): Boolean = fs match { + case dfs: DistributedFileSystem => + isFsInSafeMode(dfs) + case _ => + false + } + + def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = { + /* true to check only for Active NNs status */ + dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET, true) + } + + def startSafeModeCheck(): Unit = { + // Cannot probe anything while the FS is in safe mode, + // so wait for seconds which is configurable + val safeModeInterval = livyConf.getInt(LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS) + val safeModeMaxRetryAttempts = livyConf.getInt(LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS) + for (retryAttempts <- 0 to safeModeMaxRetryAttempts if isFsInSafeMode()) { + info("HDFS is still in safe mode. Waiting...") + Thread.sleep(TimeUnit.SECONDS.toMillis(safeModeInterval)) + } + + // if hdfs is still in safe mode + // even after max retry attempts + // then throw IllegalStateException + if (isFsInSafeMode()) { + throw new IllegalStateException("Reached max retry attempts for safe mode check " + + "in hdfs file system") + } + } + } diff --git a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala index 082a80ab9..1ee1a2fe2 100644 --- a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala @@ -23,10 +23,11 @@ import java.util import org.apache.hadoop.fs._ import org.apache.hadoop.fs.Options.{CreateOpts, Rename} import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.hdfs.DistributedFileSystem import org.hamcrest.Description import org.mockito.ArgumentMatcher import org.mockito.Matchers.{any, anyInt, argThat, eq => equal} -import org.mockito.Mockito.{atLeastOnce, verify, when} +import org.mockito.Mockito.{atLeastOnce, spy, verify, when} import org.mockito.internal.matchers.Equals import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer @@ -53,6 +54,14 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { conf } + def makeConfWithTwoSeconds(): LivyConf = { + val conf = new LivyConf() + conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "file://tmp/") + conf.set(LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS, new Integer(2)) + conf.set(LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS, new Integer(2)) + conf + } + def mockFileContext(rootDirPermission: String): FileContext = { val fileContext = mock[FileContext] val rootDirStatus = mock[FileStatus] @@ -188,5 +197,29 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { verify(fileContext).delete(pathEq("/key"), equal(false)) } + + it("set safe mode ON and wait") { + val fileContext = mockFileContext("700") + val provider = spy(new FileSystemStateStore(makeConf(), Some(fileContext))) + val dfs = mock[DistributedFileSystem] + provider.isFsInSafeMode() + assert(!provider.isFsInSafeMode(dfs)) + } + + it("provider throws IllegalStateException when reaches 'N' " + + "max attempts to access HDFS file system") { + val provider = new SafeModeTestProvider(makeConfWithTwoSeconds(), + Some(mockFileContext("700"))) + provider.inSafeMode = true + intercept[IllegalStateException](provider.startSafeModeCheck()) + } } + + private class SafeModeTestProvider(conf: LivyConf, context: Option[FileContext]) + extends FileSystemStateStore(conf, context) { + var inSafeMode = true + + override def isFsInSafeMode(): Boolean = inSafeMode + } + }