aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-11-03 16:26:28 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-11-03 16:26:28 -0800
commit53e9cee3e4e845d1f875c487215c0f22503347b1 (patch)
tree106755f5ae74394058376b7c35f805f60e6c9737
parent680b4e7bca935dc1569f35fa319bdfb01a12f7e0 (diff)
downloadspark-53e9cee3e4e845d1f875c487215c0f22503347b1.tar.gz
spark-53e9cee3e4e845d1f875c487215c0f22503347b1.tar.bz2
spark-53e9cee3e4e845d1f875c487215c0f22503347b1.zip
[SPARK-11466][CORE] Avoid mockito in multi-threaded FsHistoryProviderSuite test.
The test functionality should be the same, but without using mockito; logs don't really say anything useful but I suspect it may be the cause of the flakiness, since updating mocks when multiple threads may be using it doesn't work very well. It also allows some other cleanup (= less test code in FsHistoryProvider). Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #9425 from vanzin/SPARK-11466.
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala31
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala42
2 files changed, 34 insertions, 39 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 24aa386c72..718efc4f3b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -113,35 +113,30 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
// Conf option used for testing the initialization code.
- val initThread = if (!conf.getBoolean("spark.history.testing.skipInitialize", false)) {
- initialize(None)
- } else {
- null
- }
+ val initThread = initialize()
- private[history] def initialize(errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = {
+ private[history] def initialize(): Thread = {
if (!isFsInSafeMode()) {
startPolling()
- return null
+ null
+ } else {
+ startSafeModeCheckThread(None)
}
+ }
+ private[history] def startSafeModeCheckThread(
+ errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = {
// Cannot probe anything while the FS is in safe mode, so spawn a new thread that will wait
// for the FS to leave safe mode before enabling polling. This allows the main history server
// UI to be shown (so that the user can see the HDFS status).
- //
- // The synchronization in the run() method is needed because of the tests; mockito can
- // misbehave if the test is modifying the mocked methods while the thread is calling
- // them.
val initThread = new Thread(new Runnable() {
override def run(): Unit = {
try {
- clock.synchronized {
- while (isFsInSafeMode()) {
- logInfo("HDFS is still in safe mode. Waiting...")
- val deadline = clock.getTimeMillis() +
- TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
- clock.waitTillTime(deadline)
- }
+ while (isFsInSafeMode()) {
+ logInfo("HDFS is still in safe mode. Waiting...")
+ val deadline = clock.getTimeMillis() +
+ TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
+ clock.waitTillTime(deadline)
}
startPolling()
} catch {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 833aab14ca..5cab17f8a3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -41,7 +41,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.io._
import org.apache.spark.scheduler._
-import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
+import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
@@ -423,22 +423,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
test("provider waits for safe mode to finish before initializing") {
val clock = new ManualClock()
- val conf = createTestConf().set("spark.history.testing.skipInitialize", "true")
- val provider = spy(new FsHistoryProvider(conf, clock))
- doReturn(true).when(provider).isFsInSafeMode()
-
- val initThread = provider.initialize(None)
+ val provider = new SafeModeTestProvider(createTestConf(), clock)
+ val initThread = provider.initialize()
try {
provider.getConfig().keys should contain ("HDFS State")
clock.setTime(5000)
provider.getConfig().keys should contain ("HDFS State")
- // Synchronization needed because of mockito.
- clock.synchronized {
- doReturn(false).when(provider).isFsInSafeMode()
- clock.setTime(10000)
- }
+ provider.inSafeMode = false
+ clock.setTime(10000)
eventually(timeout(1 second), interval(10 millis)) {
provider.getConfig().keys should not contain ("HDFS State")
@@ -451,18 +445,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
test("provider reports error after FS leaves safe mode") {
testDir.delete()
val clock = new ManualClock()
- val conf = createTestConf().set("spark.history.testing.skipInitialize", "true")
- val provider = spy(new FsHistoryProvider(conf, clock))
- doReturn(true).when(provider).isFsInSafeMode()
-
+ val provider = new SafeModeTestProvider(createTestConf(), clock)
val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler])
- val initThread = provider.initialize(Some(errorHandler))
+ val initThread = provider.startSafeModeCheckThread(Some(errorHandler))
try {
- // Synchronization needed because of mockito.
- clock.synchronized {
- doReturn(false).when(provider).isFsInSafeMode()
- clock.setTime(10000)
- }
+ provider.inSafeMode = false
+ clock.setTime(10000)
eventually(timeout(1 second), interval(10 millis)) {
verify(errorHandler).uncaughtException(any(), any())
@@ -530,4 +518,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
log
}
+ private class SafeModeTestProvider(conf: SparkConf, clock: Clock)
+ extends FsHistoryProvider(conf, clock) {
+
+ @volatile var inSafeMode = true
+
+ // Skip initialization so that we can manually start the safe mode check thread.
+ private[history] override def initialize(): Thread = null
+
+ private[history] override def isFsInSafeMode(): Boolean = inSafeMode
+
+ }
+
}