-rw-r--r--  .rat-excludes                                                                        |   1
-rw-r--r--  core/src/main/java/org/apache/spark/JavaSparkListener.java                           |   3
-rw-r--r--  core/src/main/java/org/apache/spark/SparkFirehoseListener.java                       |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala            |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala                   |  24
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala                |   1
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/SparkUI.scala                                |  16
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                         |  11
-rw-r--r--  sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory |   1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                        |  18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala            |  24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala           |  46
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala    |  30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala       |  56
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala        |   4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala          | 139
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala               |  12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala       |  20
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala  |   4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala     |  43
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala             |   1
21 files changed, 327 insertions(+), 135 deletions(-)
diff --git a/.rat-excludes b/.rat-excludes
index 08fba6d351..7262c960ed 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -82,4 +82,5 @@ INDEX
gen-java.*
.*avpr
org.apache.spark.sql.sources.DataSourceRegister
+org.apache.spark.scheduler.SparkHistoryListenerFactory
.*parquet
diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java b/core/src/main/java/org/apache/spark/JavaSparkListener.java
index fa9acf0a15..23bc9a2e81 100644
--- a/core/src/main/java/org/apache/spark/JavaSparkListener.java
+++ b/core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -82,4 +82,7 @@ public class JavaSparkListener implements SparkListener {
@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }
+ @Override
+ public void onOtherEvent(SparkListenerEvent event) { }
+
}
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 1214d05ba6..e6b24afd88 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -118,4 +118,8 @@ public class SparkFirehoseListener implements SparkListener {
onEvent(blockUpdated);
}
+ @Override
+ public void onOtherEvent(SparkListenerEvent event) {
+ onEvent(event);
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 000a021a52..eaa07acc51 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -207,6 +207,10 @@ private[spark] class EventLoggingListener(
// No-op because logging every update would be overkill
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
+ override def onOtherEvent(event: SparkListenerEvent): Unit = {
+ logEvent(event, flushLogger = true)
+ }
+
/**
* Stop logging events. The event log file will be renamed so that it loses the
* ".inprogress" suffix.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 896f174333..075a7f1317 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -22,15 +22,19 @@ import java.util.Properties
import scala.collection.Map
import scala.collection.mutable
-import org.apache.spark.{Logging, TaskEndReason}
+import com.fasterxml.jackson.annotation.JsonTypeInfo
+
+import org.apache.spark.{Logging, SparkConf, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
import org.apache.spark.util.{Distribution, Utils}
+import org.apache.spark.ui.SparkUI
@DeveloperApi
-sealed trait SparkListenerEvent
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
+trait SparkListenerEvent
@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
@@ -131,6 +135,17 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
/**
+ * Interface for creating history listeners defined in other modules like SQL, which are used to
+ * rebuild the history UI.
+ */
+private[spark] trait SparkHistoryListenerFactory {
+ /**
+ * Create listeners used to rebuild the history UI.
+ */
+ def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
+}
+
+/**
* :: DeveloperApi ::
* Interface for listening to events from the Spark scheduler. Note that this is an internal
* interface which might change in different Spark releases. Java clients should extend
@@ -223,6 +238,11 @@ trait SparkListener {
* Called when the driver receives a block update info.
*/
def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }
+
+ /**
+ * Called when other events like SQL-specific events are posted.
+ */
+ def onOtherEvent(event: SparkListenerEvent) { }
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 04afde33f5..95722a0714 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -61,6 +61,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case logStart: SparkListenerLogStart => // ignore event log metadata
+ case _ => listener.onOtherEvent(event)
}
}
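
The catch-all case above is what routes every event type the bus has no dedicated callback for into the new onOtherEvent hook. A minimal sketch of a listener consuming such events (MyCustomEvent is a hypothetical event type, used only for illustration):

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// Any case class extending SparkListenerEvent can travel through the listener bus.
case class MyCustomEvent(payload: String) extends SparkListenerEvent

class MyCustomListener extends SparkListener {
  // Invoked by SparkListenerBus for events that have no dedicated on* callback.
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case MyCustomEvent(payload) => println(s"received custom event: $payload")
    case _ => // ignore events we do not care about
  }
}

This is how the SQL module below receives SparkListenerSQLExecutionStart/End without the core scheduler knowing anything about them.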
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 4608bce202..8da6884a38 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,10 +17,13 @@
package org.apache.spark.ui
-import java.util.Date
+import java.util.{Date, ServiceLoader}
+
+import scala.collection.JavaConverters._
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
UIRoot}
+import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
@@ -154,7 +157,16 @@ private[spark] object SparkUI {
appName: String,
basePath: String,
startTime: Long): SparkUI = {
- create(None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
+ val sparkUI = create(
+ None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
+
+ val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
+ Utils.getContextOrSparkClassLoader).asScala
+ listenerFactories.foreach { listenerFactory =>
+ val listeners = listenerFactory.createListeners(conf, sparkUI)
+ listeners.foreach(listenerBus.addListener)
+ }
+ sparkUI
}
/**
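
The ServiceLoader lookup above lets other modules contribute listeners to a history-server UI without core depending on them. A rough sketch of a factory wired up this way (class names are hypothetical; the concrete one added by this patch is SQLHistoryListenerFactory further down, and the trait is private[spark], so real factories live inside Spark's own modules):

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkHistoryListenerFactory, SparkListener}
import org.apache.spark.ui.SparkUI

class MyHistoryListenerFactory extends SparkHistoryListenerFactory {
  override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
    // Return whatever listeners should repopulate this module's UI state during replay.
    Seq(new SparkListener {})
  }
}

The factory is registered through a one-line resource file named META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory containing the factory's fully qualified class name, exactly as the sql/core module does below.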
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c9beeb25e0..7f5d713ec6 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -19,19 +19,21 @@ package org.apache.spark.util
import java.util.{Properties, UUID}
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-
import scala.collection.JavaConverters._
import scala.collection.Map
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
+import org.json4s.jackson.JsonMethods._
import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage._
/**
@@ -54,6 +56,8 @@ private[spark] object JsonProtocol {
private implicit val format = DefaultFormats
+ private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
/** ------------------------------------------------- *
* JSON serialization methods for SparkListenerEvents |
* -------------------------------------------------- */
@@ -96,6 +100,7 @@ private[spark] object JsonProtocol {
executorMetricsUpdateToJson(metricsUpdate)
case blockUpdated: SparkListenerBlockUpdated =>
throw new MatchError(blockUpdated) // TODO(ekl) implement this
+ case _ => parse(mapper.writeValueAsString(event))
}
}
@@ -511,6 +516,8 @@ private[spark] object JsonProtocol {
case `executorRemoved` => executorRemovedFromJson(json)
case `logStart` => logStartFromJson(json)
case `metricsUpdate` => executorMetricsUpdateFromJson(json)
+ case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
+ .asInstanceOf[SparkListenerEvent]
}
}
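
For event classes JsonProtocol has no bespoke handler for, the two fallbacks above round-trip the event through Jackson: the @JsonTypeInfo annotation on SparkListenerEvent makes the writer embed the concrete class name in an "Event" property, and the reader hands that name to Utils.classForName. A rough standalone sketch of the same round trip (com.example.MyCustomEvent is hypothetical; the JSON shown is approximate):

package com.example

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.scheduler.SparkListenerEvent

case class MyCustomEvent(executionId: Long, time: Long) extends SparkListenerEvent

object JacksonRoundTrip {
  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  def main(args: Array[String]): Unit = {
    val json = mapper.writeValueAsString(MyCustomEvent(1L, 42L))
    // json is roughly {"Event":"com.example.MyCustomEvent","executionId":1,"time":42};
    // the "Event" field comes from the @JsonTypeInfo annotation on SparkListenerEvent.
    val back = mapper.readValue(json, classOf[MyCustomEvent])
    assert(back == MyCustomEvent(1L, 42L))
  }
}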
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory b/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
new file mode 100644
index 0000000000..507100be90
--- /dev/null
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
@@ -0,0 +1 @@
+org.apache.spark.sql.execution.ui.SQLHistoryListenerFactory
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 46bf544fd8..1c2ac5f6f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1263,6 +1263,8 @@ object SQLContext {
*/
@transient private val instantiatedContext = new AtomicReference[SQLContext]()
+ @transient private val sqlListener = new AtomicReference[SQLListener]()
+
/**
* Get the singleton SQLContext if it exists or create a new one using the given SparkContext.
*
@@ -1307,6 +1309,10 @@ object SQLContext {
Option(instantiatedContext.get())
}
+ private[sql] def clearSqlListener(): Unit = {
+ sqlListener.set(null)
+ }
+
/**
* Changes the SQLContext that will be returned in this thread and its children when
* SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
@@ -1355,9 +1361,13 @@ object SQLContext {
* Create a SQLListener then add it into SparkContext, and create an SQLTab if there is SparkUI.
*/
private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = {
- val listener = new SQLListener(sc.conf)
- sc.addSparkListener(listener)
- sc.ui.foreach(new SQLTab(listener, _))
- listener
+ if (sqlListener.get() == null) {
+ val listener = new SQLListener(sc.conf)
+ if (sqlListener.compareAndSet(null, listener)) {
+ sc.addSparkListener(listener)
+ sc.ui.foreach(new SQLTab(listener, _))
+ }
+ }
+ sqlListener.get()
}
}
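
The AtomicReference guard above is the usual create-once idiom: racing threads may each build a candidate listener, but only the compareAndSet winner performs the side effects (addSparkListener, SQLTab), so at most one SQL tab is ever attached per JVM. The same pattern in isolation, as a minimal sketch with hypothetical names:

import java.util.concurrent.atomic.AtomicReference

object ListenerSingleton {
  private val ref = new AtomicReference[AnyRef]()

  def getOrCreate(create: () => AnyRef)(register: AnyRef => Unit): AnyRef = {
    if (ref.get() == null) {
      val candidate = create()
      // Only the CAS winner runs the registration side effect; losing threads
      // discard their candidate and fall through to the shared instance.
      if (ref.compareAndSet(null, candidate)) {
        register(candidate)
      }
    }
    ref.get()
  }
}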
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 1422e15549..3497198626 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.ui.SparkPlanGraph
+import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionStart,
+ SparkListenerSQLExecutionEnd}
import org.apache.spark.util.Utils
private[sql] object SQLExecution {
@@ -45,25 +46,14 @@ private[sql] object SQLExecution {
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
val r = try {
val callSite = Utils.getCallSite()
- sqlContext.listener.onExecutionStart(
- executionId,
- callSite.shortForm,
- callSite.longForm,
- queryExecution.toString,
- SparkPlanGraph(queryExecution.executedPlan),
- System.currentTimeMillis())
+ sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
+ executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
+ SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
try {
body
} finally {
- // Ideally, we need to make sure onExecutionEnd happens after onJobStart and onJobEnd.
- // However, onJobStart and onJobEnd run in the listener thread. Because we cannot add new
- // SQL event types to SparkListener since it's a public API, we cannot guarantee that.
- //
- // SQLListener should handle the case that onExecutionEnd happens before onJobEnd.
- //
- // The worst case is onExecutionEnd may happen before onJobStart when the listener thread
- // is very busy. If so, we cannot track the jobs for the execution. It seems acceptable.
- sqlContext.listener.onExecutionEnd(executionId, System.currentTimeMillis())
+ sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd(
+ executionId, System.currentTimeMillis()))
}
} finally {
sc.setLocalProperty(EXECUTION_ID_KEY, null)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
new file mode 100644
index 0000000000..486ce34064
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.execution.metric.SQLMetricInfo
+import org.apache.spark.util.Utils
+
+/**
+ * :: DeveloperApi ::
+ * Stores information about a SQL SparkPlan.
+ */
+@DeveloperApi
+class SparkPlanInfo(
+ val nodeName: String,
+ val simpleString: String,
+ val children: Seq[SparkPlanInfo],
+ val metrics: Seq[SQLMetricInfo])
+
+private[sql] object SparkPlanInfo {
+
+ def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
+ val metrics = plan.metrics.toSeq.map { case (key, metric) =>
+ new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
+ Utils.getFormattedClassName(metric.param))
+ }
+ val children = plan.children.map(fromSparkPlan)
+
+ new SparkPlanInfo(plan.nodeName, plan.simpleString, children, metrics)
+ }
+}
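
SparkPlanInfo is the serializable mirror of a SparkPlan tree: only the node name, the simpleString and the metric descriptors survive, which is what allows a plan to travel through the event log to the history server. A small usage sketch (fromSparkPlan is private[sql], so this assumes code living in the sql package; countNodes is a hypothetical helper):

import org.apache.spark.sql.execution.{SparkPlan, SparkPlanInfo}

// Walk the converted tree, e.g. to count its nodes.
def countNodes(info: SparkPlanInfo): Int = 1 + info.children.map(countNodes).sum

def describe(plan: SparkPlan): String = {
  val info = SparkPlanInfo.fromSparkPlan(plan)
  s"${info.nodeName} with ${countNodes(info)} node(s) and ${info.metrics.size} metric(s) at the root"
}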
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
new file mode 100644
index 0000000000..2708219ad3
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Stores information about a SQL Metric.
+ */
+@DeveloperApi
+class SQLMetricInfo(
+ val name: String,
+ val accumulatorId: Long,
+ val metricParam: String)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 1c253e3942..6c0f6f8a52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -104,21 +104,39 @@ private class LongSQLMetricParam(val stringValue: Seq[Long] => String, initialVa
override def zero: LongSQLMetricValue = new LongSQLMetricValue(initialValue)
}
+private object LongSQLMetricParam extends LongSQLMetricParam(_.sum.toString, 0L)
+
+private object StaticsLongSQLMetricParam extends LongSQLMetricParam(
+ (values: Seq[Long]) => {
+ // This is a workaround for SPARK-11013.
+ // We use -1 as initial value of the accumulator, if the accumulator is valid, we will update
+ // it at the end of task and the value will be at least 0.
+ val validValues = values.filter(_ >= 0)
+ val Seq(sum, min, med, max) = {
+ val metric = if (validValues.length == 0) {
+ Seq.fill(4)(0L)
+ } else {
+ val sorted = validValues.sorted
+ Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
+ }
+ metric.map(Utils.bytesToString)
+ }
+ s"\n$sum ($min, $med, $max)"
+ }, -1L)
+
private[sql] object SQLMetrics {
private def createLongMetric(
sc: SparkContext,
name: String,
- stringValue: Seq[Long] => String,
- initialValue: Long): LongSQLMetric = {
- val param = new LongSQLMetricParam(stringValue, initialValue)
+ param: LongSQLMetricParam): LongSQLMetric = {
val acc = new LongSQLMetric(name, param)
sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
}
def createLongMetric(sc: SparkContext, name: String): LongSQLMetric = {
- createLongMetric(sc, name, _.sum.toString, 0L)
+ createLongMetric(sc, name, LongSQLMetricParam)
}
/**
@@ -126,31 +144,25 @@ private[sql] object SQLMetrics {
* spill size, etc.
*/
def createSizeMetric(sc: SparkContext, name: String): LongSQLMetric = {
- val stringValue = (values: Seq[Long]) => {
- // This is a workaround for SPARK-11013.
- // We use -1 as initial value of the accumulator, if the accumulator is valid, we will update
- // it at the end of task and the value will be at least 0.
- val validValues = values.filter(_ >= 0)
- val Seq(sum, min, med, max) = {
- val metric = if (validValues.length == 0) {
- Seq.fill(4)(0L)
- } else {
- val sorted = validValues.sorted
- Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
- }
- metric.map(Utils.bytesToString)
- }
- s"\n$sum ($min, $med, $max)"
- }
// The final result of this metric in physical operator UI may looks like:
// data size total (min, med, max):
// 100GB (100MB, 1GB, 10GB)
- createLongMetric(sc, s"$name total (min, med, max)", stringValue, -1L)
+ createLongMetric(sc, s"$name total (min, med, max)", StaticsLongSQLMetricParam)
+ }
+
+ def getMetricParam(metricParamName: String): SQLMetricParam[SQLMetricValue[Any], Any] = {
+ val longSQLMetricParam = Utils.getFormattedClassName(LongSQLMetricParam)
+ val staticsSQLMetricParam = Utils.getFormattedClassName(StaticsLongSQLMetricParam)
+ val metricParam = metricParamName match {
+ case `longSQLMetricParam` => LongSQLMetricParam
+ case `staticsSQLMetricParam` => StaticsLongSQLMetricParam
+ }
+ metricParam.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]]
}
/**
* A metric that its value will be ignored. Use this one when we need a metric parameter but don't
* care about the value.
*/
- val nullLongMetric = new LongSQLMetric("null", new LongSQLMetricParam(_.sum.toString, 0L))
+ val nullLongMetric = new LongSQLMetric("null", LongSQLMetricParam)
}
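
The two metric params are now singletons so that their class names can serve as stable identifiers: SparkPlanInfo records Utils.getFormattedClassName(metric.param) for each metric, and getMetricParam maps that string back to the singleton when the graph is rebuilt from a replayed event. A worked sketch of what StaticsLongSQLMetricParam computes for sample per-task values (standalone, same logic as above; the byte formatting is approximate):

// One task never updated its accumulator (the -1 sentinel from SPARK-11013), three did.
val values = Seq(-1L, 100L * 1024 * 1024, 1024L * 1024 * 1024, 10L * 1024 * 1024 * 1024)
val valid = values.filter(_ >= 0)          // drop the -1 sentinel
val sorted = valid.sorted
val Seq(sum, min, med, max) =
  Seq(sorted.sum, sorted.head, sorted(valid.length / 2), sorted.last)
// After Utils.bytesToString formatting this renders in the UI roughly as
//   "11.1 GB (100.0 MB, 1024.0 MB, 10.0 GB)"
// i.e. total (min, med, max) for the "data size" style metrics.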
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index e74d6fb396..c74ad40406 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -19,9 +19,7 @@ package org.apache.spark.sql.execution.ui
import javax.servlet.http.HttpServletRequest
-import scala.xml.{Node, Unparsed}
-
-import org.apache.commons.lang3.StringEscapeUtils
+import scala.xml.Node
import org.apache.spark.Logging
import org.apache.spark.ui.{UIUtils, WebUIPage}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 5a072de400..e19a1e3e58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -19,11 +19,34 @@ package org.apache.spark.sql.execution.ui
import scala.collection.mutable
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
+import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetricValue, SQLMetricParam}
import org.apache.spark.{JobExecutionStatus, Logging, SparkConf}
+import org.apache.spark.ui.SparkUI
+
+@DeveloperApi
+case class SparkListenerSQLExecutionStart(
+ executionId: Long,
+ description: String,
+ details: String,
+ physicalPlanDescription: String,
+ sparkPlanInfo: SparkPlanInfo,
+ time: Long)
+ extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
+ extends SparkListenerEvent
+
+private[sql] class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
+
+ override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
+ List(new SQLHistoryListener(conf, sparkUI))
+ }
+}
private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Logging {
@@ -118,7 +141,8 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
override def onExecutorMetricsUpdate(
executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
for ((taskId, stageId, stageAttemptID, metrics) <- executorMetricsUpdate.taskMetrics) {
- updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics, finishTask = false)
+ updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics.accumulatorUpdates(),
+ finishTask = false)
}
}
@@ -140,7 +164,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
taskEnd.taskInfo.taskId,
taskEnd.stageId,
taskEnd.stageAttemptId,
- taskEnd.taskMetrics,
+ taskEnd.taskMetrics.accumulatorUpdates(),
finishTask = true)
}
@@ -148,15 +172,12 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
* Update the accumulator values of a task with the latest metrics for this task. This is called
* every time we receive an executor heartbeat or when a task finishes.
*/
- private def updateTaskAccumulatorValues(
+ protected def updateTaskAccumulatorValues(
taskId: Long,
stageId: Int,
stageAttemptID: Int,
- metrics: TaskMetrics,
+ accumulatorUpdates: Map[Long, Any],
finishTask: Boolean): Unit = {
- if (metrics == null) {
- return
- }
_stageIdToStageMetrics.get(stageId) match {
case Some(stageMetrics) =>
@@ -174,9 +195,9 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
case Some(taskMetrics) =>
if (finishTask) {
taskMetrics.finished = true
- taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
+ taskMetrics.accumulatorUpdates = accumulatorUpdates
} else if (!taskMetrics.finished) {
- taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
+ taskMetrics.accumulatorUpdates = accumulatorUpdates
} else {
// If a task is finished, we should not override with accumulator updates from
// heartbeat reports
@@ -185,7 +206,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
// TODO Now just set attemptId to 0. Should fix here when we can get the attempt
// id from SparkListenerExecutorMetricsUpdate
stageMetrics.taskIdToMetricUpdates(taskId) = new SQLTaskMetrics(
- attemptId = 0, finished = finishTask, metrics.accumulatorUpdates())
+ attemptId = 0, finished = finishTask, accumulatorUpdates)
}
}
case None =>
@@ -193,38 +214,40 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
}
}
- def onExecutionStart(
- executionId: Long,
- description: String,
- details: String,
- physicalPlanDescription: String,
- physicalPlanGraph: SparkPlanGraph,
- time: Long): Unit = {
- val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
- node.metrics.map(metric => metric.accumulatorId -> metric)
- }
-
- val executionUIData = new SQLExecutionUIData(executionId, description, details,
- physicalPlanDescription, physicalPlanGraph, sqlPlanMetrics.toMap, time)
- synchronized {
- activeExecutions(executionId) = executionUIData
- _executionIdToData(executionId) = executionUIData
- }
- }
-
- def onExecutionEnd(executionId: Long, time: Long): Unit = synchronized {
- _executionIdToData.get(executionId).foreach { executionUIData =>
- executionUIData.completionTime = Some(time)
- if (!executionUIData.hasRunningJobs) {
- // onExecutionEnd happens after all "onJobEnd"s
- // So we should update the execution lists.
- markExecutionFinished(executionId)
- } else {
- // There are some running jobs, onExecutionEnd happens before some "onJobEnd"s.
- // Then we don't if the execution is successful, so let the last onJobEnd updates the
- // execution lists.
+ override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+ case SparkListenerSQLExecutionStart(executionId, description, details,
+ physicalPlanDescription, sparkPlanInfo, time) =>
+ val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
+ val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
+ node.metrics.map(metric => metric.accumulatorId -> metric)
+ }
+ val executionUIData = new SQLExecutionUIData(
+ executionId,
+ description,
+ details,
+ physicalPlanDescription,
+ physicalPlanGraph,
+ sqlPlanMetrics.toMap,
+ time)
+ synchronized {
+ activeExecutions(executionId) = executionUIData
+ _executionIdToData(executionId) = executionUIData
+ }
+ case SparkListenerSQLExecutionEnd(executionId, time) => synchronized {
+ _executionIdToData.get(executionId).foreach { executionUIData =>
+ executionUIData.completionTime = Some(time)
+ if (!executionUIData.hasRunningJobs) {
+ // onExecutionEnd happens after all "onJobEnd"s
+ // So we should update the execution lists.
+ markExecutionFinished(executionId)
+ } else {
+ // There are some running jobs, onExecutionEnd happens before some "onJobEnd"s.
+ // Then we don't if the execution is successful, so let the last onJobEnd updates the
+ // execution lists.
+ }
}
}
+ case _ => // Ignore
}
private def markExecutionFinished(executionId: Long): Unit = {
@@ -289,6 +312,38 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
}
+private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
+ extends SQLListener(conf) {
+
+ private var sqlTabAttached = false
+
+ override def onExecutorMetricsUpdate(
+ executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
+ // Do nothing
+ }
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+ updateTaskAccumulatorValues(
+ taskEnd.taskInfo.taskId,
+ taskEnd.stageId,
+ taskEnd.stageAttemptId,
+ taskEnd.taskInfo.accumulables.map { acc =>
+ (acc.id, new LongSQLMetricValue(acc.update.getOrElse("0").toLong))
+ }.toMap,
+ finishTask = true)
+ }
+
+ override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+ case _: SparkListenerSQLExecutionStart =>
+ if (!sqlTabAttached) {
+ new SQLTab(this, sparkUI)
+ sqlTabAttached = true
+ }
+ super.onOtherEvent(event)
+ case _ => super.onOtherEvent(event)
+ }
+}
+
/**
* Represent all necessary data for an execution that will be used in Web UI.
*/
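
The history-side subclass above has to work without live TaskMetrics: during replay only the string-valued accumulable updates recorded in the event log are available, so it rebuilds the Long metric values from taskInfo.accumulables and attaches the SQL tab lazily on the first SparkListenerSQLExecutionStart it sees. A rough sketch of just the value conversion (assumes code inside the sql package, since LongSQLMetricValue is private[sql]; ids and updates are illustrative):

import org.apache.spark.sql.execution.metric.LongSQLMetricValue

// Replayed accumulables expose only an optional string update per accumulator id.
val replayed: Seq[(Long, Option[String])] = Seq(7L -> Some("42"), 8L -> None)

val accumulatorUpdates: Map[Long, LongSQLMetricValue] =
  replayed.map { case (id, update) =>
    // Missing updates default to 0, matching the listener code above.
    id -> new LongSQLMetricValue(update.getOrElse("0").toLong)
  }.toMap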
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index 9c27944d42..4f50b2ecdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -17,13 +17,11 @@
package org.apache.spark.sql.execution.ui
-import java.util.concurrent.atomic.AtomicInteger
-
import org.apache.spark.Logging
import org.apache.spark.ui.{SparkUI, SparkUITab}
private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
- extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging {
+ extends SparkUITab(sparkUI, "SQL") with Logging {
val parent = sparkUI
@@ -35,13 +33,5 @@ private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
}
private[sql] object SQLTab {
-
private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
-
- private val nextTabId = new AtomicInteger(0)
-
- private def nextTabName: String = {
- val nextId = nextTabId.getAndIncrement()
- if (nextId == 0) "SQL" else s"SQL$nextId"
- }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index f1fce5478a..7af0ff09c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
+import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* A graph used for storing information of an executionPlan of DataFrame.
@@ -48,27 +48,27 @@ private[sql] object SparkPlanGraph {
/**
* Build a SparkPlanGraph from the root of a SparkPlan tree.
*/
- def apply(plan: SparkPlan): SparkPlanGraph = {
+ def apply(planInfo: SparkPlanInfo): SparkPlanGraph = {
val nodeIdGenerator = new AtomicLong(0)
val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
- buildSparkPlanGraphNode(plan, nodeIdGenerator, nodes, edges)
+ buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges)
new SparkPlanGraph(nodes, edges)
}
private def buildSparkPlanGraphNode(
- plan: SparkPlan,
+ planInfo: SparkPlanInfo,
nodeIdGenerator: AtomicLong,
nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = {
- val metrics = plan.metrics.toSeq.map { case (key, metric) =>
- SQLPlanMetric(metric.name.getOrElse(key), metric.id,
- metric.param.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]])
+ val metrics = planInfo.metrics.map { metric =>
+ SQLPlanMetric(metric.name, metric.accumulatorId,
+ SQLMetrics.getMetricParam(metric.metricParam))
}
val node = SparkPlanGraphNode(
- nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, metrics)
+ nodeIdGenerator.getAndIncrement(), planInfo.nodeName, planInfo.simpleString, metrics)
nodes += node
- val childrenNodes = plan.children.map(
+ val childrenNodes = planInfo.children.map(
child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
for (child <- childrenNodes) {
edges += SparkPlanGraphEdge(child.id, node.id)
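
SparkPlanGraph is now built from the serializable SparkPlanInfo rather than the live SparkPlan, which is what lets the history server draw the plan DAG from replayed events. A small sketch of building a graph by hand (assumes code inside the sql package, since these classes are private[sql]; metric lists are left empty for brevity):

import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkPlanGraph

// A two-node plan: a Filter whose only child is a Scan.
val scan   = new SparkPlanInfo("Scan", "Scan ExistingRDD", Nil, Nil)
val filter = new SparkPlanInfo("Filter", "Filter (id > 1)", Seq(scan), Nil)

val graph = SparkPlanGraph(filter)
// graph.nodes contains the Filter node (id 0) and the Scan node (id 1);
// graph.edges contains a single edge from the Scan node to its parent Filter.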
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 5e2b4154dd..ebfa1eaf3e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -26,6 +26,7 @@ import org.apache.xbean.asm5.Opcodes._
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
+import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
@@ -82,7 +83,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
if (jobs.size == expectedNumOfJobs) {
// If we can track all jobs, check the metric values
val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
- val actualMetrics = SparkPlanGraph(df.queryExecution.executedPlan).nodes.filter { node =>
+ val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
+ df.queryExecution.executedPlan)).nodes.filter { node =>
expectedMetrics.contains(node.id)
}.map { node =>
val nodeMetrics = node.metrics.map { metric =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index c15aac7750..f93d081d0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -21,10 +21,10 @@ import java.util.Properties
import org.apache.spark.{SparkException, SparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.sql.execution.metric.LongSQLMetricValue
import org.apache.spark.scheduler._
import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
+import org.apache.spark.sql.execution.metric.LongSQLMetricValue
import org.apache.spark.sql.test.SharedSQLContext
class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
@@ -82,7 +82,8 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
val executionId = 0
val df = createTestDataFrame
val accumulatorIds =
- SparkPlanGraph(df.queryExecution.executedPlan).nodes.flatMap(_.metrics.map(_.accumulatorId))
+ SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan))
+ .nodes.flatMap(_.metrics.map(_.accumulatorId))
// Assume all accumulators are long
var accumulatorValue = 0L
val accumulatorUpdates = accumulatorIds.map { id =>
@@ -90,13 +91,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
(id, accumulatorValue)
}.toMap
- listener.onExecutionStart(
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
- SparkPlanGraph(df.queryExecution.executedPlan),
- System.currentTimeMillis())
+ SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+ System.currentTimeMillis()))
val executionUIData = listener.executionIdToData(0)
@@ -206,7 +207,8 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
time = System.currentTimeMillis(),
JobSucceeded
))
- listener.onExecutionEnd(executionId, System.currentTimeMillis())
+ listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+ executionId, System.currentTimeMillis()))
assert(executionUIData.runningJobs.isEmpty)
assert(executionUIData.succeededJobs === Seq(0))
@@ -219,19 +221,20 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
val listener = new SQLListener(sqlContext.sparkContext.conf)
val executionId = 0
val df = createTestDataFrame
- listener.onExecutionStart(
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
- SparkPlanGraph(df.queryExecution.executedPlan),
- System.currentTimeMillis())
+ SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+ System.currentTimeMillis()))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- listener.onExecutionEnd(executionId, System.currentTimeMillis())
+ listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+ executionId, System.currentTimeMillis()))
listener.onJobEnd(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
@@ -248,13 +251,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
val listener = new SQLListener(sqlContext.sparkContext.conf)
val executionId = 0
val df = createTestDataFrame
- listener.onExecutionStart(
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
- SparkPlanGraph(df.queryExecution.executedPlan),
- System.currentTimeMillis())
+ SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+ System.currentTimeMillis()))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
@@ -271,7 +274,8 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- listener.onExecutionEnd(executionId, System.currentTimeMillis())
+ listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+ executionId, System.currentTimeMillis()))
listener.onJobEnd(SparkListenerJobEnd(
jobId = 1,
time = System.currentTimeMillis(),
@@ -288,19 +292,20 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
val listener = new SQLListener(sqlContext.sparkContext.conf)
val executionId = 0
val df = createTestDataFrame
- listener.onExecutionStart(
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
- SparkPlanGraph(df.queryExecution.executedPlan),
- System.currentTimeMillis())
+ SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+ System.currentTimeMillis()))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Seq.empty,
createProperties(executionId)))
- listener.onExecutionEnd(executionId, System.currentTimeMillis())
+ listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+ executionId, System.currentTimeMillis()))
listener.onJobEnd(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index 963d10eed6..e7b3765487 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -42,6 +42,7 @@ trait SharedSQLContext extends SQLTestUtils {
* Initialize the [[TestSQLContext]].
*/
protected override def beforeAll(): Unit = {
+ SQLContext.clearSqlListener()
if (_ctx == null) {
_ctx = new TestSQLContext
}