author    zsxwing <zsxwing@gmail.com>    2015-08-05 01:51:22 -0700
committer Reynold Xin <rxin@databricks.com>    2015-08-05 01:51:22 -0700
commit    1b0317f64cfe99ff70580eeb99753cd0d31f849a (patch)
tree      216ea2171955483f449c2f7b97aee1c7264d89c3 /core
parent    1bf608b5ef1e6e2ae4325e13c2bd5e34db62450f (diff)
[SPARK-8861][SPARK-8862][SQL] Add basic instrumentation to each SparkPlan operator and add a new SQL tab
This PR includes the following changes:

### SPARK-8861: Add basic instrumentation to each SparkPlan operator

A SparkPlan can override `def accumulators: Map[String, Accumulator[_]]` to expose its metrics so that they can be displayed in the UI. The UI uses them to track the updates and show them on the web page in real time.

### SparkSQLExecution and SQLSparkListener

`SparkSQLExecution.withNewExecutionId` sets `spark.sql.execution.id` in the local properties so that we can use it to track all jobs that belong to the same query.

`SQLSparkListener` is a listener that tracks all accumulator updates of all tasks for a query. It receives them from heartbeats, and the UI can query them in real time. When a query starts running, `SQLSparkListener.onExecutionStart` is called; when the query finishes, `SQLSparkListener.onExecutionEnd` is called. The Spark jobs with the same execution id are tracked and stored with that query.

`SQLSparkListener` has to store the accumulator updates of each task separately. When a task fails and is retried, we need to drop the old accumulator updates. Because we cannot revert our changes to an accumulator, we have to maintain these accumulator updates ourselves so that the updates of a failed task can be dropped.

### SPARK-8862: A new SQL tab

Includes two pages:

#### A page for all DataFrame/SQL queries

It shows the running, completed and failed queries in 3 tables. It also displays the jobs and their links for each query in its row.

#### A detail page for a DataFrame/SQL query

This page also shows the SparkPlan metrics in real time. Run a long-running query, such as

```
val testData = sc.parallelize((1 to 1000000).map(i => (i, i.toString))).toDF()
testData.select($"_1").filter($"_1" < 1000).foreach(_ => Thread.sleep(60))
```

and you will see the metrics keep updating in real time.
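The execution-id plumbing described above can be pictured with a minimal sketch. This is not the code added by this PR: the object name, the UUID-based id, and the try/finally cleanup are assumptions used only to illustrate how a local property can group all jobs of one query.

```
import java.util.UUID
import org.apache.spark.SparkContext

// Illustrative sketch only; the real implementation lives in the sql module of this PR.
object SQLExecutionSketch {
  val EXECUTION_ID_KEY = "spark.sql.execution.id" // property key named in the description above

  def withNewExecutionId[T](sc: SparkContext)(body: => T): T = {
    val executionId = UUID.randomUUID().toString // id scheme is an assumption
    sc.setLocalProperty(EXECUTION_ID_KEY, executionId)
    try {
      // Every job submitted while `body` runs inherits the local property, so a listener
      // can attribute those jobs (and their task accumulator updates) to this one query.
      body
    } finally {
      sc.setLocalProperty(EXECUTION_ID_KEY, null) // clear it so later jobs are not mis-attributed
    }
  }
}
```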
Author: zsxwing <zsxwing@gmail.com>

Closes #7774 from zsxwing/sql-ui and squashes the following commits:

5a2bc99 [zsxwing] Remove UISeleniumSuite and its dependency
57d4cd2 [zsxwing] Use VisibleForTesting annotation
cc1c736 [zsxwing] Add SparkPlan.trackNumOfRowsEnabled to make subclasses easy to track the number of rows; fix the issue that the "save" action cannot collect metrics
3771ab0 [zsxwing] Register SQL metrics accumulators
3a101c0 [zsxwing] Change prepareCalled's type to AtomicBoolean for thread-safety
b8d5605 [zsxwing] Make prepare idempotent; call children's prepare in SparkPlan.prepare; change doPrepare to def
4ed11a1 [zsxwing] var -> val
332639c [zsxwing] Ignore UISeleniumSuite and SQLListenerSuite."no memory leak" because of SPARK-9580
bb52359 [zsxwing] Address other comments in SQLListener
c4d0f5d [zsxwing] Move newPredicate out of the iterator loop
957473c [zsxwing] Move STATIC_RESOURCE_DIR to object SQLTab
7ab4816 [zsxwing] Make SparkPlan accumulator API private[sql]
dae195e [zsxwing] Fix the code style and comments
3a66207 [zsxwing] Ignore irrelevant accumulators
b8484a1 [zsxwing] Merge branch 'master' into sql-ui
9406592 [zsxwing] Implement the SparkPlan viz
4ebce68 [zsxwing] Add SparkPlan.prepare to support BroadcastHashJoin to run background work in parallel
ca1811f [zsxwing] Merge branch 'master' into sql-ui
fef6fc6 [zsxwing] Fix a corner case
25f335c [zsxwing] Fix the code style
6eae828 [zsxwing] SQLSparkListener -> SQLListener; SparkSQLExecutionUIData -> SQLExecutionUIData; SparkSQLExecution -> SQLExecution
822af75 [zsxwing] Add SQLSparkListenerSuite and fix the issue about onExecutionEnd and onJobEnd
6be626f [zsxwing] Add UISeleniumSuite to test UI
d02a24d [zsxwing] Make ExecutionPage private
23abf73 [zsxwing] [SPARK-8861][SPARK-8862][SQL] Add basic instrumentation to each SparkPlan operator and add a new SQL tab
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulators.scala      | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala      | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala |  2
3 files changed, 25 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index b6a0119c69..462d5c96d4 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -153,11 +153,12 @@ class Accumulable[R, T] private[spark] (
     value_ = zero
     deserialized = true
     // Automatically register the accumulator when it is deserialized with the task closure.
-    // Note that internal accumulators are deserialized before the TaskContext is created and
-    // are registered in the TaskContext constructor.
-    if (!isInternal) {
-      val taskContext = TaskContext.get()
-      assume(taskContext != null, "Task context was null when deserializing user accumulators")
+    //
+    // Note internal accumulators sent with task are deserialized before the TaskContext is created
+    // and are registered in the TaskContext constructor. Other internal accumulators, such SQL
+    // metrics, still need to register here.
+    val taskContext = TaskContext.get()
+    if (taskContext != null) {
       taskContext.registerAccumulator(this)
     }
   }
@@ -255,8 +256,8 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
  * @tparam T result type
  */
 class Accumulator[T] private[spark] (
-    @transient initialValue: T,
-    param: AccumulatorParam[T],
+    @transient private[spark] val initialValue: T,
+    private[spark] val param: AccumulatorParam[T],
     name: Option[String],
     internal: Boolean)
   extends Accumulable[T, T](initialValue, param, name, internal) {
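As an aside, a hedged illustration of the behavior the Accumulators.scala change above enables: any accumulator deserialized inside a running task now registers itself with that task's TaskContext, so its per-task updates can be tracked and reported back. The scenario below is illustrative and uses only the public `SparkContext.accumulator` API; it is not code from this commit.

```
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: a named accumulator updated from tasks. When the task closure is
// deserialized on an executor, the accumulator registers itself with the TaskContext
// (the code path touched by the hunk above), so its updates can be tracked per task.
object NamedAccumulatorExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("accumulator-demo").setMaster("local[2]"))
    val numRows = sc.accumulator(0L, "number of rows") // named, so it can be shown in the UI
    sc.parallelize(1 to 1000).foreach(_ => numRows += 1L)
    println(numRows.value) // only the driver reads the merged value
    sc.stop()
  }
}
```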
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4380cf45cc..0c0705325b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1239,6 +1239,21 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
 
   /**
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
+   * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
+   * driver can access the accumulator's `value`. The latest local value of such accumulator will be
+   * sent back to the driver via heartbeats.
+   *
+   * @tparam T type that can be added to the accumulator, must be thread safe
+   */
+  private[spark] def internalAccumulator[T](initialValue: T, name: String)(
+      implicit param: AccumulatorParam[T]): Accumulator[T] = {
+    val acc = new Accumulator(initialValue, param, Some(name), internal = true)
+    cleaner.foreach(_.registerAccumulatorForCleanup(acc))
+    acc
+  }
+
+  /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
    * with `+=`. Only the driver can access the accumuable's `value`.
    * @tparam R accumulator result type
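A hypothetical call site for the new `internalAccumulator` helper added above. The real call sites are in the sql module and are not part of this core diff; the metric name and the surrounding context are assumptions.

```
// Hypothetical usage, assuming `sparkContext` is in scope (e.g. inside a physical operator).
// One named internal accumulator per metric could be exposed through `SparkPlan.accumulators`
// so the new SQL tab can display its value in real time.
val numOutputRows: org.apache.spark.Accumulator[Long] =
  sparkContext.internalAccumulator(0L, "number of output rows")
```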
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 7bc7fce7ae..5d78a9dc88 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -249,6 +249,7 @@ private[spark] class Executor(
           m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
           m.setJvmGCTime(computeTotalGcTime() - startGCTime)
           m.setResultSerializationTime(afterSerialization - beforeSerialization)
+          m.updateAccumulators()
         }
 
         val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
@@ -300,6 +301,7 @@ private[spark] class Executor(
         task.metrics.map { m =>
           m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
           m.setJvmGCTime(computeTotalGcTime() - startGCTime)
+          m.updateAccumulators()
           m
         }
       }
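Both `m.updateAccumulators()` calls above snapshot the task's accumulator values into its TaskMetrics just before the task result (or failure) is reported, which is what lets the driver receive SQL metric updates via task results and heartbeats. A rough sketch of that idea, using only public `Accumulable` members and not the actual TaskMetrics code:

```
import org.apache.spark.Accumulable

// Sketch only: snapshot the current local value of every accumulator registered with a task,
// keyed by accumulator id, so the executor can ship the map with the task result or a heartbeat.
def collectAccumulatorUpdates(accums: Iterable[Accumulable[_, _]]): Map[Long, Any] =
  accums.map(a => a.id -> a.localValue).toMap
```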