author     Patrick Wendell <pwendell@gmail.com>    2014-08-05 13:08:23 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-08-05 13:08:23 -0700
commit     74f82c71b03d265a7d0c98ce196ca8c44de002e8
tree       16f0e938f34e4d7e6f61cfe71cfa8f7847e5eb7e
parent     ac3440f4f3c4b79070ffec7db0b08ad062b4df90
download   spark-74f82c71b03d265a7d0c98ce196ca8c44de002e8.tar.gz
           spark-74f82c71b03d265a7d0c98ce196ca8c44de002e8.tar.bz2
           spark-74f82c71b03d265a7d0c98ce196ca8c44de002e8.zip
SPARK-2380: Support displaying accumulator values in the web UI
This patch adds support for giving accumulators user-visible names and displaying accumulator values in the web UI, allowing users to create custom counters that show up in the UI. The current approach displays both the accumulator deltas caused by each task and a "current" value of the accumulator totals for each stage, which is updated as tasks finish.

Until now, Spark developers have been extending `TaskMetrics` to provide custom instrumentation for RDDs. This patch provides a potentially nicer alternative that goes through the existing accumulator framework (in fact, `TaskMetrics` and accumulators are on an awkward collision course as more features are added to the former). The patch demonstrates how the feature can be used to instrument RDD input sizes. A nice property of going through accumulators is that users can read the current value of the tracked data in their programs, for example to decide whether to short-circuit a Spark stage depending on how things are going.

![counters](https://cloud.githubusercontent.com/assets/320616/3488815/6ee7bc34-0505-11e4-84ce-e36d9886e2cf.png)

Author: Patrick Wendell <pwendell@gmail.com>

Closes #1309 from pwendell/metrics and squashes the following commits:

8815308 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into HEAD
93fbe0f [Patrick Wendell] Other minor fixes
cc43f68 [Patrick Wendell] Updating unit tests
c991b1b [Patrick Wendell] Moving some code into the Accumulators class
9a9ba3c [Patrick Wendell] More merge fixes
c5ace9e [Patrick Wendell] More merge conflicts
1da15e3 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into metrics
9860c55 [Patrick Wendell] Potential solution to posting listener events
0bb0e33 [Patrick Wendell] Remove "display" variable and assume display = name.isDefined
0ec4ac7 [Patrick Wendell] Java API's
e95bf69 [Patrick Wendell] Stash
be97261 [Patrick Wendell] Style fix
8407308 [Patrick Wendell] Removing examples in Hadoop and RDD class
64d405f [Patrick Wendell] Adding missing file
5d8b156 [Patrick Wendell] Changes based on Kay's review.
9f18bad [Patrick Wendell] Minor style changes and tests
7a63abc [Patrick Wendell] Adding Json serialization and responding to Reynold's feedback
ad85076 [Patrick Wendell] Example of using named accumulators for custom RDD metrics.
0b72660 [Patrick Wendell] Initial WIP example of supporing globally named accumulators.
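For context, here is a minimal, illustrative sketch of how the new user-facing API introduced by this patch is intended to be used. The application name, accumulator name, and data are made up for illustration; the `accumulator(initialValue, name)` overload is the one added below in `SparkContext.scala`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object NamedAccumulatorExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("named-accumulator-demo"))

    // New overload added by this patch: passing a name makes the accumulator
    // appear on the stage page of the web UI.
    val recordsProcessed = sc.accumulator(0, "records processed")

    sc.parallelize(1 to 1000, 4).foreach { _ =>
      recordsProcessed += 1  // per-task deltas are reported back to the driver
    }

    // Only the driver can read the total; the UI shows both per-task deltas
    // and the running stage total.
    println(s"records processed = ${recordsProcessed.value}")
    sc.stop()
  }
}
```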
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulators.scala | 19
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 19
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala | 59
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala | 46
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 24
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 21
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 39
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 62
-rw-r--r--  docs/programming-guide.md | 6
13 files changed, 294 insertions(+), 27 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 9c55bfbb47..12f2fe031c 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -36,15 +36,21 @@ import org.apache.spark.serializer.JavaSerializer
*
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
+ * @param name human-readable name for use in Spark's web UI
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] (
@transient initialValue: R,
- param: AccumulableParam[R, T])
+ param: AccumulableParam[R, T],
+ val name: Option[String])
extends Serializable {
- val id = Accumulators.newId
+ def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
+ this(initialValue, param, None)
+
+ val id: Long = Accumulators.newId
+
@transient private var value_ = initialValue // Current value on master
val zero = param.zero(initialValue) // Zero value to be passed to workers
private var deserialized = false
@@ -219,8 +225,10 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
-class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
- extends Accumulable[T,T](initialValue, param)
+class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
+ extends Accumulable[T,T](initialValue, param, name) {
+ def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
+}
/**
* A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
@@ -281,4 +289,7 @@ private object Accumulators {
}
}
}
+
+ def stringifyPartialValue(partialValue: Any) = "%s".format(partialValue)
+ def stringifyValue(value: Any) = "%s".format(value)
}
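The change above threads an optional name through `Accumulable`/`Accumulator` while keeping the old two-argument constructors source-compatible. A small sketch of the two construction paths after this patch (values are illustrative; in practice accumulators are normally created via `sc.accumulator(...)` rather than constructed directly):

```scala
import org.apache.spark.Accumulator
import org.apache.spark.SparkContext.IntAccumulatorParam

// Old two-argument constructor still compiles; the name defaults to None.
val unnamed = new Accumulator[Int](0, IntAccumulatorParam)

// New three-argument constructor carries an Option[String] name for the web UI.
val named = new Accumulator[Int](0, IntAccumulatorParam, Some("records processed"))

assert(unnamed.name == None)
assert(named.name == Some("records processed"))
```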
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9ba21cfcde..e132955f0f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -761,6 +761,15 @@ class SparkContext(config: SparkConf) extends Logging {
new Accumulator(initialValue, param)
/**
+ * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
+ * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
+ * driver can access the accumulator's `value`.
+ */
+ def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) = {
+ new Accumulator(initialValue, param, Some(name))
+ }
+
+ /**
* Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
* with `+=`. Only the driver can access the accumuable's `value`.
* @tparam T accumulator type
@@ -770,6 +779,16 @@ class SparkContext(config: SparkConf) extends Logging {
new Accumulable(initialValue, param)
/**
+ * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
+ * Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
+ * access the accumuable's `value`.
+ * @tparam T accumulator type
+ * @tparam R type that can be added to the accumulator
+ */
+ def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) =
+ new Accumulable(initialValue, param, Some(name))
+
+ /**
* Create an accumulator from a "mutable collection" type.
*
* Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
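A sketch of the new named `accumulable` overload with a custom `AccumulableParam`. The set-of-strings parameter, the "bad records" name, and the input path are made up for illustration, and `sc` is assumed to be an existing `SparkContext`:

```scala
import scala.collection.mutable
import org.apache.spark.AccumulableParam

// Accumulates a set of strings: addAccumulator adds one element,
// addInPlace merges two partial sets, zero resets to an empty set.
implicit object StringSetParam extends AccumulableParam[mutable.Set[String], String] {
  def addAccumulator(acc: mutable.Set[String], elem: String): mutable.Set[String] = acc += elem
  def addInPlace(a: mutable.Set[String], b: mutable.Set[String]): mutable.Set[String] = a ++= b
  def zero(initial: mutable.Set[String]): mutable.Set[String] = mutable.Set.empty[String]
}

// New overload: the name makes the accumulable visible on the stage page.
val badRecords = sc.accumulable(mutable.Set.empty[String], "bad records")

sc.textFile("hdfs:///data/input").foreach { line =>
  if (line.split(",").length < 3) badRecords += line
}
println(s"bad records seen: ${badRecords.value.size}")
```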
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index d9d1c5955c..e0a4815940 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -430,6 +430,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]]
/**
+ * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
+ * to using the `add` method. Only the master can access the accumulator's `value`.
+ *
+ * This version supports naming the accumulator for display in Spark's web UI.
+ */
+ def intAccumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
+ sc.accumulator(initialValue, name)(IntAccumulatorParam)
+ .asInstanceOf[Accumulator[java.lang.Integer]]
+
+ /**
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
@@ -437,18 +447,47 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]]
/**
+ * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
+ * to using the `add` method. Only the master can access the accumulator's `value`.
+ *
+ * This version supports naming the accumulator for display in Spark's web UI.
+ */
+ def doubleAccumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
+ sc.accumulator(initialValue, name)(DoubleAccumulatorParam)
+ .asInstanceOf[Accumulator[java.lang.Double]]
+
+ /**
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue)
/**
+ * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
+ * to using the `add` method. Only the master can access the accumulator's `value`.
+ *
+ * This version supports naming the accumulator for display in Spark's web UI.
+ */
+ def accumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
+ intAccumulator(initialValue, name)
+
+ /**
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
def accumulator(initialValue: Double): Accumulator[java.lang.Double] =
doubleAccumulator(initialValue)
+
+ /**
+ * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
+ * to using the `add` method. Only the master can access the accumulator's `value`.
+ *
+ * This version supports naming the accumulator for display in Spark's web UI.
+ */
+ def accumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
+ doubleAccumulator(initialValue, name)
+
/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `add` method. Only the master can access the accumulator's `value`.
@@ -457,6 +496,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
sc.accumulator(initialValue)(accumulatorParam)
/**
+ * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
+ * values to using the `add` method. Only the master can access the accumulator's `value`.
+ *
+ * This version supports naming the accumulator for display in Spark's web UI.
+ */
+ def accumulator[T](initialValue: T, name: String, accumulatorParam: AccumulatorParam[T])
+ : Accumulator[T] =
+ sc.accumulator(initialValue, name)(accumulatorParam)
+
+ /**
* Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
* can "add" values with `add`. Only the master can access the accumuable's `value`.
*/
@@ -464,6 +513,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
sc.accumulable(initialValue)(param)
/**
+ * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
+ * can "add" values with `add`. Only the master can access the accumuable's `value`.
+ *
+ * This version supports naming the accumulator for display in Spark's web UI.
+ */
+ def accumulable[T, R](initialValue: T, name: String, param: AccumulableParam[T, R])
+ : Accumulable[T, R] =
+ sc.accumulable(initialValue, name)(param)
+
+ /**
* Broadcast a read-only variable to the cluster, returning a
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.
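The Java API mirrors the new overloads. A quick sketch of the named variants (written in Scala against `JavaSparkContext` for brevity; the master URL and names are illustrative, and the calls look the same from Java):

```scala
import org.apache.spark.api.java.JavaSparkContext

val jsc = new JavaSparkContext("local[2]", "named-accumulator-java-api")

// Named variants added by this patch; the names are displayed in the web UI.
val counter = jsc.intAccumulator(0, "java counter")
val total = jsc.doubleAccumulator(0.0, "running total")

counter.add(1)
total.add(2.5)
jsc.stop()
```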
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
new file mode 100644
index 0000000000..fa83372bb4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Information about an [[org.apache.spark.Accumulable]] modified during a task or stage.
+ */
+@DeveloperApi
+class AccumulableInfo (
+ val id: Long,
+ val name: String,
+ val update: Option[String], // represents a partial update within a task
+ val value: String) {
+
+ override def equals(other: Any): Boolean = other match {
+ case acc: AccumulableInfo =>
+ this.id == acc.id && this.name == acc.name &&
+ this.update == acc.update && this.value == acc.value
+ case _ => false
+ }
+}
+
+object AccumulableInfo {
+ def apply(id: Long, name: String, update: Option[String], value: String) =
+ new AccumulableInfo(id, name, update, value)
+
+ def apply(id: Long, name: String, value: String) = new AccumulableInfo(id, name, None, value)
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9fa3a4e9c7..430e45ada5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -883,8 +883,14 @@ class DAGScheduler(
val task = event.task
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)
- listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
- event.taskMetrics))
+
+ // The success case is dealt with separately below, since we need to compute accumulator
+ // updates before posting.
+ if (event.reason != Success) {
+ listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
+ event.taskMetrics))
+ }
+
if (!stageIdToStage.contains(task.stageId)) {
// Skip all the actions if the stage has been cancelled.
return
@@ -906,12 +912,26 @@ class DAGScheduler(
if (event.accumUpdates != null) {
try {
Accumulators.add(event.accumUpdates)
+ event.accumUpdates.foreach { case (id, partialValue) =>
+ val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
+ // To avoid UI cruft, ignore cases where value wasn't updated
+ if (acc.name.isDefined && partialValue != acc.zero) {
+ val name = acc.name.get
+ val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
+ val stringValue = Accumulators.stringifyValue(acc.value)
+ stage.info.accumulables(id) = AccumulableInfo(id, name, stringValue)
+ event.taskInfo.accumulables +=
+ AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
+ }
+ }
} catch {
// If we see an exception during accumulator update, just log the error and move on.
case e: Exception =>
logError(s"Failed to update accumulators for $task", e)
}
}
+ listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
+ event.taskMetrics))
stage.pendingTasks -= task
task match {
case rt: ResultTask[_, _] =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 480891550e..2a407e47a0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -17,6 +17,8 @@
package org.apache.spark.scheduler
+import scala.collection.mutable.HashMap
+
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.RDDInfo
@@ -37,6 +39,8 @@ class StageInfo(
var completionTime: Option[Long] = None
/** If the stage failed, the reason why. */
var failureReason: Option[String] = None
+ /** Terminal values of accumulables updated during this stage. */
+ val accumulables = HashMap[Long, AccumulableInfo]()
def stageFailed(reason: String) {
failureReason = Some(reason)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index ca0595f351..6fa1f2c880 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -17,6 +17,8 @@
package org.apache.spark.scheduler
+import scala.collection.mutable.ListBuffer
+
import org.apache.spark.annotation.DeveloperApi
/**
@@ -42,6 +44,13 @@ class TaskInfo(
var gettingResultTime: Long = 0
/**
+ * Intermediate updates to accumulables during this task. Note that it is valid for the same
+ * accumulable to be updated multiple times in a single task or for two accumulables with the
+ * same name but different IDs to exist in a task.
+ */
+ val accumulables = ListBuffer[AccumulableInfo]()
+
+ /**
* The time when the task has completed successfully (including the time to remotely fetch
* results, if necessary).
*/
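Beyond the built-in UI pages, the new `StageInfo.accumulables` and `TaskInfo.accumulables` fields can be observed from a custom listener. A minimal sketch, assuming a driver-side listener registered with `sc.addSparkListener`; the class name and log messages are illustrative, not part of the patch:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}

class AccumulablePrinter extends SparkListener {
  // Stage-level totals, keyed by accumulator id.
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    stageCompleted.stageInfo.accumulables.values.foreach { acc =>
      println(s"stage ${stageCompleted.stageInfo.stageId}: ${acc.name} = ${acc.value}")
    }
  }

  // Per-task deltas (the "update" field) plus the running total at task end.
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    taskEnd.taskInfo.accumulables.foreach { acc =>
      println(s"task ${taskEnd.taskInfo.taskId}: ${acc.name} += ${acc.update.getOrElse("?")} (total ${acc.value})")
    }
  }
}

// Registered on the driver, e.g.: sc.addSparkListener(new AccumulablePrinter)
```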
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index da2f5d3172..a57a354620 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -17,7 +17,7 @@
package org.apache.spark.ui.jobs
-import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.collection.mutable.{HashMap, ListBuffer, Map}
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
@@ -65,6 +65,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
new StageUIData
})
+ for ((id, info) <- stageCompleted.stageInfo.accumulables) {
+ stageData.accumulables(id) = info
+ }
+
poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
activeStages.remove(stageId)
if (stage.failureReason.isEmpty) {
@@ -130,6 +134,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
new StageUIData
})
+ for (accumulableInfo <- info.accumulables) {
+ stageData.accumulables(accumulableInfo.id) = accumulableInfo
+ }
+
val execSummaryMap = stageData.executorSummary
val execSummary = execSummaryMap.getOrElseUpdate(info.executorId, new ExecutorSummary)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index cab26b9e2f..8bc1ba758c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -20,11 +20,12 @@ package org.apache.spark.ui.jobs
import java.util.Date
import javax.servlet.http.HttpServletRequest
-import scala.xml.Node
+import scala.xml.{Node, Unparsed}
import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
import org.apache.spark.ui.jobs.UIData._
import org.apache.spark.util.{Utils, Distribution}
+import org.apache.spark.scheduler.AccumulableInfo
/** Page showing statistics and task list for a given stage */
private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
@@ -51,6 +52,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
val numCompleted = tasks.count(_.taskInfo.finished)
+ val accumulables = listener.stageIdToData(stageId).accumulables
val hasInput = stageData.inputBytes > 0
val hasShuffleRead = stageData.shuffleReadBytes > 0
val hasShuffleWrite = stageData.shuffleWriteBytes > 0
@@ -95,10 +97,15 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
</ul>
</div>
// scalastyle:on
+ val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value")
+ def accumulableRow(acc: AccumulableInfo) = <tr><td>{acc.name}</td><td>{acc.value}</td></tr>
+ val accumulableTable = UIUtils.listingTable(accumulableHeaders, accumulableRow,
+ accumulables.values.toSeq)
+
val taskHeaders: Seq[String] =
Seq(
"Index", "ID", "Attempt", "Status", "Locality Level", "Executor",
- "Launch Time", "Duration", "GC Time") ++
+ "Launch Time", "Duration", "GC Time", "Accumulators") ++
{if (hasInput) Seq("Input") else Nil} ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
@@ -208,11 +215,16 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
Some(UIUtils.listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
}
val executorTable = new ExecutorTable(stageId, parent)
+
+ val maybeAccumulableTable: Seq[Node] =
+ if (accumulables.size > 0) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()
+
val content =
summary ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
<h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
+ maybeAccumulableTable ++
<h4>Tasks</h4> ++ taskTable
UIUtils.headerSparkPage(content, basePath, appName, "Details for Stage %d".format(stageId),
@@ -279,6 +291,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
<td sorttable_customkey={gcTime.toString}>
{if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
</td>
+ <td>
+ {Unparsed(
+ info.accumulables.map{acc => s"${acc.name}: ${acc.update.get}"}.mkString("<br/>")
+ )}
+ </td>
<!--
TODO: Add this back after we add support to hide certain columns.
<td sorttable_customkey={serializationTime.toString}>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 2f96f7909c..85db15472a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -18,7 +18,7 @@
package org.apache.spark.ui.jobs
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler.TaskInfo
+import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
import scala.collection.mutable.HashMap
@@ -51,6 +51,7 @@ private[jobs] object UIData {
var schedulingPool: String = ""
var description: Option[String] = None
+ var accumulables = new HashMap[Long, AccumulableInfo]
var taskData = new HashMap[Long, TaskUIData]
var executorSummary = new HashMap[String, ExecutorSummary]
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index bb6079154a..b112b35936 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -25,6 +25,8 @@ import scala.collection.Map
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
+import org.json4s.jackson.JsonMethods._
+
import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics,
ShuffleWriteMetrics, TaskMetrics}
@@ -190,10 +192,13 @@ private[spark] object JsonProtocol {
("Details" -> stageInfo.details) ~
("Submission Time" -> submissionTime) ~
("Completion Time" -> completionTime) ~
- ("Failure Reason" -> failureReason)
+ ("Failure Reason" -> failureReason) ~
+ ("Accumulables" -> JArray(
+ stageInfo.accumulables.values.map(accumulableInfoToJson).toList))
}
def taskInfoToJson(taskInfo: TaskInfo): JValue = {
+ val accumUpdateMap = taskInfo.accumulables
("Task ID" -> taskInfo.taskId) ~
("Index" -> taskInfo.index) ~
("Attempt" -> taskInfo.attempt) ~
@@ -204,7 +209,15 @@ private[spark] object JsonProtocol {
("Speculative" -> taskInfo.speculative) ~
("Getting Result Time" -> taskInfo.gettingResultTime) ~
("Finish Time" -> taskInfo.finishTime) ~
- ("Failed" -> taskInfo.failed)
+ ("Failed" -> taskInfo.failed) ~
+ ("Accumulables" -> JArray(taskInfo.accumulables.map(accumulableInfoToJson).toList))
+ }
+
+ def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {
+ ("ID" -> accumulableInfo.id) ~
+ ("Name" -> accumulableInfo.name) ~
+ ("Update" -> accumulableInfo.update.map(new JString(_)).getOrElse(JNothing)) ~
+ ("Value" -> accumulableInfo.value)
}
def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
@@ -480,16 +493,23 @@ private[spark] object JsonProtocol {
val stageId = (json \ "Stage ID").extract[Int]
val stageName = (json \ "Stage Name").extract[String]
val numTasks = (json \ "Number of Tasks").extract[Int]
- val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
+ val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson(_))
val details = (json \ "Details").extractOpt[String].getOrElse("")
val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
+ val accumulatedValues = (json \ "Accumulables").extractOpt[List[JValue]] match {
+ case Some(values) => values.map(accumulableInfoFromJson(_))
+ case None => Seq[AccumulableInfo]()
+ }
val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details)
stageInfo.submissionTime = submissionTime
stageInfo.completionTime = completionTime
stageInfo.failureReason = failureReason
+ for (accInfo <- accumulatedValues) {
+ stageInfo.accumulables(accInfo.id) = accInfo
+ }
stageInfo
}
@@ -505,15 +525,28 @@ private[spark] object JsonProtocol {
val gettingResultTime = (json \ "Getting Result Time").extract[Long]
val finishTime = (json \ "Finish Time").extract[Long]
val failed = (json \ "Failed").extract[Boolean]
+ val accumulables = (json \ "Accumulables").extractOpt[Seq[JValue]] match {
+ case Some(values) => values.map(accumulableInfoFromJson(_))
+ case None => Seq[AccumulableInfo]()
+ }
val taskInfo =
new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative)
taskInfo.gettingResultTime = gettingResultTime
taskInfo.finishTime = finishTime
taskInfo.failed = failed
+ accumulables.foreach { taskInfo.accumulables += _ }
taskInfo
}
+ def accumulableInfoFromJson(json: JValue): AccumulableInfo = {
+ val id = (json \ "ID").extract[Long]
+ val name = (json \ "Name").extract[String]
+ val update = Utils.jsonOption(json \ "Update").map(_.extract[String])
+ val value = (json \ "Value").extract[String]
+ AccumulableInfo(id, name, update, value)
+ }
+
def taskMetricsFromJson(json: JValue): TaskMetrics = {
if (json == JNothing) {
return TaskMetrics.empty
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 9305b6d973..2002a817d9 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -123,15 +123,22 @@ class JsonProtocolSuite extends FunSuite {
testBlockId(StreamBlockId(1, 2L))
}
- test("StageInfo.details backward compatibility") {
- // StageInfo.details was added after 1.0.0.
+ test("StageInfo backward compatibility") {
val info = makeStageInfo(1, 2, 3, 4L, 5L)
- assert(info.details.nonEmpty)
val newJson = JsonProtocol.stageInfoToJson(info)
- val oldJson = newJson.removeField { case (field, _) => field == "Details" }
+
+ // Fields added after 1.0.0.
+ assert(info.details.nonEmpty)
+ assert(info.accumulables.nonEmpty)
+ val oldJson = newJson
+ .removeField { case (field, _) => field == "Details" }
+ .removeField { case (field, _) => field == "Accumulables" }
+
val newInfo = JsonProtocol.stageInfoFromJson(oldJson)
+
assert(info.name === newInfo.name)
assert("" === newInfo.details)
+ assert(0 === newInfo.accumulables.size)
}
test("InputMetrics backward compatibility") {
@@ -261,6 +268,7 @@ class JsonProtocolSuite extends FunSuite {
(0 until info1.rddInfos.size).foreach { i =>
assertEquals(info1.rddInfos(i), info2.rddInfos(i))
}
+ assert(info1.accumulables === info2.accumulables)
assert(info1.details === info2.details)
}
@@ -293,6 +301,7 @@ class JsonProtocolSuite extends FunSuite {
assert(info1.gettingResultTime === info2.gettingResultTime)
assert(info1.finishTime === info2.finishTime)
assert(info1.failed === info2.failed)
+ assert(info1.accumulables === info2.accumulables)
}
private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
@@ -476,13 +485,27 @@ class JsonProtocolSuite extends FunSuite {
private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) }
- new StageInfo(a, "greetings", b, rddInfos, "details")
+ val stageInfo = new StageInfo(a, "greetings", b, rddInfos, "details")
+ val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2))
+ stageInfo.accumulables(acc1.id) = acc1
+ stageInfo.accumulables(acc2.id) = acc2
+ stageInfo
}
private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
- new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
+ val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL,
+ speculative)
+ val (acc1, acc2, acc3) =
+ (makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3))
+ taskInfo.accumulables += acc1
+ taskInfo.accumulables += acc2
+ taskInfo.accumulables += acc3
+ taskInfo
}
+ private def makeAccumulableInfo(id: Int): AccumulableInfo =
+ AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id)
+
/**
* Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
* set to true) or read data from a shuffle otherwise.
@@ -536,7 +559,9 @@ class JsonProtocolSuite extends FunSuite {
private val stageSubmittedJsonString =
"""
{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name":
- "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details"},"Properties":
+ "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details",
+ "Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
+ {"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]},"Properties":
{"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
"""
@@ -546,7 +571,9 @@ class JsonProtocolSuite extends FunSuite {
"greetings","Number of Tasks":201,"RDD Info":[{"RDD ID":101,"Name":"mayor","Storage
Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true,
"Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301,
- "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details"}}
+ "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details",
+ "Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
+ {"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]}}
"""
private val taskStartJsonString =
@@ -554,7 +581,9 @@ class JsonProtocolSuite extends FunSuite {
|{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
|"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
|"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
- |"Failed":false}}
+ |"Failed":false,"Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
+ |"Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
+ |{"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]}}
""".stripMargin
private val taskGettingResultJsonString =
@@ -562,7 +591,10 @@ class JsonProtocolSuite extends FunSuite {
|{"Event":"SparkListenerTaskGettingResult","Task Info":
| {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
| "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
- | "Finish Time":0,"Failed":false
+ | "Finish Time":0,"Failed":false,
+ | "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
+ | "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
+ | {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
| }
|}
""".stripMargin
@@ -574,7 +606,10 @@ class JsonProtocolSuite extends FunSuite {
|"Task Info":{
| "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
| "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
- | "Getting Result Time":0,"Finish Time":0,"Failed":false
+ | "Getting Result Time":0,"Finish Time":0,"Failed":false,
+ | "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
+ | "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
+ | {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
|},
|"Task Metrics":{
| "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
@@ -613,7 +648,10 @@ class JsonProtocolSuite extends FunSuite {
|"Task Info":{
| "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
| "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
- | "Getting Result Time":0,"Finish Time":0,"Failed":false
+ | "Getting Result Time":0,"Finish Time":0,"Failed":false,
+ | "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
+ | "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
+ | {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
|},
|"Task Metrics":{
| "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index a88bf27add..6ae780d940 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1174,7 +1174,9 @@ value of the broadcast variable (e.g. if the variable is shipped to a new node l
Accumulators are variables that are only "added" to through an associative operation and can
therefore be efficiently supported in parallel. They can be used to implement counters (as in
MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers
-can add support for new types.
+can add support for new types. If accumulators are created with a name, they will be
+displayed in Spark's UI. This can be useful for understanding the progress of
+running stages (NOTE: this is not yet supported in Python).
An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks
running on the cluster can then add to it using the `add` method or the `+=` operator (in Scala and Python).
@@ -1188,7 +1190,7 @@ The code below shows an accumulator being used to add up the elements of an arra
<div data-lang="scala" markdown="1">
{% highlight scala %}
-scala> val accum = sc.accumulator(0)
+scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)