aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala12
2 files changed, 15 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 397d66b311..31c9f1aef2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -23,7 +23,6 @@ import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd,
SparkListenerSQLExecutionStart}
-import org.apache.spark.util.Utils
private[sql] object SQLExecution {
@@ -46,7 +45,11 @@ private[sql] object SQLExecution {
val executionId = SQLExecution.nextExecutionId
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
val r = try {
- val callSite = Utils.getCallSite()
+ // sparkContext.getCallSite() would first try to pick up any call site that was previously
+      // set, then fall back to Utils.getCallSite(); calling Utils.getCallSite() directly for
+      // continuous queries would give us a call site like "run at <unknown>:0"
+ val callSite = sparkSession.sparkContext.getCallSite()
+
sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 3108346913..3c5ced2af7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.ContinuousQueryListener
import org.apache.spark.sql.util.ContinuousQueryListener._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{UninterruptibleThread, Utils}
/**
* Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
@@ -101,10 +101,18 @@ class StreamExecution(
@volatile
private[sql] var streamDeathCause: ContinuousQueryException = null
+  /* Capture the call site in the caller thread; it will be passed into the micro batch thread */
+ private val callSite = Utils.getCallSite()
+
/** The thread that runs the micro-batches of this stream. */
private[sql] val microBatchThread =
new UninterruptibleThread(s"stream execution thread for $name") {
- override def run(): Unit = { runBatches() }
+ override def run(): Unit = {
+        // To fix call sites like "run at <unknown>:0", we bridge the call site from the caller
+        // thread to this micro batch thread
+ sparkSession.sparkContext.setCallSite(callSite)
+ runBatches()
+ }
}
/**