diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-12-21 11:17:44 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-12-21 11:17:44 -0800 |
commit | ccfe60a8304871779ff1b31b8c2d724f59d5b2af (patch) | |
tree | 1f48dbb801c4d77881df08c200ef1f330ae0737f /streaming/src | |
parent | 607a1e63dbc9269b806a9f537e1d041029333cdd (diff) | |
download | spark-ccfe60a8304871779ff1b31b8c2d724f59d5b2af.tar.gz spark-ccfe60a8304871779ff1b31b8c2d724f59d5b2af.tar.bz2 spark-ccfe60a8304871779ff1b31b8c2d724f59d5b2af.zip |
[SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic functionality
## What changes were proposed in this pull request?
The failure is because in `test("basic functionality")`, it doesn't block until `ExecutorAllocationManager.manageAllocation` is called. This PR just adds StreamManualClock to allow the tests to block on expected wait time to make the test deterministic.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16321 from zsxwing/SPARK-18031.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala | 36 |
1 files changed, 32 insertions, 4 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index b49e579071..1d2bf35a6d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -36,11 +36,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite private val batchDurationMillis = 1000L private var allocationClient: ExecutorAllocationClient = null - private var clock: ManualClock = null + private var clock: StreamManualClock = null before { allocationClient = mock[ExecutorAllocationClient] - clock = new ManualClock() + clock = new StreamManualClock() } test("basic functionality") { @@ -57,10 +57,14 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite reset(allocationClient) when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2")) addBatchProcTime(allocationManager, batchProcTimeMs.toLong) - clock.advance(SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1) + val advancedTime = SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1 + val expectedWaitTime = clock.getTimeMillis() + advancedTime + clock.advance(advancedTime) + // Make sure ExecutorAllocationManager.manageAllocation is called eventually(timeout(10 seconds)) { - body + assert(clock.isStreamWaitingAt(expectedWaitTime)) } + body } /** Verify that the expected number of total executor were requested */ @@ -394,3 +398,27 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite } } } + +/** + * A special manual clock that provide `isStreamWaitingAt` to allow the user to check if the clock + * is blocking. + */ +class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable { + private var waitStartTime: Option[Long] = None + + override def waitTillTime(targetTime: Long): Long = synchronized { + try { + waitStartTime = Some(getTimeMillis()) + super.waitTillTime(targetTime) + } finally { + waitStartTime = None + } + } + + /** + * Returns if the clock is blocking and the time it started to block is the parameter `time`. + */ + def isStreamWaitingAt(time: Long): Boolean = synchronized { + waitStartTime == Some(time) + } +} |