diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-11-30 13:41:52 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-11-30 13:42:35 -0800 |
commit | 2c5dee0fb8e4d1734ea3a0f22e0b5bfd2f6dba46 (patch) | |
tree | 656ca8ee9be599c811cabbd9a920cb2dd88df7d6 /sql | |
parent | f2fbfa444f6e8d27953ec2d1c0b3abd603c963f9 (diff) | |
download | spark-2c5dee0fb8e4d1734ea3a0f22e0b5bfd2f6dba46.tar.gz spark-2c5dee0fb8e4d1734ea3a0f22e0b5bfd2f6dba46.tar.bz2 spark-2c5dee0fb8e4d1734ea3a0f22e0b5bfd2f6dba46.zip |
Revert "[SPARK-11206] Support SQL UI on the history server"
This reverts commit cc243a079b1c039d6e7f0b410d1654d94a090e14 / PR #9297
I'm reverting this because it broke SQLListenerMemoryLeakSuite in the master Maven builds.
See #9991 for a discussion of why this broke the tests.
Diffstat (limited to 'sql')
13 files changed, 129 insertions, 269 deletions
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory b/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory deleted file mode 100644 index 507100be90..0000000000 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory +++ /dev/null @@ -1 +0,0 @@ -org.apache.spark.sql.execution.ui.SQLHistoryListenerFactory diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 8d27839525..9cc65de191 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -1263,8 +1263,6 @@ object SQLContext { */ @transient private val instantiatedContext = new AtomicReference[SQLContext]() - @transient private val sqlListener = new AtomicReference[SQLListener]() - /** * Get the singleton SQLContext if it exists or create a new one using the given SparkContext. * @@ -1309,10 +1307,6 @@ object SQLContext { Option(instantiatedContext.get()) } - private[sql] def clearSqlListener(): Unit = { - sqlListener.set(null) - } - /** * Changes the SQLContext that will be returned in this thread and its children when * SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives @@ -1361,13 +1355,9 @@ object SQLContext { * Create a SQLListener then add it into SparkContext, and create an SQLTab if there is SparkUI. */ private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = { - if (sqlListener.get() == null) { - val listener = new SQLListener(sc.conf) - if (sqlListener.compareAndSet(null, listener)) { - sc.addSparkListener(listener) - sc.ui.foreach(new SQLTab(listener, _)) - } - } - sqlListener.get() + val listener = new SQLListener(sc.conf) + sc.addSparkListener(listener) + sc.ui.foreach(new SQLTab(listener, _)) + listener } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 3497198626..1422e15549 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -21,8 +21,7 @@ import java.util.concurrent.atomic.AtomicLong import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionStart, - SparkListenerSQLExecutionEnd} +import org.apache.spark.sql.execution.ui.SparkPlanGraph import org.apache.spark.util.Utils private[sql] object SQLExecution { @@ -46,14 +45,25 @@ private[sql] object SQLExecution { sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString) val r = try { val callSite = Utils.getCallSite() - sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart( - executionId, callSite.shortForm, callSite.longForm, queryExecution.toString, - SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis())) + sqlContext.listener.onExecutionStart( + executionId, + callSite.shortForm, + callSite.longForm, + queryExecution.toString, + SparkPlanGraph(queryExecution.executedPlan), + System.currentTimeMillis()) try { body } finally { - sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd( - executionId, System.currentTimeMillis())) + // Ideally, we need to make sure onExecutionEnd happens after onJobStart and onJobEnd. + // However, onJobStart and onJobEnd run in the listener thread. Because we cannot add new + // SQL event types to SparkListener since it's a public API, we cannot guarantee that. + // + // SQLListener should handle the case that onExecutionEnd happens before onJobEnd. + // + // The worst case is onExecutionEnd may happen before onJobStart when the listener thread + // is very busy. If so, we cannot track the jobs for the execution. It seems acceptable. + sqlContext.listener.onExecutionEnd(executionId, System.currentTimeMillis()) } } finally { sc.setLocalProperty(EXECUTION_ID_KEY, null) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala deleted file mode 100644 index 486ce34064..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution - -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.execution.metric.SQLMetricInfo -import org.apache.spark.util.Utils - -/** - * :: DeveloperApi :: - * Stores information about a SQL SparkPlan. - */ -@DeveloperApi -class SparkPlanInfo( - val nodeName: String, - val simpleString: String, - val children: Seq[SparkPlanInfo], - val metrics: Seq[SQLMetricInfo]) - -private[sql] object SparkPlanInfo { - - def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = { - val metrics = plan.metrics.toSeq.map { case (key, metric) => - new SQLMetricInfo(metric.name.getOrElse(key), metric.id, - Utils.getFormattedClassName(metric.param)) - } - val children = plan.children.map(fromSparkPlan) - - new SparkPlanInfo(plan.nodeName, plan.simpleString, children, metrics) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala deleted file mode 100644 index 2708219ad3..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.metric - -import org.apache.spark.annotation.DeveloperApi - -/** - * :: DeveloperApi :: - * Stores information about a SQL Metric. - */ -@DeveloperApi -class SQLMetricInfo( - val name: String, - val accumulatorId: Long, - val metricParam: String) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 6c0f6f8a52..1c253e3942 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -104,39 +104,21 @@ private class LongSQLMetricParam(val stringValue: Seq[Long] => String, initialVa override def zero: LongSQLMetricValue = new LongSQLMetricValue(initialValue) } -private object LongSQLMetricParam extends LongSQLMetricParam(_.sum.toString, 0L) - -private object StaticsLongSQLMetricParam extends LongSQLMetricParam( - (values: Seq[Long]) => { - // This is a workaround for SPARK-11013. - // We use -1 as initial value of the accumulator, if the accumulator is valid, we will update - // it at the end of task and the value will be at least 0. - val validValues = values.filter(_ >= 0) - val Seq(sum, min, med, max) = { - val metric = if (validValues.length == 0) { - Seq.fill(4)(0L) - } else { - val sorted = validValues.sorted - Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1)) - } - metric.map(Utils.bytesToString) - } - s"\n$sum ($min, $med, $max)" - }, -1L) - private[sql] object SQLMetrics { private def createLongMetric( sc: SparkContext, name: String, - param: LongSQLMetricParam): LongSQLMetric = { + stringValue: Seq[Long] => String, + initialValue: Long): LongSQLMetric = { + val param = new LongSQLMetricParam(stringValue, initialValue) val acc = new LongSQLMetric(name, param) sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc)) acc } def createLongMetric(sc: SparkContext, name: String): LongSQLMetric = { - createLongMetric(sc, name, LongSQLMetricParam) + createLongMetric(sc, name, _.sum.toString, 0L) } /** @@ -144,25 +126,31 @@ private[sql] object SQLMetrics { * spill size, etc. */ def createSizeMetric(sc: SparkContext, name: String): LongSQLMetric = { + val stringValue = (values: Seq[Long]) => { + // This is a workaround for SPARK-11013. + // We use -1 as initial value of the accumulator, if the accumulator is valid, we will update + // it at the end of task and the value will be at least 0. + val validValues = values.filter(_ >= 0) + val Seq(sum, min, med, max) = { + val metric = if (validValues.length == 0) { + Seq.fill(4)(0L) + } else { + val sorted = validValues.sorted + Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1)) + } + metric.map(Utils.bytesToString) + } + s"\n$sum ($min, $med, $max)" + } // The final result of this metric in physical operator UI may looks like: // data size total (min, med, max): // 100GB (100MB, 1GB, 10GB) - createLongMetric(sc, s"$name total (min, med, max)", StaticsLongSQLMetricParam) - } - - def getMetricParam(metricParamName: String): SQLMetricParam[SQLMetricValue[Any], Any] = { - val longSQLMetricParam = Utils.getFormattedClassName(LongSQLMetricParam) - val staticsSQLMetricParam = Utils.getFormattedClassName(StaticsLongSQLMetricParam) - val metricParam = metricParamName match { - case `longSQLMetricParam` => LongSQLMetricParam - case `staticsSQLMetricParam` => StaticsLongSQLMetricParam - } - metricParam.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]] + createLongMetric(sc, s"$name total (min, med, max)", stringValue, -1L) } /** * A metric that its value will be ignored. Use this one when we need a metric parameter but don't * care about the value. */ - val nullLongMetric = new LongSQLMetric("null", LongSQLMetricParam) + val nullLongMetric = new LongSQLMetric("null", new LongSQLMetricParam(_.sum.toString, 0L)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala index c74ad40406..e74d6fb396 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala @@ -19,7 +19,9 @@ package org.apache.spark.sql.execution.ui import javax.servlet.http.HttpServletRequest -import scala.xml.Node +import scala.xml.{Node, Unparsed} + +import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.Logging import org.apache.spark.ui.{UIUtils, WebUIPage} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index e19a1e3e58..5a072de400 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -19,34 +19,11 @@ package org.apache.spark.sql.execution.ui import scala.collection.mutable -import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.execution.SparkPlanInfo -import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetricValue, SQLMetricParam} +import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue} import org.apache.spark.{JobExecutionStatus, Logging, SparkConf} -import org.apache.spark.ui.SparkUI - -@DeveloperApi -case class SparkListenerSQLExecutionStart( - executionId: Long, - description: String, - details: String, - physicalPlanDescription: String, - sparkPlanInfo: SparkPlanInfo, - time: Long) - extends SparkListenerEvent - -@DeveloperApi -case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long) - extends SparkListenerEvent - -private[sql] class SQLHistoryListenerFactory extends SparkHistoryListenerFactory { - - override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = { - List(new SQLHistoryListener(conf, sparkUI)) - } -} private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Logging { @@ -141,8 +118,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi override def onExecutorMetricsUpdate( executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized { for ((taskId, stageId, stageAttemptID, metrics) <- executorMetricsUpdate.taskMetrics) { - updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics.accumulatorUpdates(), - finishTask = false) + updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics, finishTask = false) } } @@ -164,7 +140,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi taskEnd.taskInfo.taskId, taskEnd.stageId, taskEnd.stageAttemptId, - taskEnd.taskMetrics.accumulatorUpdates(), + taskEnd.taskMetrics, finishTask = true) } @@ -172,12 +148,15 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi * Update the accumulator values of a task with the latest metrics for this task. This is called * every time we receive an executor heartbeat or when a task finishes. */ - protected def updateTaskAccumulatorValues( + private def updateTaskAccumulatorValues( taskId: Long, stageId: Int, stageAttemptID: Int, - accumulatorUpdates: Map[Long, Any], + metrics: TaskMetrics, finishTask: Boolean): Unit = { + if (metrics == null) { + return + } _stageIdToStageMetrics.get(stageId) match { case Some(stageMetrics) => @@ -195,9 +174,9 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi case Some(taskMetrics) => if (finishTask) { taskMetrics.finished = true - taskMetrics.accumulatorUpdates = accumulatorUpdates + taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates() } else if (!taskMetrics.finished) { - taskMetrics.accumulatorUpdates = accumulatorUpdates + taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates() } else { // If a task is finished, we should not override with accumulator updates from // heartbeat reports @@ -206,7 +185,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi // TODO Now just set attemptId to 0. Should fix here when we can get the attempt // id from SparkListenerExecutorMetricsUpdate stageMetrics.taskIdToMetricUpdates(taskId) = new SQLTaskMetrics( - attemptId = 0, finished = finishTask, accumulatorUpdates) + attemptId = 0, finished = finishTask, metrics.accumulatorUpdates()) } } case None => @@ -214,40 +193,38 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi } } - override def onOtherEvent(event: SparkListenerEvent): Unit = event match { - case SparkListenerSQLExecutionStart(executionId, description, details, - physicalPlanDescription, sparkPlanInfo, time) => - val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo) - val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node => - node.metrics.map(metric => metric.accumulatorId -> metric) - } - val executionUIData = new SQLExecutionUIData( - executionId, - description, - details, - physicalPlanDescription, - physicalPlanGraph, - sqlPlanMetrics.toMap, - time) - synchronized { - activeExecutions(executionId) = executionUIData - _executionIdToData(executionId) = executionUIData - } - case SparkListenerSQLExecutionEnd(executionId, time) => synchronized { - _executionIdToData.get(executionId).foreach { executionUIData => - executionUIData.completionTime = Some(time) - if (!executionUIData.hasRunningJobs) { - // onExecutionEnd happens after all "onJobEnd"s - // So we should update the execution lists. - markExecutionFinished(executionId) - } else { - // There are some running jobs, onExecutionEnd happens before some "onJobEnd"s. - // Then we don't if the execution is successful, so let the last onJobEnd updates the - // execution lists. - } + def onExecutionStart( + executionId: Long, + description: String, + details: String, + physicalPlanDescription: String, + physicalPlanGraph: SparkPlanGraph, + time: Long): Unit = { + val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node => + node.metrics.map(metric => metric.accumulatorId -> metric) + } + + val executionUIData = new SQLExecutionUIData(executionId, description, details, + physicalPlanDescription, physicalPlanGraph, sqlPlanMetrics.toMap, time) + synchronized { + activeExecutions(executionId) = executionUIData + _executionIdToData(executionId) = executionUIData + } + } + + def onExecutionEnd(executionId: Long, time: Long): Unit = synchronized { + _executionIdToData.get(executionId).foreach { executionUIData => + executionUIData.completionTime = Some(time) + if (!executionUIData.hasRunningJobs) { + // onExecutionEnd happens after all "onJobEnd"s + // So we should update the execution lists. + markExecutionFinished(executionId) + } else { + // There are some running jobs, onExecutionEnd happens before some "onJobEnd"s. + // Then we don't if the execution is successful, so let the last onJobEnd updates the + // execution lists. } } - case _ => // Ignore } private def markExecutionFinished(executionId: Long): Unit = { @@ -312,38 +289,6 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi } -private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI) - extends SQLListener(conf) { - - private var sqlTabAttached = false - - override def onExecutorMetricsUpdate( - executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized { - // Do nothing - } - - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { - updateTaskAccumulatorValues( - taskEnd.taskInfo.taskId, - taskEnd.stageId, - taskEnd.stageAttemptId, - taskEnd.taskInfo.accumulables.map { acc => - (acc.id, new LongSQLMetricValue(acc.update.getOrElse("0").toLong)) - }.toMap, - finishTask = true) - } - - override def onOtherEvent(event: SparkListenerEvent): Unit = event match { - case _: SparkListenerSQLExecutionStart => - if (!sqlTabAttached) { - new SQLTab(this, sparkUI) - sqlTabAttached = true - } - super.onOtherEvent(event) - case _ => super.onOtherEvent(event) - } -} - /** * Represent all necessary data for an execution that will be used in Web UI. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala index 4f50b2ecdc..9c27944d42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala @@ -17,11 +17,13 @@ package org.apache.spark.sql.execution.ui +import java.util.concurrent.atomic.AtomicInteger + import org.apache.spark.Logging import org.apache.spark.ui.{SparkUI, SparkUITab} private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI) - extends SparkUITab(sparkUI, "SQL") with Logging { + extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging { val parent = sparkUI @@ -33,5 +35,13 @@ private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI) } private[sql] object SQLTab { + private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static" + + private val nextTabId = new AtomicInteger(0) + + private def nextTabName: String = { + val nextId = nextTabId.getAndIncrement() + if (nextId == 0) "SQL" else s"SQL$nextId" + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index 7af0ff09c5..f1fce5478a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable -import org.apache.spark.sql.execution.SparkPlanInfo -import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue} /** * A graph used for storing information of an executionPlan of DataFrame. @@ -48,27 +48,27 @@ private[sql] object SparkPlanGraph { /** * Build a SparkPlanGraph from the root of a SparkPlan tree. */ - def apply(planInfo: SparkPlanInfo): SparkPlanGraph = { + def apply(plan: SparkPlan): SparkPlanGraph = { val nodeIdGenerator = new AtomicLong(0) val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]() val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]() - buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges) + buildSparkPlanGraphNode(plan, nodeIdGenerator, nodes, edges) new SparkPlanGraph(nodes, edges) } private def buildSparkPlanGraphNode( - planInfo: SparkPlanInfo, + plan: SparkPlan, nodeIdGenerator: AtomicLong, nodes: mutable.ArrayBuffer[SparkPlanGraphNode], edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = { - val metrics = planInfo.metrics.map { metric => - SQLPlanMetric(metric.name, metric.accumulatorId, - SQLMetrics.getMetricParam(metric.metricParam)) + val metrics = plan.metrics.toSeq.map { case (key, metric) => + SQLPlanMetric(metric.name.getOrElse(key), metric.id, + metric.param.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]]) } val node = SparkPlanGraphNode( - nodeIdGenerator.getAndIncrement(), planInfo.nodeName, planInfo.simpleString, metrics) + nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, metrics) nodes += node - val childrenNodes = planInfo.children.map( + val childrenNodes = plan.children.map( child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges)) for (child <- childrenNodes) { edges += SparkPlanGraphEdge(child.id, node.id) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 4f2cad19bf..82867ab496 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -26,7 +26,6 @@ import org.apache.xbean.asm5.Opcodes._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql._ -import org.apache.spark.sql.execution.SparkPlanInfo import org.apache.spark.sql.execution.ui.SparkPlanGraph import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext @@ -83,8 +82,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { if (jobs.size == expectedNumOfJobs) { // If we can track all jobs, check the metric values val metricValues = sqlContext.listener.getExecutionMetrics(executionId) - val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan( - df.queryExecution.executedPlan)).nodes.filter { node => + val actualMetrics = SparkPlanGraph(df.queryExecution.executedPlan).nodes.filter { node => expectedMetrics.contains(node.id) }.map { node => val nodeMetrics = node.metrics.map { metric => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index f93d081d0c..c15aac7750 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -21,10 +21,10 @@ import java.util.Properties import org.apache.spark.{SparkException, SparkContext, SparkConf, SparkFunSuite} import org.apache.spark.executor.TaskMetrics +import org.apache.spark.sql.execution.metric.LongSQLMetricValue import org.apache.spark.scheduler._ import org.apache.spark.sql.{DataFrame, SQLContext} -import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution} -import org.apache.spark.sql.execution.metric.LongSQLMetricValue +import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.test.SharedSQLContext class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { @@ -82,8 +82,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { val executionId = 0 val df = createTestDataFrame val accumulatorIds = - SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)) - .nodes.flatMap(_.metrics.map(_.accumulatorId)) + SparkPlanGraph(df.queryExecution.executedPlan).nodes.flatMap(_.metrics.map(_.accumulatorId)) // Assume all accumulators are long var accumulatorValue = 0L val accumulatorUpdates = accumulatorIds.map { id => @@ -91,13 +90,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { (id, accumulatorValue) }.toMap - listener.onOtherEvent(SparkListenerSQLExecutionStart( + listener.onExecutionStart( executionId, "test", "test", df.queryExecution.toString, - SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), - System.currentTimeMillis())) + SparkPlanGraph(df.queryExecution.executedPlan), + System.currentTimeMillis()) val executionUIData = listener.executionIdToData(0) @@ -207,8 +206,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { time = System.currentTimeMillis(), JobSucceeded )) - listener.onOtherEvent(SparkListenerSQLExecutionEnd( - executionId, System.currentTimeMillis())) + listener.onExecutionEnd(executionId, System.currentTimeMillis()) assert(executionUIData.runningJobs.isEmpty) assert(executionUIData.succeededJobs === Seq(0)) @@ -221,20 +219,19 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { val listener = new SQLListener(sqlContext.sparkContext.conf) val executionId = 0 val df = createTestDataFrame - listener.onOtherEvent(SparkListenerSQLExecutionStart( + listener.onExecutionStart( executionId, "test", "test", df.queryExecution.toString, - SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), - System.currentTimeMillis())) + SparkPlanGraph(df.queryExecution.executedPlan), + System.currentTimeMillis()) listener.onJobStart(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Nil, createProperties(executionId))) - listener.onOtherEvent(SparkListenerSQLExecutionEnd( - executionId, System.currentTimeMillis())) + listener.onExecutionEnd(executionId, System.currentTimeMillis()) listener.onJobEnd(SparkListenerJobEnd( jobId = 0, time = System.currentTimeMillis(), @@ -251,13 +248,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { val listener = new SQLListener(sqlContext.sparkContext.conf) val executionId = 0 val df = createTestDataFrame - listener.onOtherEvent(SparkListenerSQLExecutionStart( + listener.onExecutionStart( executionId, "test", "test", df.queryExecution.toString, - SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), - System.currentTimeMillis())) + SparkPlanGraph(df.queryExecution.executedPlan), + System.currentTimeMillis()) listener.onJobStart(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), @@ -274,8 +271,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { time = System.currentTimeMillis(), stageInfos = Nil, createProperties(executionId))) - listener.onOtherEvent(SparkListenerSQLExecutionEnd( - executionId, System.currentTimeMillis())) + listener.onExecutionEnd(executionId, System.currentTimeMillis()) listener.onJobEnd(SparkListenerJobEnd( jobId = 1, time = System.currentTimeMillis(), @@ -292,20 +288,19 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { val listener = new SQLListener(sqlContext.sparkContext.conf) val executionId = 0 val df = createTestDataFrame - listener.onOtherEvent(SparkListenerSQLExecutionStart( + listener.onExecutionStart( executionId, "test", "test", df.queryExecution.toString, - SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), - System.currentTimeMillis())) + SparkPlanGraph(df.queryExecution.executedPlan), + System.currentTimeMillis()) listener.onJobStart(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Seq.empty, createProperties(executionId))) - listener.onOtherEvent(SparkListenerSQLExecutionEnd( - executionId, System.currentTimeMillis())) + listener.onExecutionEnd(executionId, System.currentTimeMillis()) listener.onJobEnd(SparkListenerJobEnd( jobId = 0, time = System.currentTimeMillis(), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index e7b3765487..963d10eed6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -42,7 +42,6 @@ trait SharedSQLContext extends SQLTestUtils { * Initialize the [[TestSQLContext]]. */ protected override def beforeAll(): Unit = { - SQLContext.clearSqlListener() if (_ctx == null) { _ctx = new TestSQLContext } |