aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2017-03-03 10:35:15 -0800
committerShixiong Zhu <shixiong@databricks.com>2017-03-03 10:35:15 -0800
commit9314c08377cc8da88f4e31d1a9d41376e96a81b3 (patch)
tree04bfe9cc2d2aa9601113a9b04a2095cf27cf5913 /sql/core/src/main
parent37a1c0e461737d4a4bbb03d397b651ec5ba00e96 (diff)
downloadspark-9314c08377cc8da88f4e31d1a9d41376e96a81b3.tar.gz
spark-9314c08377cc8da88f4e31d1a9d41376e96a81b3.tar.bz2
spark-9314c08377cc8da88f4e31d1a9d41376e96a81b3.zip
[SPARK-19774] StreamExecution should call stop() on sources when a stream fails
## What changes were proposed in this pull request? We call stop() on a Structured Streaming Source only when the stream is shutdown when a user calls streamingQuery.stop(). We should actually stop all sources when the stream fails as well, otherwise we may leak resources, e.g. connections to Kafka. ## How was this patch tested? Unit tests in `StreamingQuerySuite`. Author: Burak Yavuz <brkyvz@gmail.com> Closes #17107 from brkyvz/close-source.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala14
1 files changed, 13 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 4bd6431cbe..6e77f354b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -321,6 +321,7 @@ class StreamExecution(
initializationLatch.countDown()
try {
+ stopSources()
state.set(TERMINATED)
currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)
@@ -558,6 +559,18 @@ class StreamExecution(
sparkSession.streams.postListenerEvent(event)
}
+ /** Stops all streaming sources safely. */
+ private def stopSources(): Unit = {
+ uniqueSources.foreach { source =>
+ try {
+ source.stop()
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Failed to stop streaming source: $source. Resources may have leaked.", e)
+ }
+ }
+ }
+
/**
* Signals to the thread executing micro-batches that it should stop running after the next
* batch. This method blocks until the thread stops running.
@@ -570,7 +583,6 @@ class StreamExecution(
microBatchThread.interrupt()
microBatchThread.join()
}
- uniqueSources.foreach(_.stop())
logInfo(s"Query $prettyIdString was stopped")
}