From ccfe60a8304871779ff1b31b8c2d724f59d5b2af Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 21 Dec 2016 11:17:44 -0800 Subject: [SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic functionality ## What changes were proposed in this pull request? The failure is because in `test("basic functionality")`, it doesn't block until `ExecutorAllocationManager.manageAllocation` is called. This PR just adds StreamManualClock to allow the tests to block on expected wait time to make the test deterministic. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #16321 from zsxwing/SPARK-18031. --- .../scheduler/ExecutorAllocationManagerSuite.scala | 36 +++++++++++++++++++--- 1 file changed, 32 insertions(+), 4 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index b49e579071..1d2bf35a6d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -36,11 +36,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite private val batchDurationMillis = 1000L private var allocationClient: ExecutorAllocationClient = null - private var clock: ManualClock = null + private var clock: StreamManualClock = null before { allocationClient = mock[ExecutorAllocationClient] - clock = new ManualClock() + clock = new StreamManualClock() } test("basic functionality") { @@ -57,10 +57,14 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite reset(allocationClient) when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2")) addBatchProcTime(allocationManager, batchProcTimeMs.toLong) - clock.advance(SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1) + val advancedTime = SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1 + val expectedWaitTime = clock.getTimeMillis() + advancedTime + clock.advance(advancedTime) + // Make sure ExecutorAllocationManager.manageAllocation is called eventually(timeout(10 seconds)) { - body + assert(clock.isStreamWaitingAt(expectedWaitTime)) } + body } /** Verify that the expected number of total executor were requested */ @@ -394,3 +398,27 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite } } } + +/** + * A special manual clock that provide `isStreamWaitingAt` to allow the user to check if the clock + * is blocking. + */ +class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable { + private var waitStartTime: Option[Long] = None + + override def waitTillTime(targetTime: Long): Long = synchronized { + try { + waitStartTime = Some(getTimeMillis()) + super.waitTillTime(targetTime) + } finally { + waitStartTime = None + } + } + + /** + * Returns if the clock is blocking and the time it started to block is the parameter `time`. + */ + def isStreamWaitingAt(time: Long): Boolean = synchronized { + waitStartTime == Some(time) + } +} -- cgit v1.2.3