aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-12-21 11:17:44 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-12-21 11:17:44 -0800
commitccfe60a8304871779ff1b31b8c2d724f59d5b2af (patch)
tree1f48dbb801c4d77881df08c200ef1f330ae0737f /streaming
parent607a1e63dbc9269b806a9f537e1d041029333cdd (diff)
downloadspark-ccfe60a8304871779ff1b31b8c2d724f59d5b2af.tar.gz
spark-ccfe60a8304871779ff1b31b8c2d724f59d5b2af.tar.bz2
spark-ccfe60a8304871779ff1b31b8c2d724f59d5b2af.zip
[SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic functionality
## What changes were proposed in this pull request? The failure is because in `test("basic functionality")`, it doesn't block until `ExecutorAllocationManager.manageAllocation` is called. This PR just adds StreamManualClock to allow the tests to block on expected wait time to make the test deterministic. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #16321 from zsxwing/SPARK-18031.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala36
1 files changed, 32 insertions, 4 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index b49e579071..1d2bf35a6d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -36,11 +36,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
private val batchDurationMillis = 1000L
private var allocationClient: ExecutorAllocationClient = null
- private var clock: ManualClock = null
+ private var clock: StreamManualClock = null
before {
allocationClient = mock[ExecutorAllocationClient]
- clock = new ManualClock()
+ clock = new StreamManualClock()
}
test("basic functionality") {
@@ -57,10 +57,14 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
reset(allocationClient)
when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2"))
addBatchProcTime(allocationManager, batchProcTimeMs.toLong)
- clock.advance(SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1)
+ val advancedTime = SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1
+ val expectedWaitTime = clock.getTimeMillis() + advancedTime
+ clock.advance(advancedTime)
+ // Make sure ExecutorAllocationManager.manageAllocation is called
eventually(timeout(10 seconds)) {
- body
+ assert(clock.isStreamWaitingAt(expectedWaitTime))
}
+ body
}
/** Verify that the expected number of total executor were requested */
@@ -394,3 +398,27 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
}
}
}
+
+/**
+ * A special manual clock that provide `isStreamWaitingAt` to allow the user to check if the clock
+ * is blocking.
+ */
+class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
+ private var waitStartTime: Option[Long] = None
+
+ override def waitTillTime(targetTime: Long): Long = synchronized {
+ try {
+ waitStartTime = Some(getTimeMillis())
+ super.waitTillTime(targetTime)
+ } finally {
+ waitStartTime = None
+ }
+ }
+
+ /**
+ * Returns if the clock is blocking and the time it started to block is the parameter `time`.
+ */
+ def isStreamWaitingAt(time: Long): Boolean = synchronized {
+ waitStartTime == Some(time)
+ }
+}