aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
authorTyson Condie <tcondie@gmail.com>2017-03-09 23:02:13 -0800
committerShixiong Zhu <shixiong@databricks.com>2017-03-09 23:02:13 -0800
commit501b7111997bc74754663348967104181b43319b (patch)
tree63558b1db684966782c679b63a5111820a1f3a03 /sql/core/src
parent5949e6c4477fd3cb07a6962dbee48b4416ea65dd (diff)
downloadspark-501b7111997bc74754663348967104181b43319b.tar.gz
spark-501b7111997bc74754663348967104181b43319b.tar.bz2
spark-501b7111997bc74754663348967104181b43319b.zip
[SPARK-19891][SS] Await Batch Lock notified on stream execution exit
## What changes were proposed in this pull request? We need to notify the await batch lock when the stream exits early e.g., when an exception has been thrown. ## How was this patch tested? Current tests that throw exceptions at runtime will finish faster as a result of this update. zsxwing Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Tyson Condie <tcondie@gmail.com> Closes #17231 from tcondie/kafka-writer.
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala7
1 files changed, 7 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 70912d13ae..529263805c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -361,6 +361,13 @@ class StreamExecution(
}
}
} finally {
+ awaitBatchLock.lock()
+ try {
+ // Wake up any threads that are waiting for the stream to progress.
+ awaitBatchLockCondition.signalAll()
+ } finally {
+ awaitBatchLock.unlock()
+ }
terminationLatch.countDown()
}
}