diff options
author | Liwei Lin <lwlin7@gmail.com> | 2016-05-03 10:10:25 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-05-03 10:10:25 -0700 |
commit | 5bd9a2f697dac44a4777e24321a2eb4a3d54e24b (patch) | |
tree | b948532281b9b04d298ee0e2cf104d6df3e9794a /sql | |
parent | 5503e453ba00676925531f91f66c0108ac6b1fca (diff) | |
download | spark-5bd9a2f697dac44a4777e24321a2eb4a3d54e24b.tar.gz spark-5bd9a2f697dac44a4777e24321a2eb4a3d54e24b.tar.bz2 spark-5bd9a2f697dac44a4777e24321a2eb4a3d54e24b.zip |
[SPARK-14884][SQL][STREAMING][WEBUI] Fix call site for continuous queries
## What changes were proposed in this pull request?
Since we've been processing continuous queries in separate threads, the call sites are then `run at <unknown>:0`. It's not wrong but provides very little information; in addition, we can not distinguish two queries only from their call sites.
This patch fixes this.
### Before
[Jobs Tab]
![s1a](https://cloud.githubusercontent.com/assets/15843379/14766101/a47246b2-0a30-11e6-8d81-06a9a600113b.png)
[SQL Tab]
![s1b](https://cloud.githubusercontent.com/assets/15843379/14766102/a4750226-0a30-11e6-9ada-773d977d902b.png)
### After
[Jobs Tab]
![s2a](https://cloud.githubusercontent.com/assets/15843379/14766104/a89705b6-0a30-11e6-9830-0d40ec68527b.png)
[SQL Tab]
![s2b](https://cloud.githubusercontent.com/assets/15843379/14766103/a8966728-0a30-11e6-8e4d-c2e326400478.png)
## How was this patch tested?
Manually checks - see screenshots above.
Author: Liwei Lin <lwlin7@gmail.com>
Closes #12650 from lw-lin/fix-call-site.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala | 7 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 12 |
2 files changed, 15 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 397d66b311..31c9f1aef2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -23,7 +23,6 @@ import org.apache.spark.SparkContext import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart} -import org.apache.spark.util.Utils private[sql] object SQLExecution { @@ -46,7 +45,11 @@ private[sql] object SQLExecution { val executionId = SQLExecution.nextExecutionId sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString) val r = try { - val callSite = Utils.getCallSite() + // sparkContext.getCallSite() would first try to pick up any call site that was previously + // set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on + // continuous queries would give us call site like "run at <unknown>:0" + val callSite = sparkSession.sparkContext.getCallSite() + sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart( executionId, callSite.shortForm, callSite.longForm, queryExecution.toString, SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis())) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 3108346913..3c5ced2af7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.util.ContinuousQueryListener import org.apache.spark.sql.util.ContinuousQueryListener._ -import org.apache.spark.util.UninterruptibleThread +import org.apache.spark.util.{UninterruptibleThread, Utils} /** * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread. @@ -101,10 +101,18 @@ class StreamExecution( @volatile private[sql] var streamDeathCause: ContinuousQueryException = null + /* Get the call site in the caller thread; will pass this into the micro batch thread */ + private val callSite = Utils.getCallSite() + /** The thread that runs the micro-batches of this stream. */ private[sql] val microBatchThread = new UninterruptibleThread(s"stream execution thread for $name") { - override def run(): Unit = { runBatches() } + override def run(): Unit = { + // To fix call site like "run at <unknown>:0", we bridge the call site from the caller + // thread to this micro batch thread + sparkSession.sparkContext.setCallSite(callSite) + runBatches() + } } /** |