aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-11-01 14:42:18 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-11-01 14:42:18 -0800
commitcf04fdfe71abc395163a625cc1f99ec5e54cc07e (patch)
tree4ad787aec9107ae74935e9a30342aaf419f49366 /core/src/test
parent046e32ed8467e0f46ffeca1a95d4d40017eb5bdb (diff)
downloadspark-cf04fdfe71abc395163a625cc1f99ec5e54cc07e.tar.gz
spark-cf04fdfe71abc395163a625cc1f99ec5e54cc07e.tar.bz2
spark-cf04fdfe71abc395163a625cc1f99ec5e54cc07e.zip
[SPARK-11020][CORE] Wait for HDFS to leave safe mode before initializing HS.
Large HDFS clusters may take a while to leave safe mode when starting; this change makes the HS wait for that before doing checks about its configuraton. This means the HS won't stop right away if HDFS is in safe mode and the configuration is not correct, but that should be a very uncommon situation. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #9043 from vanzin/SPARK-11020.
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala65
1 files changed, 65 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 73cff89544..833aab14ca 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -24,13 +24,19 @@ import java.util.concurrent.TimeUnit
import java.util.zip.{ZipInputStream, ZipOutputStream}
import scala.io.Source
+import scala.concurrent.duration._
+import scala.language.postfixOps
import com.google.common.base.Charsets
import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.DistributedFileSystem
import org.json4s.jackson.JsonMethods._
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{doReturn, mock, spy, verify, when}
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually._
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.io._
@@ -407,6 +413,65 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}
+ test("provider correctly checks whether fs is in safe mode") {
+ val provider = spy(new FsHistoryProvider(createTestConf()))
+ val dfs = mock(classOf[DistributedFileSystem])
+ // Asserts that safe mode is false because we can't really control the return value of the mock,
+ // since the API is different between hadoop 1 and 2.
+ assert(!provider.isFsInSafeMode(dfs))
+ }
+
+ test("provider waits for safe mode to finish before initializing") {
+ val clock = new ManualClock()
+ val conf = createTestConf().set("spark.history.testing.skipInitialize", "true")
+ val provider = spy(new FsHistoryProvider(conf, clock))
+ doReturn(true).when(provider).isFsInSafeMode()
+
+ val initThread = provider.initialize(None)
+ try {
+ provider.getConfig().keys should contain ("HDFS State")
+
+ clock.setTime(5000)
+ provider.getConfig().keys should contain ("HDFS State")
+
+ // Synchronization needed because of mockito.
+ clock.synchronized {
+ doReturn(false).when(provider).isFsInSafeMode()
+ clock.setTime(10000)
+ }
+
+ eventually(timeout(1 second), interval(10 millis)) {
+ provider.getConfig().keys should not contain ("HDFS State")
+ }
+ } finally {
+ provider.stop()
+ }
+ }
+
+ test("provider reports error after FS leaves safe mode") {
+ testDir.delete()
+ val clock = new ManualClock()
+ val conf = createTestConf().set("spark.history.testing.skipInitialize", "true")
+ val provider = spy(new FsHistoryProvider(conf, clock))
+ doReturn(true).when(provider).isFsInSafeMode()
+
+ val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler])
+ val initThread = provider.initialize(Some(errorHandler))
+ try {
+ // Synchronization needed because of mockito.
+ clock.synchronized {
+ doReturn(false).when(provider).isFsInSafeMode()
+ clock.setTime(10000)
+ }
+
+ eventually(timeout(1 second), interval(10 millis)) {
+ verify(errorHandler).uncaughtException(any(), any())
+ }
+ } finally {
+ provider.stop()
+ }
+ }
+
/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example: