aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala25
1 files changed, 19 insertions, 6 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index df4575ab25..04cd5bdc26 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -222,7 +222,11 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
val batchCounter = new BatchCounter(_ssc)
_ssc.start()
// Make sure running at least one batch
- batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
+ if (!batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)) {
+ fail("The first batch cannot complete in 10 seconds")
+ }
+ // When reaching here, we can make sure `StreamingContextStoppingCollector` won't call
+ // `ssc.stop()`, so it's safe to call `_ssc.stop()` now.
_ssc.stop()
assert(contextStoppingCollector.sparkExSeen)
}
@@ -345,12 +349,21 @@ class FailureReasonsCollector extends StreamingListener {
*/
class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener {
@volatile var sparkExSeen = false
+
+ private var isFirstBatch = true
+
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
- try {
- ssc.stop()
- } catch {
- case se: SparkException =>
- sparkExSeen = true
+ if (isFirstBatch) {
+ // We should only call `ssc.stop()` in the first batch. Otherwise, it's possible that the main
+ // thread is calling `ssc.stop()`, while StreamingContextStoppingCollector is also calling
+ // `ssc.stop()` in the listener thread, which becomes a dead-lock.
+ isFirstBatch = false
+ try {
+ ssc.stop()
+ } catch {
+ case se: SparkException =>
+ sparkExSeen = true
+ }
}
}
}