aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-11-01 14:42:18 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-11-01 14:42:18 -0800
commitcf04fdfe71abc395163a625cc1f99ec5e54cc07e (patch)
tree4ad787aec9107ae74935e9a30342aaf419f49366 /core
parent046e32ed8467e0f46ffeca1a95d4d40017eb5bdb (diff)
downloadspark-cf04fdfe71abc395163a625cc1f99ec5e54cc07e.tar.gz
spark-cf04fdfe71abc395163a625cc1f99ec5e54cc07e.tar.bz2
spark-cf04fdfe71abc395163a625cc1f99ec5e54cc07e.zip
[SPARK-11020][CORE] Wait for HDFS to leave safe mode before initializing HS.
Large HDFS clusters may take a while to leave safe mode when starting; this change makes the HS wait for that before doing checks about its configuraton. This means the HS won't stop right away if HDFS is in safe mode and the configuration is not correct, but that should be a very uncommon situation. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #9043 from vanzin/SPARK-11020.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala104
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala65
2 files changed, 166 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 80bfda9ddd..24aa386c72 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable
import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.security.AccessControlException
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -52,6 +53,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private val NOT_STARTED = "<Not Started>"
+ // Interval between safemode checks.
+ private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
+ "spark.history.fs.safemodeCheck.interval", "5s")
+
// Interval between each check for event log updates
private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
@@ -107,9 +112,57 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
- initialize()
+ // Conf option used for testing the initialization code.
+ val initThread = if (!conf.getBoolean("spark.history.testing.skipInitialize", false)) {
+ initialize(None)
+ } else {
+ null
+ }
+
+ private[history] def initialize(errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = {
+ if (!isFsInSafeMode()) {
+ startPolling()
+ return null
+ }
+
+ // Cannot probe anything while the FS is in safe mode, so spawn a new thread that will wait
+ // for the FS to leave safe mode before enabling polling. This allows the main history server
+ // UI to be shown (so that the user can see the HDFS status).
+ //
+ // The synchronization in the run() method is needed because of the tests; mockito can
+ // misbehave if the test is modifying the mocked methods while the thread is calling
+ // them.
+ val initThread = new Thread(new Runnable() {
+ override def run(): Unit = {
+ try {
+ clock.synchronized {
+ while (isFsInSafeMode()) {
+ logInfo("HDFS is still in safe mode. Waiting...")
+ val deadline = clock.getTimeMillis() +
+ TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
+ clock.waitTillTime(deadline)
+ }
+ }
+ startPolling()
+ } catch {
+ case _: InterruptedException =>
+ }
+ }
+ })
+ initThread.setDaemon(true)
+ initThread.setName(s"${getClass().getSimpleName()}-init")
+ initThread.setUncaughtExceptionHandler(errorHandler.getOrElse(
+ new Thread.UncaughtExceptionHandler() {
+ override def uncaughtException(t: Thread, e: Throwable): Unit = {
+ logError("Error initializing FsHistoryProvider.", e)
+ System.exit(1)
+ }
+ }))
+ initThread.start()
+ initThread
+ }
- private def initialize(): Unit = {
+ private def startPolling(): Unit = {
// Validate the log directory.
val path = new Path(logDir)
if (!fs.exists(path)) {
@@ -170,7 +223,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
- override def getConfig(): Map[String, String] = Map("Event log directory" -> logDir.toString)
+ override def getConfig(): Map[String, String] = {
+ val safeMode = if (isFsInSafeMode()) {
+ Map("HDFS State" -> "In safe mode, application logs not available.")
+ } else {
+ Map()
+ }
+ Map("Event log directory" -> logDir.toString) ++ safeMode
+ }
+
+ override def stop(): Unit = {
+ if (initThread != null && initThread.isAlive()) {
+ initThread.interrupt()
+ initThread.join()
+ }
+ }
/**
* Builds the application list based on the current contents of the log directory.
@@ -585,6 +652,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
+ /**
+ * Checks whether HDFS is in safe mode. The API is slightly different between hadoop 1 and 2,
+ * so we have to resort to ugly reflection (as usual...).
+ *
+ * Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons
+ * makes it more public than not.
+ */
+ private[history] def isFsInSafeMode(): Boolean = fs match {
+ case dfs: DistributedFileSystem =>
+ isFsInSafeMode(dfs)
+ case _ =>
+ false
+ }
+
+ // For testing.
+ private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
+ val hadoop1Class = "org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction"
+ val hadoop2Class = "org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"
+ val actionClass: Class[_] =
+ try {
+ getClass().getClassLoader().loadClass(hadoop2Class)
+ } catch {
+ case _: ClassNotFoundException =>
+ getClass().getClassLoader().loadClass(hadoop1Class)
+ }
+
+ val action = actionClass.getField("SAFEMODE_GET").get(null)
+ val method = dfs.getClass().getMethod("setSafeMode", action.getClass())
+ method.invoke(dfs, action).asInstanceOf[Boolean]
+ }
+
}
private[history] object FsHistoryProvider {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 73cff89544..833aab14ca 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -24,13 +24,19 @@ import java.util.concurrent.TimeUnit
import java.util.zip.{ZipInputStream, ZipOutputStream}
import scala.io.Source
+import scala.concurrent.duration._
+import scala.language.postfixOps
import com.google.common.base.Charsets
import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.DistributedFileSystem
import org.json4s.jackson.JsonMethods._
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{doReturn, mock, spy, verify, when}
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually._
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.io._
@@ -407,6 +413,65 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}
+ test("provider correctly checks whether fs is in safe mode") {
+ val provider = spy(new FsHistoryProvider(createTestConf()))
+ val dfs = mock(classOf[DistributedFileSystem])
+ // Asserts that safe mode is false because we can't really control the return value of the mock,
+ // since the API is different between hadoop 1 and 2.
+ assert(!provider.isFsInSafeMode(dfs))
+ }
+
+ test("provider waits for safe mode to finish before initializing") {
+ val clock = new ManualClock()
+ val conf = createTestConf().set("spark.history.testing.skipInitialize", "true")
+ val provider = spy(new FsHistoryProvider(conf, clock))
+ doReturn(true).when(provider).isFsInSafeMode()
+
+ val initThread = provider.initialize(None)
+ try {
+ provider.getConfig().keys should contain ("HDFS State")
+
+ clock.setTime(5000)
+ provider.getConfig().keys should contain ("HDFS State")
+
+ // Synchronization needed because of mockito.
+ clock.synchronized {
+ doReturn(false).when(provider).isFsInSafeMode()
+ clock.setTime(10000)
+ }
+
+ eventually(timeout(1 second), interval(10 millis)) {
+ provider.getConfig().keys should not contain ("HDFS State")
+ }
+ } finally {
+ provider.stop()
+ }
+ }
+
+ test("provider reports error after FS leaves safe mode") {
+ testDir.delete()
+ val clock = new ManualClock()
+ val conf = createTestConf().set("spark.history.testing.skipInitialize", "true")
+ val provider = spy(new FsHistoryProvider(conf, clock))
+ doReturn(true).when(provider).isFsInSafeMode()
+
+ val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler])
+ val initThread = provider.initialize(Some(errorHandler))
+ try {
+ // Synchronization needed because of mockito.
+ clock.synchronized {
+ doReturn(false).when(provider).isFsInSafeMode()
+ clock.setTime(10000)
+ }
+
+ eventually(timeout(1 second), interval(10 millis)) {
+ verify(errorHandler).uncaughtException(any(), any())
+ }
+ } finally {
+ provider.stop()
+ }
+ }
+
/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example: