aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala14
1 files changed, 13 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 4bd6431cbe..6e77f354b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -321,6 +321,7 @@ class StreamExecution(
initializationLatch.countDown()
try {
+ stopSources()
state.set(TERMINATED)
currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)
@@ -558,6 +559,18 @@ class StreamExecution(
sparkSession.streams.postListenerEvent(event)
}
+ /** Stops all streaming sources safely. */
+ private def stopSources(): Unit = {
+ uniqueSources.foreach { source =>
+ try {
+ source.stop()
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Failed to stop streaming source: $source. Resources may have leaked.", e)
+ }
+ }
+ }
+
/**
* Signals to the thread executing micro-batches that it should stop running after the next
* batch. This method blocks until the thread stops running.
@@ -570,7 +583,6 @@ class StreamExecution(
microBatchThread.interrupt()
microBatchThread.join()
}
- uniqueSources.foreach(_.stop())
logInfo(s"Query $prettyIdString was stopped")
}