author     Wenchen Fan <wenchen@databricks.com>    2016-06-08 22:47:29 -0700
committer  Davies Liu <davies.liu@gmail.com>       2016-06-08 22:47:29 -0700
commit     afbe35cf5b272991b4986e551b42d9201c3862c3
tree       5a6afa114c3ea5c37b932e41f30b0dcd55f31380
parent     4e8ac6edd5808ca8245b39d804c6d4f5ea9d0d36
[SPARK-14670] [SQL] allow updating driver side sql metrics
## What changes were proposed in this pull request?

The SQL tab on the Spark UI displays accumulator values per operator, but it currently only shows metrics updated on the executors, not on the driver. It is useful to also include driver-side metrics, e.g. broadcast time.

This takes a different approach from https://github.com/apache/spark/pull/12427: instead of collecting driver-side accumulator updates at the end of execution, this PR posts them through a new listener event right after the update happens.

## How was this patch tested?

New test in `SQLListenerSuite`.

![qq20160606-0](https://cloud.githubusercontent.com/assets/3182036/15841418/0eb137da-2c06-11e6-9068-5694eeb78530.png)

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13189 from cloud-fan/metrics.
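For illustration only (not part of this patch): because `SparkListenerDriverAccumUpdates` is posted on the listener bus, any `SparkListener` can observe it through `onOtherEvent`. A minimal sketch; the `DriverAccumLogger` class name is made up for this example.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates

// Hypothetical listener that prints driver-side accumulator updates as they are posted.
class DriverAccumLogger extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerDriverAccumUpdates(executionId, accumUpdates) =>
      accumUpdates.foreach { case (accumId, value) =>
        println(s"execution $executionId: accumulator $accumId = $value")
      }
    case _ => // ignore other events
  }
}

// Registration from application code, e.g.:
// spark.sparkContext.addSparkListener(new DriverAccumLogger)
```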
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala    9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala                   28
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala              56
3 files changed, 85 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index d3081ba7ac..bd0841db7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
import org.apache.spark.util.ThreadUtils
/**
@@ -92,6 +93,14 @@ case class BroadcastExchangeExec(
val broadcasted = sparkContext.broadcast(relation)
longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
+
+ // There are some cases where we don't care about the metrics and call `SparkPlan.doExecute`
+ // directly without setting an execution id. We should be tolerant of that.
+ if (executionId != null) {
+ sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
+ executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
+ }
+
broadcasted
}
}(BroadcastExchangeExec.executionContext)
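For context (illustration only): `executionId` referenced in the hunk above is not shown in the visible context; it is captured on the driver from the `SQLExecution.EXECUTION_ID_KEY` local property (the same mechanism the test below uses) before the asynchronous broadcast runs, so it is `null` when the plan is executed outside `SQLExecution.withNewExecutionId`, hence the null check. Roughly:

```scala
// Read on the driver thread; null if no execution id was set for this query
// (e.g. SparkPlan.doExecute called directly), which is why the posting is guarded.
val executionId: String = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
```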
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 03b532664a..6e94791901 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -42,6 +42,10 @@ case class SparkListenerSQLExecutionStart(
case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
extends SparkListenerEvent
+@DeveloperApi
+case class SparkListenerDriverAccumUpdates(executionId: Long, accumUpdates: Seq[(Long, Long)])
+ extends SparkListenerEvent
+
private[sql] class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
@@ -251,6 +255,13 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
}
}
}
+ case SparkListenerDriverAccumUpdates(executionId, accumUpdates) => synchronized {
+ _executionIdToData.get(executionId).foreach { executionUIData =>
+ for ((accId, accValue) <- accumUpdates) {
+ executionUIData.driverAccumUpdates(accId) = accValue
+ }
+ }
+ }
case _ => // Ignore
}
@@ -296,7 +307,9 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
(accumulatorUpdate._1, accumulatorUpdate._2)
}
}.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
- mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId =>
+
+ val driverUpdates = executionUIData.driverAccumUpdates.toSeq
+ mergeAccumulatorUpdates(accumulatorUpdates ++ driverUpdates, accumulatorId =>
executionUIData.accumulatorMetrics(accumulatorId).metricType)
case None =>
// This execution has been dropped
@@ -368,10 +381,15 @@ private[ui] class SQLExecutionUIData(
val physicalPlanDescription: String,
val physicalPlanGraph: SparkPlanGraph,
val accumulatorMetrics: Map[Long, SQLPlanMetric],
- val submissionTime: Long,
- var completionTime: Option[Long] = None,
- val jobs: mutable.HashMap[Long, JobExecutionStatus] = mutable.HashMap.empty,
- val stages: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer()) {
+ val submissionTime: Long) {
+
+ var completionTime: Option[Long] = None
+
+ val jobs: mutable.HashMap[Long, JobExecutionStatus] = mutable.HashMap.empty
+
+ val stages: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer()
+
+ val driverAccumUpdates: mutable.HashMap[Long, Long] = mutable.HashMap.empty
/**
* Return whether there are running jobs in this execution.
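Aside (illustration only, not part of the patch): the change above feeds `driverAccumUpdates` into the same per-accumulator merge as the executor-side updates. A minimal standalone sketch of that grouping idea, assuming plain summation instead of the metric-type-aware formatting the real `mergeAccumulatorUpdates` performs:

```scala
// Combine executor-side and driver-side (accumulatorId, value) updates by id.
// Simplified: plain sums, ignoring metric types.
def combineById(updates: Seq[(Long, Long)]): Map[Long, Long] =
  updates.groupBy { case (accumId, _) => accumId }
    .map { case (accumId, pairs) => accumId -> pairs.map(_._2).sum }

// val merged = combineById(executorUpdates ++ driverUpdates)
```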
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 6788c9d65f..6e60b0e4fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -23,11 +23,15 @@ import org.mockito.Mockito.mock
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler._
-import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{AccumulatorMetadata, LongAccumulator}
@@ -386,6 +390,52 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
assert(trackedAccums.head === (sqlMetricInfo.id, sqlMetricInfo.update.get))
}
+ test("driver side SQL metrics") {
+ val listener = new SQLListener(spark.sparkContext.conf)
+ val expectedAccumValue = 12345
+ val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
+ sqlContext.sparkContext.addSparkListener(listener)
+ val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
+ override lazy val sparkPlan = physicalPlan
+ override lazy val executedPlan = physicalPlan
+ }
+ SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
+ physicalPlan.execute().collect()
+ }
+
+ def waitTillExecutionFinished(): Unit = {
+ while (listener.getCompletedExecutions.isEmpty) {
+ Thread.sleep(100)
+ }
+ }
+ waitTillExecutionFinished()
+
+ val driverUpdates = listener.getCompletedExecutions.head.driverAccumUpdates
+ assert(driverUpdates.size == 1)
+ assert(driverUpdates(physicalPlan.longMetric("dummy").id) == expectedAccumValue)
+ }
+
+}
+
+
+/**
+ * A dummy [[org.apache.spark.sql.execution.SparkPlan]] that updates a [[SQLMetric]]
+ * on the driver.
+ */
+private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExecNode {
+ override def sparkContext: SparkContext = sc
+ override def output: Seq[Attribute] = Seq()
+
+ override val metrics: Map[String, SQLMetric] = Map(
+ "dummy" -> SQLMetrics.createMetric(sc, "dummy"))
+
+ override def doExecute(): RDD[InternalRow] = {
+ longMetric("dummy") += expectedValue
+ sc.listenerBus.post(SparkListenerDriverAccumUpdates(
+ sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY).toLong,
+ metrics.values.map(m => m.id -> m.value).toSeq))
+ sc.emptyRDD
+ }
}