Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory |   1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala |  18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala |  24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala |  46
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala |  30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala |  56
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala |   4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala | 139
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala |  12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala |  20
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala |   4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala |  43
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala |   1
13 files changed, 129 insertions, 269 deletions
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory b/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
deleted file mode 100644
index 507100be90..0000000000
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.spark.sql.execution.ui.SQLHistoryListenerFactory
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 8d27839525..9cc65de191 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1263,8 +1263,6 @@ object SQLContext {
*/
@transient private val instantiatedContext = new AtomicReference[SQLContext]()
- @transient private val sqlListener = new AtomicReference[SQLListener]()
-
/**
* Get the singleton SQLContext if it exists or create a new one using the given SparkContext.
*
@@ -1309,10 +1307,6 @@ object SQLContext {
Option(instantiatedContext.get())
}
- private[sql] def clearSqlListener(): Unit = {
- sqlListener.set(null)
- }
-
/**
* Changes the SQLContext that will be returned in this thread and its children when
* SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
@@ -1361,13 +1355,9 @@ object SQLContext {
* Create a SQLListener, add it to the SparkContext, and create an SQLTab if there is a SparkUI.
*/
private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = {
- if (sqlListener.get() == null) {
- val listener = new SQLListener(sc.conf)
- if (sqlListener.compareAndSet(null, listener)) {
- sc.addSparkListener(listener)
- sc.ui.foreach(new SQLTab(listener, _))
- }
- }
- sqlListener.get()
+ val listener = new SQLListener(sc.conf)
+ sc.addSparkListener(listener)
+ sc.ui.foreach(new SQLTab(listener, _))
+ listener
}
}
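
With the AtomicReference cache removed, every call to createListenerAndUI wires up a fresh SQLListener and, when a SparkUI is present, its own SQLTab. A minimal sketch of the consequence, assuming (as in this branch) that the SQLContext constructor calls createListenerAndUI; the SparkContext and the two contexts below are made up for illustration:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sql-ui-demo"))
    val ctx1 = new SQLContext(sc)  // registers its own SQLListener and the "SQL" tab
    val ctx2 = new SQLContext(sc)  // registers a second SQLListener and an "SQL1" tab (see the SQLTab change below)
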
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 3497198626..1422e15549 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -21,8 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionStart,
- SparkListenerSQLExecutionEnd}
+import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.util.Utils
private[sql] object SQLExecution {
@@ -46,14 +45,25 @@ private[sql] object SQLExecution {
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
val r = try {
val callSite = Utils.getCallSite()
- sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
- executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
- SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
+ sqlContext.listener.onExecutionStart(
+ executionId,
+ callSite.shortForm,
+ callSite.longForm,
+ queryExecution.toString,
+ SparkPlanGraph(queryExecution.executedPlan),
+ System.currentTimeMillis())
try {
body
} finally {
- sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd(
- executionId, System.currentTimeMillis()))
+ // Ideally, we need to make sure onExecutionEnd happens after onJobStart and onJobEnd.
+ // However, onJobStart and onJobEnd run in the listener thread. Because we cannot add new
+ // SQL event types to SparkListener since it's a public API, we cannot guarantee that.
+ //
+ // SQLListener should handle the case that onExecutionEnd happens before onJobEnd.
+ //
+ // The worst case is onExecutionEnd may happen before onJobStart when the listener thread
+ // is very busy. If so, we cannot track the jobs for the execution. It seems acceptable.
+ sqlContext.listener.onExecutionEnd(executionId, System.currentTimeMillis())
}
} finally {
sc.setLocalProperty(EXECUTION_ID_KEY, null)
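
The ordering comment above applies to any work wrapped in withNewExecutionId. A hedged sketch of the typical call pattern; since the object is private[sql], real call sites live inside sql/core (e.g. DataFrame actions), and the df and sqlContext names below are hypothetical:

    import org.apache.spark.sql.execution.SQLExecution

    // Runs the body under a fresh execution id so the SQLListener can group the
    // jobs it spawns under a single entry in the SQL tab.
    val rowCount = SQLExecution.withNewExecutionId(sqlContext, df.queryExecution) {
      df.queryExecution.executedPlan.execute().count()
    }
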
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
deleted file mode 100644
index 486ce34064..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.execution.metric.SQLMetricInfo
-import org.apache.spark.util.Utils
-
-/**
- * :: DeveloperApi ::
- * Stores information about a SQL SparkPlan.
- */
-@DeveloperApi
-class SparkPlanInfo(
- val nodeName: String,
- val simpleString: String,
- val children: Seq[SparkPlanInfo],
- val metrics: Seq[SQLMetricInfo])
-
-private[sql] object SparkPlanInfo {
-
- def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
- val metrics = plan.metrics.toSeq.map { case (key, metric) =>
- new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
- Utils.getFormattedClassName(metric.param))
- }
- val children = plan.children.map(fromSparkPlan)
-
- new SparkPlanInfo(plan.nodeName, plan.simpleString, children, metrics)
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
deleted file mode 100644
index 2708219ad3..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.metric
-
-import org.apache.spark.annotation.DeveloperApi
-
-/**
- * :: DeveloperApi ::
- * Stores information about a SQL Metric.
- */
-@DeveloperApi
-class SQLMetricInfo(
- val name: String,
- val accumulatorId: Long,
- val metricParam: String)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 6c0f6f8a52..1c253e3942 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -104,39 +104,21 @@ private class LongSQLMetricParam(val stringValue: Seq[Long] => String, initialVa
override def zero: LongSQLMetricValue = new LongSQLMetricValue(initialValue)
}
-private object LongSQLMetricParam extends LongSQLMetricParam(_.sum.toString, 0L)
-
-private object StaticsLongSQLMetricParam extends LongSQLMetricParam(
- (values: Seq[Long]) => {
- // This is a workaround for SPARK-11013.
- // We use -1 as initial value of the accumulator, if the accumulator is valid, we will update
- // it at the end of task and the value will be at least 0.
- val validValues = values.filter(_ >= 0)
- val Seq(sum, min, med, max) = {
- val metric = if (validValues.length == 0) {
- Seq.fill(4)(0L)
- } else {
- val sorted = validValues.sorted
- Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
- }
- metric.map(Utils.bytesToString)
- }
- s"\n$sum ($min, $med, $max)"
- }, -1L)
-
private[sql] object SQLMetrics {
private def createLongMetric(
sc: SparkContext,
name: String,
- param: LongSQLMetricParam): LongSQLMetric = {
+ stringValue: Seq[Long] => String,
+ initialValue: Long): LongSQLMetric = {
+ val param = new LongSQLMetricParam(stringValue, initialValue)
val acc = new LongSQLMetric(name, param)
sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
}
def createLongMetric(sc: SparkContext, name: String): LongSQLMetric = {
- createLongMetric(sc, name, LongSQLMetricParam)
+ createLongMetric(sc, name, _.sum.toString, 0L)
}
/**
@@ -144,25 +126,31 @@ private[sql] object SQLMetrics {
* spill size, etc.
*/
def createSizeMetric(sc: SparkContext, name: String): LongSQLMetric = {
+ val stringValue = (values: Seq[Long]) => {
+ // This is a workaround for SPARK-11013.
+ // We use -1 as the initial value of the accumulator; if the accumulator is valid, we will
+ // update it at the end of the task and the value will be at least 0.
+ val validValues = values.filter(_ >= 0)
+ val Seq(sum, min, med, max) = {
+ val metric = if (validValues.length == 0) {
+ Seq.fill(4)(0L)
+ } else {
+ val sorted = validValues.sorted
+ Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
+ }
+ metric.map(Utils.bytesToString)
+ }
+ s"\n$sum ($min, $med, $max)"
+ }
// The final result of this metric in the physical operator UI may look like:
// data size total (min, med, max):
// 100GB (100MB, 1GB, 10GB)
- createLongMetric(sc, s"$name total (min, med, max)", StaticsLongSQLMetricParam)
- }
-
- def getMetricParam(metricParamName: String): SQLMetricParam[SQLMetricValue[Any], Any] = {
- val longSQLMetricParam = Utils.getFormattedClassName(LongSQLMetricParam)
- val staticsSQLMetricParam = Utils.getFormattedClassName(StaticsLongSQLMetricParam)
- val metricParam = metricParamName match {
- case `longSQLMetricParam` => LongSQLMetricParam
- case `staticsSQLMetricParam` => StaticsLongSQLMetricParam
- }
- metricParam.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]]
+ createLongMetric(sc, s"$name total (min, med, max)", stringValue, -1L)
}
/**
* A metric whose value will be ignored. Use this one when we need a metric parameter but don't
* care about the value.
*/
- val nullLongMetric = new LongSQLMetric("null", LongSQLMetricParam)
+ val nullLongMetric = new LongSQLMetric("null", new LongSQLMetricParam(_.sum.toString, 0L))
}
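
The inlined stringValue function renders a size metric as total (min, med, max), formatted with Utils.bytesToString. A small worked example of the aggregation it performs, on a made-up Seq of per-task values and with plain longs so the arithmetic stays visible:

    val values = Seq(-1L, 100L, 400L, 300L)      // -1 marks a task that never updated the metric
    val validValues = values.filter(_ >= 0)      // Seq(100, 400, 300)
    val sorted = validValues.sorted              // Seq(100, 300, 400)
    val Seq(sum, min, med, max) =
      Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
    // sum == 800, min == 100, med == 300, max == 400; after byte formatting the UI shows
    // roughly "<name> total (min, med, max):\n800.0 B (100.0 B, 300.0 B, 400.0 B)".
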
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index c74ad40406..e74d6fb396 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.execution.ui
import javax.servlet.http.HttpServletRequest
-import scala.xml.Node
+import scala.xml.{Node, Unparsed}
+
+import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.Logging
import org.apache.spark.ui.{UIUtils, WebUIPage}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index e19a1e3e58..5a072de400 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -19,34 +19,11 @@ package org.apache.spark.sql.execution.ui
import scala.collection.mutable
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.SparkPlanInfo
-import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetricValue, SQLMetricParam}
+import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
import org.apache.spark.{JobExecutionStatus, Logging, SparkConf}
-import org.apache.spark.ui.SparkUI
-
-@DeveloperApi
-case class SparkListenerSQLExecutionStart(
- executionId: Long,
- description: String,
- details: String,
- physicalPlanDescription: String,
- sparkPlanInfo: SparkPlanInfo,
- time: Long)
- extends SparkListenerEvent
-
-@DeveloperApi
-case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
- extends SparkListenerEvent
-
-private[sql] class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
-
- override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
- List(new SQLHistoryListener(conf, sparkUI))
- }
-}
private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Logging {
@@ -141,8 +118,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
override def onExecutorMetricsUpdate(
executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
for ((taskId, stageId, stageAttemptID, metrics) <- executorMetricsUpdate.taskMetrics) {
- updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics.accumulatorUpdates(),
- finishTask = false)
+ updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics, finishTask = false)
}
}
@@ -164,7 +140,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
taskEnd.taskInfo.taskId,
taskEnd.stageId,
taskEnd.stageAttemptId,
- taskEnd.taskMetrics.accumulatorUpdates(),
+ taskEnd.taskMetrics,
finishTask = true)
}
@@ -172,12 +148,15 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
* Update the accumulator values of a task with the latest metrics for this task. This is called
* every time we receive an executor heartbeat or when a task finishes.
*/
- protected def updateTaskAccumulatorValues(
+ private def updateTaskAccumulatorValues(
taskId: Long,
stageId: Int,
stageAttemptID: Int,
- accumulatorUpdates: Map[Long, Any],
+ metrics: TaskMetrics,
finishTask: Boolean): Unit = {
+ if (metrics == null) {
+ return
+ }
_stageIdToStageMetrics.get(stageId) match {
case Some(stageMetrics) =>
@@ -195,9 +174,9 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
case Some(taskMetrics) =>
if (finishTask) {
taskMetrics.finished = true
- taskMetrics.accumulatorUpdates = accumulatorUpdates
+ taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
} else if (!taskMetrics.finished) {
- taskMetrics.accumulatorUpdates = accumulatorUpdates
+ taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
} else {
// If a task is finished, we should not override with accumulator updates from
// heartbeat reports
@@ -206,7 +185,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
// TODO Now just set attemptId to 0. Should fix here when we can get the attempt
// id from SparkListenerExecutorMetricsUpdate
stageMetrics.taskIdToMetricUpdates(taskId) = new SQLTaskMetrics(
- attemptId = 0, finished = finishTask, accumulatorUpdates)
+ attemptId = 0, finished = finishTask, metrics.accumulatorUpdates())
}
}
case None =>
@@ -214,40 +193,38 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
}
}
- override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
- case SparkListenerSQLExecutionStart(executionId, description, details,
- physicalPlanDescription, sparkPlanInfo, time) =>
- val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
- val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
- node.metrics.map(metric => metric.accumulatorId -> metric)
- }
- val executionUIData = new SQLExecutionUIData(
- executionId,
- description,
- details,
- physicalPlanDescription,
- physicalPlanGraph,
- sqlPlanMetrics.toMap,
- time)
- synchronized {
- activeExecutions(executionId) = executionUIData
- _executionIdToData(executionId) = executionUIData
- }
- case SparkListenerSQLExecutionEnd(executionId, time) => synchronized {
- _executionIdToData.get(executionId).foreach { executionUIData =>
- executionUIData.completionTime = Some(time)
- if (!executionUIData.hasRunningJobs) {
- // onExecutionEnd happens after all "onJobEnd"s
- // So we should update the execution lists.
- markExecutionFinished(executionId)
- } else {
- // There are some running jobs, onExecutionEnd happens before some "onJobEnd"s.
- // Then we don't if the execution is successful, so let the last onJobEnd updates the
- // execution lists.
- }
+ def onExecutionStart(
+ executionId: Long,
+ description: String,
+ details: String,
+ physicalPlanDescription: String,
+ physicalPlanGraph: SparkPlanGraph,
+ time: Long): Unit = {
+ val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
+ node.metrics.map(metric => metric.accumulatorId -> metric)
+ }
+
+ val executionUIData = new SQLExecutionUIData(executionId, description, details,
+ physicalPlanDescription, physicalPlanGraph, sqlPlanMetrics.toMap, time)
+ synchronized {
+ activeExecutions(executionId) = executionUIData
+ _executionIdToData(executionId) = executionUIData
+ }
+ }
+
+ def onExecutionEnd(executionId: Long, time: Long): Unit = synchronized {
+ _executionIdToData.get(executionId).foreach { executionUIData =>
+ executionUIData.completionTime = Some(time)
+ if (!executionUIData.hasRunningJobs) {
+ // onExecutionEnd happens after all "onJobEnd"s
+ // So we should update the execution lists.
+ markExecutionFinished(executionId)
+ } else {
+ // There are still running jobs, so onExecutionEnd happened before some "onJobEnd"s.
+ // We don't know yet whether the execution succeeded, so let the last onJobEnd update the
+ // execution lists.
}
}
- case _ => // Ignore
}
private def markExecutionFinished(executionId: Long): Unit = {
@@ -312,38 +289,6 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
}
-private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
- extends SQLListener(conf) {
-
- private var sqlTabAttached = false
-
- override def onExecutorMetricsUpdate(
- executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
- // Do nothing
- }
-
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
- updateTaskAccumulatorValues(
- taskEnd.taskInfo.taskId,
- taskEnd.stageId,
- taskEnd.stageAttemptId,
- taskEnd.taskInfo.accumulables.map { acc =>
- (acc.id, new LongSQLMetricValue(acc.update.getOrElse("0").toLong))
- }.toMap,
- finishTask = true)
- }
-
- override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
- case _: SparkListenerSQLExecutionStart =>
- if (!sqlTabAttached) {
- new SQLTab(this, sparkUI)
- sqlTabAttached = true
- }
- super.onOtherEvent(event)
- case _ => super.onOtherEvent(event)
- }
-}
-
/**
* Represent all necessary data for an execution that will be used in Web UI.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index 4f50b2ecdc..9c27944d42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -17,11 +17,13 @@
package org.apache.spark.sql.execution.ui
+import java.util.concurrent.atomic.AtomicInteger
+
import org.apache.spark.Logging
import org.apache.spark.ui.{SparkUI, SparkUITab}
private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
- extends SparkUITab(sparkUI, "SQL") with Logging {
+ extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging {
val parent = sparkUI
@@ -33,5 +35,13 @@ private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
}
private[sql] object SQLTab {
+
private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
+
+ private val nextTabId = new AtomicInteger(0)
+
+ private def nextTabName: String = {
+ val nextId = nextTabId.getAndIncrement()
+ if (nextId == 0) "SQL" else s"SQL$nextId"
+ }
}
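
Because each SQLContext now attaches its own tab (see the SQLContext change above), SQLTab derives a unique name per instance. A stand-alone restatement of the private nextTabName helper, for illustration only:

    import java.util.concurrent.atomic.AtomicInteger

    val nextTabId = new AtomicInteger(0)
    def nextTabName: String = {
      val id = nextTabId.getAndIncrement()
      if (id == 0) "SQL" else s"SQL$id"
    }
    // The first tab keeps the old "SQL" name; later contexts get "SQL1", "SQL2", ...
    assert((1 to 3).map(_ => nextTabName) == Seq("SQL", "SQL1", "SQL2"))
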
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 7af0ff09c5..f1fce5478a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
-import org.apache.spark.sql.execution.SparkPlanInfo
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
/**
* A graph used for storing information of an executionPlan of DataFrame.
@@ -48,27 +48,27 @@ private[sql] object SparkPlanGraph {
/**
* Build a SparkPlanGraph from the root of a SparkPlan tree.
*/
- def apply(planInfo: SparkPlanInfo): SparkPlanGraph = {
+ def apply(plan: SparkPlan): SparkPlanGraph = {
val nodeIdGenerator = new AtomicLong(0)
val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
- buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges)
+ buildSparkPlanGraphNode(plan, nodeIdGenerator, nodes, edges)
new SparkPlanGraph(nodes, edges)
}
private def buildSparkPlanGraphNode(
- planInfo: SparkPlanInfo,
+ plan: SparkPlan,
nodeIdGenerator: AtomicLong,
nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = {
- val metrics = planInfo.metrics.map { metric =>
- SQLPlanMetric(metric.name, metric.accumulatorId,
- SQLMetrics.getMetricParam(metric.metricParam))
+ val metrics = plan.metrics.toSeq.map { case (key, metric) =>
+ SQLPlanMetric(metric.name.getOrElse(key), metric.id,
+ metric.param.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]])
}
val node = SparkPlanGraphNode(
- nodeIdGenerator.getAndIncrement(), planInfo.nodeName, planInfo.simpleString, metrics)
+ nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, metrics)
nodes += node
- val childrenNodes = planInfo.children.map(
+ val childrenNodes = plan.children.map(
child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
for (child <- childrenNodes) {
edges += SparkPlanGraphEdge(child.id, node.id)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 4f2cad19bf..82867ab496 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -26,7 +26,6 @@ import org.apache.xbean.asm5.Opcodes._
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
@@ -83,8 +82,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
if (jobs.size == expectedNumOfJobs) {
// If we can track all jobs, check the metric values
val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
- val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
- df.queryExecution.executedPlan)).nodes.filter { node =>
+ val actualMetrics = SparkPlanGraph(df.queryExecution.executedPlan).nodes.filter { node =>
expectedMetrics.contains(node.id)
}.map { node =>
val nodeMetrics = node.metrics.map { metric =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index f93d081d0c..c15aac7750 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -21,10 +21,10 @@ import java.util.Properties
import org.apache.spark.{SparkException, SparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.sql.execution.metric.LongSQLMetricValue
import org.apache.spark.scheduler._
import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
-import org.apache.spark.sql.execution.metric.LongSQLMetricValue
+import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.test.SharedSQLContext
class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
@@ -82,8 +82,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
val executionId = 0
val df = createTestDataFrame
val accumulatorIds =
- SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan))
- .nodes.flatMap(_.metrics.map(_.accumulatorId))
+ SparkPlanGraph(df.queryExecution.executedPlan).nodes.flatMap(_.metrics.map(_.accumulatorId))
// Assume all accumulators are long
var accumulatorValue = 0L
val accumulatorUpdates = accumulatorIds.map { id =>
@@ -91,13 +90,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
(id, accumulatorValue)
}.toMap
- listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ listener.onExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
- SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
- System.currentTimeMillis()))
+ SparkPlanGraph(df.queryExecution.executedPlan),
+ System.currentTimeMillis())
val executionUIData = listener.executionIdToData(0)
@@ -207,8 +206,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
time = System.currentTimeMillis(),
JobSucceeded
))
- listener.onOtherEvent(SparkListenerSQLExecutionEnd(
- executionId, System.currentTimeMillis()))
+ listener.onExecutionEnd(executionId, System.currentTimeMillis())
assert(executionUIData.runningJobs.isEmpty)
assert(executionUIData.succeededJobs === Seq(0))
@@ -221,20 +219,19 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
val listener = new SQLListener(sqlContext.sparkContext.conf)
val executionId = 0
val df = createTestDataFrame
- listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ listener.onExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
- SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
- System.currentTimeMillis()))
+ SparkPlanGraph(df.queryExecution.executedPlan),
+ System.currentTimeMillis())
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- listener.onOtherEvent(SparkListenerSQLExecutionEnd(
- executionId, System.currentTimeMillis()))
+ listener.onExecutionEnd(executionId, System.currentTimeMillis())
listener.onJobEnd(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
@@ -251,13 +248,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
val listener = new SQLListener(sqlContext.sparkContext.conf)
val executionId = 0
val df = createTestDataFrame
- listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ listener.onExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
- SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
- System.currentTimeMillis()))
+ SparkPlanGraph(df.queryExecution.executedPlan),
+ System.currentTimeMillis())
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
@@ -274,8 +271,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- listener.onOtherEvent(SparkListenerSQLExecutionEnd(
- executionId, System.currentTimeMillis()))
+ listener.onExecutionEnd(executionId, System.currentTimeMillis())
listener.onJobEnd(SparkListenerJobEnd(
jobId = 1,
time = System.currentTimeMillis(),
@@ -292,20 +288,19 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
val listener = new SQLListener(sqlContext.sparkContext.conf)
val executionId = 0
val df = createTestDataFrame
- listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ listener.onExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
- SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
- System.currentTimeMillis()))
+ SparkPlanGraph(df.queryExecution.executedPlan),
+ System.currentTimeMillis())
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Seq.empty,
createProperties(executionId)))
- listener.onOtherEvent(SparkListenerSQLExecutionEnd(
- executionId, System.currentTimeMillis()))
+ listener.onExecutionEnd(executionId, System.currentTimeMillis())
listener.onJobEnd(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index e7b3765487..963d10eed6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -42,7 +42,6 @@ trait SharedSQLContext extends SQLTestUtils {
* Initialize the [[TestSQLContext]].
*/
protected override def beforeAll(): Unit = {
- SQLContext.clearSqlListener()
if (_ctx == null) {
_ctx = new TestSQLContext
}