diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2015-11-27 11:50:18 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2015-11-27 11:50:18 -0800 |
commit | f57e6c9effdb9e282fc8ae66dc30fe053fed5272 (patch) | |
tree | 0e4b5bebf7f1722cc7ae5c0d53eb9063878af8e4 /streaming/src | |
parent | ba02f6cb5a40511cefa511d410be93c035d43f23 (diff) | |
download | spark-f57e6c9effdb9e282fc8ae66dc30fe053fed5272.tar.gz spark-f57e6c9effdb9e282fc8ae66dc30fe053fed5272.tar.bz2 spark-f57e6c9effdb9e282fc8ae66dc30fe053fed5272.zip |
[SPARK-12021][STREAMING][TESTS] Fix the potential dead-lock in StreamingListenerSuite
In StreamingListenerSuite."don't call ssc.stop in listener", after the main thread calls `ssc.stop()`, `StreamingContextStoppingCollector` may also call `ssc.stop()` in the listener bus thread, which results in a deadlock. This PR updates `StreamingContextStoppingCollector` to call `ssc.stop()` only in the first batch, which avoids the deadlock.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10011 from zsxwing/fix-test-deadlock.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 25 |
1 files changed, 19 insertions, 6 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index df4575ab25..04cd5bdc26 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -222,7 +222,11 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { val batchCounter = new BatchCounter(_ssc) _ssc.start() // Make sure running at least one batch - batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000) + if (!batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)) { + fail("The first batch cannot complete in 10 seconds") + } + // When reaching here, we can make sure `StreamingContextStoppingCollector` won't call + // `ssc.stop()`, so it's safe to call `_ssc.stop()` now. _ssc.stop() assert(contextStoppingCollector.sparkExSeen) } @@ -345,12 +349,21 @@ class FailureReasonsCollector extends StreamingListener { */ class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener { @volatile var sparkExSeen = false + + private var isFirstBatch = true + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { - try { - ssc.stop() - } catch { - case se: SparkException => - sparkExSeen = true + if (isFirstBatch) { + // We should only call `ssc.stop()` in the first batch. Otherwise, it's possible that the main + // thread is calling `ssc.stop()`, while StreamingContextStoppingCollector is also calling + // `ssc.stop()` in the listener thread, which becomes a dead-lock. + isFirstBatch = false + try { + ssc.stop() + } catch { + case se: SparkException => + sparkExSeen = true + } } } } |