Diffstat (limited to 'core'):
 core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala      | 104
 core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala |  65
 2 files changed, 166 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 80bfda9ddd..24aa386c72 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable
import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.security.AccessControlException
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -52,6 +53,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private val NOT_STARTED = "<Not Started>"
+ // Interval between safe mode checks.
+ private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
+ "spark.history.fs.safemodeCheck.interval", "5s")
+
// Interval between each check for event log updates
private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
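Like the update interval above it, the new setting accepts the usual SparkConf time syntax via getTimeAsSeconds. A small sketch of overriding it when building the history server configuration (the "30s" value is purely illustrative; only the key and its "5s" default come from this patch):

    import org.apache.spark.SparkConf

    // Any getTimeAsSeconds-compatible value ("5s", "1min", ...) works here.
    val conf = new SparkConf()
      .set("spark.history.fs.safemodeCheck.interval", "30s")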
@@ -107,9 +112,57 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
- initialize()
+ // Conf option used for testing the initialization code.
+ val initThread = if (!conf.getBoolean("spark.history.testing.skipInitialize", false)) {
+ initialize(None)
+ } else {
+ null
+ }
+
+ private[history] def initialize(
+     errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = {
+ if (!isFsInSafeMode()) {
+ startPolling()
+ return null
+ }
+
+ // Cannot probe anything while the FS is in safe mode, so spawn a new thread that will wait
+ // for the FS to leave safe mode before enabling polling. This allows the main history server
+ // UI to be shown (so that the user can see the HDFS status).
+ //
+ // The synchronization in the run() method is needed because of the tests; Mockito can
+ // misbehave if the test modifies the mocked methods while the thread is calling
+ // them.
+ val initThread = new Thread(new Runnable() {
+ override def run(): Unit = {
+ try {
+ clock.synchronized {
+ while (isFsInSafeMode()) {
+ logInfo("HDFS is still in safe mode. Waiting...")
+ val deadline = clock.getTimeMillis() +
+ TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
+ clock.waitTillTime(deadline)
+ }
+ }
+ startPolling()
+ } catch {
+ case _: InterruptedException =>
+ }
+ }
+ })
+ initThread.setDaemon(true)
+ initThread.setName(s"${getClass().getSimpleName()}-init")
+ initThread.setUncaughtExceptionHandler(errorHandler.getOrElse(
+ new Thread.UncaughtExceptionHandler() {
+ override def uncaughtException(t: Thread, e: Throwable): Unit = {
+ logError("Error initializing FsHistoryProvider.", e)
+ System.exit(1)
+ }
+ }))
+ initThread.start()
+ initThread
+ }
- private def initialize(): Unit = {
+ private def startPolling(): Unit = {
// Validate the log directory.
val path = new Path(logDir)
if (!fs.exists(path)) {
@@ -170,7 +223,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
- override def getConfig(): Map[String, String] = Map("Event log directory" -> logDir.toString)
+ override def getConfig(): Map[String, String] = {
+ val safeMode = if (isFsInSafeMode()) {
+ Map("HDFS State" -> "In safe mode, application logs not available.")
+ } else {
+ Map()
+ }
+ Map("Event log directory" -> logDir.toString) ++ safeMode
+ }
+
+ override def stop(): Unit = {
+ if (initThread != null && initThread.isAlive()) {
+ initThread.interrupt()
+ initThread.join()
+ }
+ }
/**
* Builds the application list based on the current contents of the log directory.
@@ -585,6 +652,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
+ /**
+ * Checks whether HDFS is in safe mode. The API is slightly different between Hadoop 1 and 2,
+ * so we have to resort to ugly reflection (as usual...).
+ *
+ * Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical
+ * purposes makes it more public than not.
+ */
+ private[history] def isFsInSafeMode(): Boolean = fs match {
+ case dfs: DistributedFileSystem =>
+ isFsInSafeMode(dfs)
+ case _ =>
+ false
+ }
+
+ // For testing.
+ private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
+ val hadoop1Class = "org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction"
+ val hadoop2Class = "org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"
+ val actionClass: Class[_] =
+ try {
+ getClass().getClassLoader().loadClass(hadoop2Class)
+ } catch {
+ case _: ClassNotFoundException =>
+ getClass().getClassLoader().loadClass(hadoop1Class)
+ }
+
+ val action = actionClass.getField("SAFEMODE_GET").get(null)
+ val method = dfs.getClass().getMethod("setSafeMode", action.getClass())
+ method.invoke(dfs, action).asInstanceOf[Boolean]
+ }
+
}
private[history] object FsHistoryProvider {
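For reference, on a classpath that only needs to support Hadoop 2, the reflection above collapses to a single direct call. A minimal sketch of the equivalent, which the patch cannot use because it must also compile against Hadoop 1:

    import org.apache.hadoop.hdfs.DistributedFileSystem
    import org.apache.hadoop.hdfs.protocol.HdfsConstants

    // SAFEMODE_GET queries the current safe mode state without changing it;
    // setSafeMode returns true while the NameNode is still in safe mode.
    def isInSafeMode(dfs: DistributedFileSystem): Boolean =
      dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET)

The reflective version does exactly this: it loads whichever SafeModeAction class is present, reads its SAFEMODE_GET field, and invokes setSafeMode with it.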
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 73cff89544..833aab14ca 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -24,13 +24,19 @@ import java.util.concurrent.TimeUnit
import java.util.zip.{ZipInputStream, ZipOutputStream}
import scala.io.Source
+import scala.concurrent.duration._
+import scala.language.postfixOps
import com.google.common.base.Charsets
import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.DistributedFileSystem
import org.json4s.jackson.JsonMethods._
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{doReturn, mock, spy, verify, when}
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually._
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.io._
@@ -407,6 +413,65 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}
+ test("provider correctly checks whether fs is in safe mode") {
+ val provider = spy(new FsHistoryProvider(createTestConf()))
+ val dfs = mock(classOf[DistributedFileSystem])
+ // Assert that safe mode reads as false: we cannot stub the mock's return value, since
+ // the setSafeMode API differs between Hadoop 1 and 2 and cannot be referenced directly.
+ assert(!provider.isFsInSafeMode(dfs))
+ }
+
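A note on the stubbing style in the next two tests: with a Mockito spy, when(provider.isFsInSafeMode()).thenReturn(true) would invoke the real method once while the stub is being set up, so the tests use the doReturn form, which installs the stub without calling through:

    // Stubs the spied method without invoking the real implementation.
    doReturn(true).when(provider).isFsInSafeMode()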
+ test("provider waits for safe mode to finish before initializing") {
+ val clock = new ManualClock()
+ val conf = createTestConf().set("spark.history.testing.skipInitialize", "true")
+ val provider = spy(new FsHistoryProvider(conf, clock))
+ doReturn(true).when(provider).isFsInSafeMode()
+
+ val initThread = provider.initialize(None)
+ try {
+ provider.getConfig().keys should contain ("HDFS State")
+
+ clock.setTime(5000)
+ provider.getConfig().keys should contain ("HDFS State")
+
+ // Synchronization needed because of Mockito.
+ clock.synchronized {
+ doReturn(false).when(provider).isFsInSafeMode()
+ clock.setTime(10000)
+ }
+
+ eventually(timeout(1 second), interval(10 millis)) {
+ provider.getConfig().keys should not contain ("HDFS State")
+ }
+ } finally {
+ provider.stop()
+ }
+ }
+
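The test above advances time deterministically because the provider's wait loop and the test coordinate through the clock's monitor: waitTillTime blocks until setTime moves the clock past the requested deadline. A minimal stand-in illustrating the contract the test relies on (a hypothetical sketch, not Spark's actual org.apache.spark.util.ManualClock):

    // Hypothetical sketch of the clock contract: setTime wakes up any thread
    // blocked in waitTillTime once the requested deadline has been reached.
    class SketchClock(private var time: Long = 0L) {
      def getTimeMillis(): Long = synchronized { time }
      def setTime(t: Long): Unit = synchronized { time = t; notifyAll() }
      def waitTillTime(target: Long): Long = synchronized {
        while (time < target) { wait() }
        time
      }
    }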
+ test("provider reports error after FS leaves safe mode") {
+ testDir.delete()
+ val clock = new ManualClock()
+ val conf = createTestConf().set("spark.history.testing.skipInitialize", "true")
+ val provider = spy(new FsHistoryProvider(conf, clock))
+ doReturn(true).when(provider).isFsInSafeMode()
+
+ val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler])
+ val initThread = provider.initialize(Some(errorHandler))
+ try {
+ // Synchronization needed because of Mockito.
+ clock.synchronized {
+ doReturn(false).when(provider).isFsInSafeMode()
+ clock.setTime(10000)
+ }
+
+ eventually(timeout(1 second), interval(10 millis)) {
+ verify(errorHandler).uncaughtException(any(), any())
+ }
+ } finally {
+ provider.stop()
+ }
+ }
+
/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example: