diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2015-11-03 16:26:28 -0800 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2015-11-03 16:26:28 -0800 |
commit | 53e9cee3e4e845d1f875c487215c0f22503347b1 (patch) | |
tree | 106755f5ae74394058376b7c35f805f60e6c9737 /core | |
parent | 680b4e7bca935dc1569f35fa319bdfb01a12f7e0 (diff) | |
download | spark-53e9cee3e4e845d1f875c487215c0f22503347b1.tar.gz spark-53e9cee3e4e845d1f875c487215c0f22503347b1.tar.bz2 spark-53e9cee3e4e845d1f875c487215c0f22503347b1.zip |
[SPARK-11466][CORE] Avoid mockito in multi-threaded FsHistoryProviderSuite test.
The test functionality should be the same, but without using mockito; logs don't
really say anything useful but I suspect it may be the cause of the flakiness,
since updating mocks when multiple threads may be using it doesn't work very
well. It also allows some other cleanup (= less test code in FsHistoryProvider).
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #9425 from vanzin/SPARK-11466.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 31 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala | 42 |
2 files changed, 34 insertions, 39 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 24aa386c72..718efc4f3b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -113,35 +113,30 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } // Conf option used for testing the initialization code. - val initThread = if (!conf.getBoolean("spark.history.testing.skipInitialize", false)) { - initialize(None) - } else { - null - } + val initThread = initialize() - private[history] def initialize(errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = { + private[history] def initialize(): Thread = { if (!isFsInSafeMode()) { startPolling() - return null + null + } else { + startSafeModeCheckThread(None) } + } + private[history] def startSafeModeCheckThread( + errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = { // Cannot probe anything while the FS is in safe mode, so spawn a new thread that will wait // for the FS to leave safe mode before enabling polling. This allows the main history server // UI to be shown (so that the user can see the HDFS status). - // - // The synchronization in the run() method is needed because of the tests; mockito can - // misbehave if the test is modifying the mocked methods while the thread is calling - // them. val initThread = new Thread(new Runnable() { override def run(): Unit = { try { - clock.synchronized { - while (isFsInSafeMode()) { - logInfo("HDFS is still in safe mode. Waiting...") - val deadline = clock.getTimeMillis() + - TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S) - clock.waitTillTime(deadline) - } + while (isFsInSafeMode()) { + logInfo("HDFS is still in safe mode. Waiting...") + val deadline = clock.getTimeMillis() + + TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S) + clock.waitTillTime(deadline) } startPolling() } catch { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 833aab14ca..5cab17f8a3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -41,7 +41,7 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark.{Logging, SparkConf, SparkFunSuite} import org.apache.spark.io._ import org.apache.spark.scheduler._ -import org.apache.spark.util.{JsonProtocol, ManualClock, Utils} +import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging { @@ -423,22 +423,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc test("provider waits for safe mode to finish before initializing") { val clock = new ManualClock() - val conf = createTestConf().set("spark.history.testing.skipInitialize", "true") - val provider = spy(new FsHistoryProvider(conf, clock)) - doReturn(true).when(provider).isFsInSafeMode() - - val initThread = provider.initialize(None) + val provider = new SafeModeTestProvider(createTestConf(), clock) + val initThread = provider.initialize() try { provider.getConfig().keys should contain ("HDFS State") clock.setTime(5000) provider.getConfig().keys should contain ("HDFS State") - // Synchronization needed because of mockito. - clock.synchronized { - doReturn(false).when(provider).isFsInSafeMode() - clock.setTime(10000) - } + provider.inSafeMode = false + clock.setTime(10000) eventually(timeout(1 second), interval(10 millis)) { provider.getConfig().keys should not contain ("HDFS State") @@ -451,18 +445,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc test("provider reports error after FS leaves safe mode") { testDir.delete() val clock = new ManualClock() - val conf = createTestConf().set("spark.history.testing.skipInitialize", "true") - val provider = spy(new FsHistoryProvider(conf, clock)) - doReturn(true).when(provider).isFsInSafeMode() - + val provider = new SafeModeTestProvider(createTestConf(), clock) val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler]) - val initThread = provider.initialize(Some(errorHandler)) + val initThread = provider.startSafeModeCheckThread(Some(errorHandler)) try { - // Synchronization needed because of mockito. - clock.synchronized { - doReturn(false).when(provider).isFsInSafeMode() - clock.setTime(10000) - } + provider.inSafeMode = false + clock.setTime(10000) eventually(timeout(1 second), interval(10 millis)) { verify(errorHandler).uncaughtException(any(), any()) @@ -530,4 +518,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc log } + private class SafeModeTestProvider(conf: SparkConf, clock: Clock) + extends FsHistoryProvider(conf, clock) { + + @volatile var inSafeMode = true + + // Skip initialization so that we can manually start the safe mode check thread. + private[history] override def initialize(): Thread = null + + private[history] override def isFsInSafeMode(): Boolean = inSafeMode + + } + } |