diff options
author | Tyson Condie <tcondie@gmail.com> | 2017-03-09 23:02:13 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2017-03-09 23:02:13 -0800 |
commit | 501b7111997bc74754663348967104181b43319b (patch) | |
tree | 63558b1db684966782c679b63a5111820a1f3a03 /sql/core/src | |
parent | 5949e6c4477fd3cb07a6962dbee48b4416ea65dd (diff) | |
download | spark-501b7111997bc74754663348967104181b43319b.tar.gz spark-501b7111997bc74754663348967104181b43319b.tar.bz2 spark-501b7111997bc74754663348967104181b43319b.zip |
[SPARK-19891][SS] Await Batch Lock notified on stream execution exit
## What changes were proposed in this pull request?
We need to notify the await batch lock when the stream exits early e.g., when an exception has been thrown.
## How was this patch tested?
Current tests that throw exceptions at runtime will finish faster as a result of this update.
zsxwing
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Tyson Condie <tcondie@gmail.com>
Closes #17231 from tcondie/kafka-writer.
Diffstat (limited to 'sql/core/src')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 7 |
1 files changed, 7 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 70912d13ae..529263805c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -361,6 +361,13 @@ class StreamExecution( } } } finally { + awaitBatchLock.lock() + try { + // Wake up any threads that are waiting for the stream to progress. + awaitBatchLockCondition.signalAll() + } finally { + awaitBatchLock.unlock() + } terminationLatch.countDown() } } |