diff options
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 14 |
1 files changed, 13 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 4bd6431cbe..6e77f354b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -321,6 +321,7 @@ class StreamExecution( initializationLatch.countDown() try { + stopSources() state.set(TERMINATED) currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false) @@ -558,6 +559,18 @@ class StreamExecution( sparkSession.streams.postListenerEvent(event) } + /** Stops all streaming sources safely. */ + private def stopSources(): Unit = { + uniqueSources.foreach { source => + try { + source.stop() + } catch { + case NonFatal(e) => + logWarning(s"Failed to stop streaming source: $source. Resources may have leaked.", e) + } + } + } + /** * Signals to the thread executing micro-batches that it should stop running after the next * batch. This method blocks until the thread stops running. @@ -570,7 +583,6 @@ class StreamExecution( microBatchThread.interrupt() microBatchThread.join() } - uniqueSources.foreach(_.stop()) logInfo(s"Query $prettyIdString was stopped") } |