aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
author Shixiong Zhu <shixiong@databricks.com> 2015-11-27 11:50:18 -0800
committer Shixiong Zhu <shixiong@databricks.com> 2015-11-27 11:50:18 -0800
commit f57e6c9effdb9e282fc8ae66dc30fe053fed5272 (patch)
tree 0e4b5bebf7f1722cc7ae5c0d53eb9063878af8e4 /streaming/src/test
parent ba02f6cb5a40511cefa511d410be93c035d43f23 (diff)
download spark-f57e6c9effdb9e282fc8ae66dc30fe053fed5272.tar.gz
spark-f57e6c9effdb9e282fc8ae66dc30fe053fed5272.tar.bz2
spark-f57e6c9effdb9e282fc8ae66dc30fe053fed5272.zip
[SPARK-12021][STREAMING][TESTS] Fix the potential dead-lock in StreamingListenerSuite
In the StreamingListenerSuite test "don't call ssc.stop in listener", after the main thread calls `ssc.stop()`, `StreamingContextStoppingCollector` may also call `ssc.stop()` from the listener bus thread, which results in a dead-lock. This PR updates `StreamingContextStoppingCollector` to call `ssc.stop()` only when the first batch completes, and makes the test fail explicitly if the first batch does not complete within 10 seconds, so that the main thread calls `_ssc.stop()` only after the listener has already made its single attempt. Author: Shixiong Zhu <shixiong@databricks.com> Closes #10011 from zsxwing/fix-test-deadlock.
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 25
1 files changed, 19 insertions, 6 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index df4575ab25..04cd5bdc26 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -222,7 +222,11 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
val batchCounter = new BatchCounter(_ssc)
_ssc.start()
// Make sure running at least one batch
- batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
+ if (!batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)) {
+ fail("The first batch cannot complete in 10 seconds")
+ }
+ // When reaching here, we can make sure `StreamingContextStoppingCollector` won't call
+ // `ssc.stop()`, so it's safe to call `_ssc.stop()` now.
_ssc.stop()
assert(contextStoppingCollector.sparkExSeen)
}
@@ -345,12 +349,21 @@ class FailureReasonsCollector extends StreamingListener {
*/
class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener {
@volatile var sparkExSeen = false
+
+ private var isFirstBatch = true
+
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
- try {
- ssc.stop()
- } catch {
- case se: SparkException =>
- sparkExSeen = true
+ if (isFirstBatch) {
+ // We should only call `ssc.stop()` in the first batch. Otherwise, it's possible that the main
+ // thread is calling `ssc.stop()`, while StreamingContextStoppingCollector is also calling
+ // `ssc.stop()` in the listener thread, which becomes a dead-lock.
+ isFirstBatch = false
+ try {
+ ssc.stop()
+ } catch {
+ case se: SparkException =>
+ sparkExSeen = true
+ }
}
}
}