aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala8
1 files changed, 7 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 9825f19b86..b3a0d6ad0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -116,7 +116,7 @@ class StreamExecution(
* [[HDFSMetadataLog]]. See SPARK-14131 for more details.
*/
val microBatchThread =
- new UninterruptibleThread(s"stream execution thread for $name") {
+ new StreamExecutionThread(s"stream execution thread for $name") {
override def run(): Unit = {
// To fix call site like "run at <unknown>:0", we bridge the call site from the caller
// thread to this micro batch thread
@@ -530,3 +530,9 @@ object StreamExecution {
def nextId: Long = _nextId.getAndIncrement()
}
+
+/**
+ * A special thread to run the stream query. Some codes require to run in the StreamExecutionThread
+ * and will use `classOf[StreamExecutionThread]` to check.
+ */
+abstract class StreamExecutionThread(name: String) extends UninterruptibleThread(name)