diff options
author | zsxwing <zsxwing@gmail.com> | 2015-08-05 01:51:22 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-08-05 01:51:22 -0700 |
commit | 1b0317f64cfe99ff70580eeb99753cd0d31f849a (patch) | |
tree | 216ea2171955483f449c2f7b97aee1c7264d89c3 /core | |
parent | 1bf608b5ef1e6e2ae4325e13c2bd5e34db62450f (diff) | |
download | spark-1b0317f64cfe99ff70580eeb99753cd0d31f849a.tar.gz spark-1b0317f64cfe99ff70580eeb99753cd0d31f849a.tar.bz2 spark-1b0317f64cfe99ff70580eeb99753cd0d31f849a.zip |
[SPARK-8861][SPARK-8862][SQL] Add basic instrumentation to each SparkPlan operator and add a new SQL tab
This PR includes the following changes:
### SPARK-8862: Add basic instrumentation to each SparkPlan operator
A SparkPlan can override `def accumulators: Map[String, Accumulator[_]]` to expose its metrics that can be displayed in UI. The UI will use them to track the updates and show them in the web page in real-time.
### SparkSQLExecution and SQLSparkListener
`SparkSQLExecution.withNewExecutionId` will set `spark.sql.execution.id` to the local properties so that we can use it to track all jobs that belong to the same query.
SQLSparkListener is a listener to track all accumulator updates of all tasks for a query. It receives them from heartbeats can the UI can query them in real-time.
When running a query, `SQLSparkListener.onExecutionStart` will be called. When a query is finished, `SQLSparkListener.onExecutionEnd` will be called. And the Spark jobs with the same execution id will be tracked and stored with this query.
`SQLSparkListener` has to store all accumulator updates for tasks separately. When a task fails and starts to retry, we need to drop the old accumulator updates. Because we can not revert our changes to an accumulator, we have to maintain these accumulator updates by ourselves so as to drop accumulator updates for a failed task.
### SPARK-8862: A new SQL tab
Includes two pages:
#### A page for all DataFrame/SQL queries
It will show the running, completed and failed queries in 3 tables. It also displays the jobs and their links for a query in each row.
#### A detail page for a DataFrame/SQL query
In this page, it also shows the SparkPlan metrics in real-time. Run a long-running query, such as
```
val testData = sc.parallelize((1 to 1000000).map(i => (i, i.toString))).toDF()
testData.select($"_1").filter($"_1" < 1000).foreach(_ => Thread.sleep(60))
```
and you will see the metrics keep updating in real-time.
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7774)
<!-- Reviewable:end -->
Author: zsxwing <zsxwing@gmail.com>
Closes #7774 from zsxwing/sql-ui and squashes the following commits:
5a2bc99 [zsxwing] Remove UISeleniumSuite and its dependency
57d4cd2 [zsxwing] Use VisibleForTesting annotation
cc1c736 [zsxwing] Add SparkPlan.trackNumOfRowsEnabled to make subclasses easy to track the number of rows; fix the issue that the "save" action cannot collect metrics
3771ab0 [zsxwing] Register SQL metrics accmulators
3a101c0 [zsxwing] Change prepareCalled's type to AtomicBoolean for thread-safety
b8d5605 [zsxwing] Make prepare idempotent; call children's prepare in SparkPlan.prepare; change doPrepare to def
4ed11a1 [zsxwing] var -> val
332639c [zsxwing] Ignore UISeleniumSuite and SQLListenerSuite."no memory leak" because of SPARK-9580
bb52359 [zsxwing] Address other commens in SQLListener
c4d0f5d [zsxwing] Move newPredicate out of the iterator loop
957473c [zsxwing] Move STATIC_RESOURCE_DIR to object SQLTab
7ab4816 [zsxwing] Make SparkPlan accumulator API private[sql]
dae195e [zsxwing] Fix the code style and comments
3a66207 [zsxwing] Ignore irrelevant accumulators
b8484a1 [zsxwing] Merge branch 'master' into sql-ui
9406592 [zsxwing] Implement the SparkPlan viz
4ebce68 [zsxwing] Add SparkPlan.prepare to support BroadcastHashJoin to run background work in parallel
ca1811f [zsxwing] Merge branch 'master' into sql-ui
fef6fc6 [zsxwing] Fix a corner case
25f335c [zsxwing] Fix the code style
6eae828 [zsxwing] SQLSparkListener -> SQLListener; SparkSQLExecutionUIData -> SQLExecutionUIData; SparkSQLExecution -> SQLExecution
822af75 [zsxwing] Add SQLSparkListenerSuite and fix the issue about onExecutionEnd and onJobEnd
6be626f [zsxwing] Add UISeleniumSuite to test UI
d02a24d [zsxwing] Make ExecutionPage private
23abf73 [zsxwing] [SPARK-8862][SPARK-8862][SQL] Add basic instrumentation to each SparkPlan operator and add a new SQL tab
Diffstat (limited to 'core')
3 files changed, 25 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index b6a0119c69..462d5c96d4 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -153,11 +153,12 @@ class Accumulable[R, T] private[spark] ( value_ = zero deserialized = true // Automatically register the accumulator when it is deserialized with the task closure. - // Note that internal accumulators are deserialized before the TaskContext is created and - // are registered in the TaskContext constructor. - if (!isInternal) { - val taskContext = TaskContext.get() - assume(taskContext != null, "Task context was null when deserializing user accumulators") + // + // Note internal accumulators sent with task are deserialized before the TaskContext is created + // and are registered in the TaskContext constructor. Other internal accumulators, such SQL + // metrics, still need to register here. + val taskContext = TaskContext.get() + if (taskContext != null) { taskContext.registerAccumulator(this) } } @@ -255,8 +256,8 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa * @tparam T result type */ class Accumulator[T] private[spark] ( - @transient initialValue: T, - param: AccumulatorParam[T], + @transient private[spark] val initialValue: T, + private[spark] val param: AccumulatorParam[T], name: Option[String], internal: Boolean) extends Accumulable[T, T](initialValue, param, name, internal) { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4380cf45cc..0c0705325b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1239,6 +1239,21 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** + * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display + * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the + * driver can access the accumulator's `value`. The latest local value of such accumulator will be + * sent back to the driver via heartbeats. + * + * @tparam T type that can be added to the accumulator, must be thread safe + */ + private[spark] def internalAccumulator[T](initialValue: T, name: String)( + implicit param: AccumulatorParam[T]): Accumulator[T] = { + val acc = new Accumulator(initialValue, param, Some(name), internal = true) + cleaner.foreach(_.registerAccumulatorForCleanup(acc)) + acc + } + + /** * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values * with `+=`. Only the driver can access the accumuable's `value`. * @tparam R accumulator result type diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 7bc7fce7ae..5d78a9dc88 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -249,6 +249,7 @@ private[spark] class Executor( m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime) m.setJvmGCTime(computeTotalGcTime() - startGCTime) m.setResultSerializationTime(afterSerialization - beforeSerialization) + m.updateAccumulators() } val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull) @@ -300,6 +301,7 @@ private[spark] class Executor( task.metrics.map { m => m.setExecutorRunTime(System.currentTimeMillis() - taskStart) m.setJvmGCTime(computeTotalGcTime() - startGCTime) + m.updateAccumulators() m } } |