aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2016-01-27 11:15:48 -0800
committerJosh Rosen <joshrosen@databricks.com>2016-01-27 11:15:48 -0800
commit87abcf7df921a5937fdb2bae8bfb30bfabc4970a (patch)
tree74b5a1cb19f06c40bd99a85feee3f35efbb9a496
parentedd473751b59b55fa3daede5ed7bc19ea8bd7170 (diff)
downloadspark-87abcf7df921a5937fdb2bae8bfb30bfabc4970a.tar.gz
spark-87abcf7df921a5937fdb2bae8bfb30bfabc4970a.tar.bz2
spark-87abcf7df921a5937fdb2bae8bfb30bfabc4970a.zip
[SPARK-12895][SPARK-12896] Migrate TaskMetrics to accumulators
The high level idea is that instead of having the executors send both accumulator updates and TaskMetrics, we should have them send only accumulator updates. This eliminates the need to maintain both code paths since one can be implemented in terms of the other. This effort is split into two parts: **SPARK-12895: Implement TaskMetrics using accumulators.** TaskMetrics is basically just a bunch of accumulable fields. This patch makes TaskMetrics a syntactic wrapper around a collection of accumulators so we don't need to send TaskMetrics from the executors to the driver. **SPARK-12896: Send only accumulator updates to the driver.** Now that TaskMetrics are expressed in terms of accumulators, we can capture all TaskMetrics values if we just send accumulator updates from the executors to the driver. This completes the parent issue SPARK-10620. While an effort has been made to preserve as much of the public API as possible, there were a few known breaking DeveloperApi changes that would be very awkward to maintain. I will gather the full list shortly and post it here. Note: This was once part of #10717. This patch is split out into its own patch from there to make it easier for others to review. Other smaller pieces of already been merged into master. Author: Andrew Or <andrew@databricks.com> Closes #10835 from andrewor14/task-metrics-use-accums.
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java8
-rw-r--r--core/src/main/scala/org/apache/spark/Accumulable.scala86
-rw-r--r--core/src/main/scala/org/apache/spark/Accumulator.scala101
-rw-r--r--core/src/main/scala/org/apache/spark/Aggregator.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/InternalAccumulator.scala199
-rw-r--r--core/src/main/scala/org/apache/spark/TaskContext.scala19
-rw-r--r--core/src/main/scala/org/apache/spark/TaskContextImpl.scala30
-rw-r--r--core/src/main/scala/org/apache/spark/TaskEndReason.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala45
-rw-r--r--core/src/main/scala/org/apache/spark/executor/InputMetrics.scala81
-rw-r--r--core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala71
-rw-r--r--core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala104
-rw-r--r--core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala62
-rw-r--r--core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala370
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala24
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala55
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala102
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Stage.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Task.scala41
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala28
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala27
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala36
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala124
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala3
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java2
-rw-r--r--core/src/test/scala/org/apache/spark/AccumulatorSuite.scala324
-rw-r--r--core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala331
-rw-r--r--core/src/test/scala/org/apache/spark/SparkFunSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala540
-rw-r--r--core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala281
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala56
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala67
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala36
-rw-r--r--core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala11
-rw-r--r--core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala26
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala515
-rw-r--r--project/MimaExcludes.scala9
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala22
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala22
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala38
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala34
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala2
70 files changed, 3012 insertions, 1141 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index d3d79a27ea..128a82579b 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -444,13 +444,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@Override
public Option<MapStatus> stop(boolean success) {
try {
- // Update task metrics from accumulators (null in UnsafeShuffleWriterSuite)
- Map<String, Accumulator<Object>> internalAccumulators =
- taskContext.internalMetricsToAccumulators();
- if (internalAccumulators != null) {
- internalAccumulators.apply(InternalAccumulator.PEAK_EXECUTION_MEMORY())
- .add(getPeakMemoryUsedBytes());
- }
+ taskContext.taskMetrics().incPeakExecutionMemory(getPeakMemoryUsedBytes());
if (stopping) {
return Option.apply(null);
diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
index a456d420b8..bde136141f 100644
--- a/core/src/main/scala/org/apache/spark/Accumulable.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -35,40 +35,67 @@ import org.apache.spark.util.Utils
* [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
* accumulating a set. You will add items to the set, and you will union two sets together.
*
+ * All accumulators created on the driver to be used on the executors must be registered with
+ * [[Accumulators]]. This is already done automatically for accumulators created by the user.
+ * Internal accumulators must be explicitly registered by the caller.
+ *
+ * Operations are not thread-safe.
+ *
+ * @param id ID of this accumulator; for internal use only.
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
* @param name human-readable name for use in Spark's web UI
* @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported
* to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be
* thread safe so that they can be reported correctly.
+ * @param countFailedValues whether to accumulate values from failed tasks. This is set to true
+ * for system and time metrics like serialization time or bytes spilled,
+ * and false for things with absolute values like number of input rows.
+ * This should be used for internal metrics only.
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
-class Accumulable[R, T] private[spark] (
- initialValue: R,
+class Accumulable[R, T] private (
+ val id: Long,
+ @transient initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
- internal: Boolean)
+ internal: Boolean,
+ private[spark] val countFailedValues: Boolean)
extends Serializable {
private[spark] def this(
- @transient initialValue: R, param: AccumulableParam[R, T], internal: Boolean) = {
- this(initialValue, param, None, internal)
+ initialValue: R,
+ param: AccumulableParam[R, T],
+ name: Option[String],
+ internal: Boolean,
+ countFailedValues: Boolean) = {
+ this(Accumulators.newId(), initialValue, param, name, internal, countFailedValues)
}
- def this(@transient initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
- this(initialValue, param, name, false)
+ private[spark] def this(
+ initialValue: R,
+ param: AccumulableParam[R, T],
+ name: Option[String],
+ internal: Boolean) = {
+ this(initialValue, param, name, internal, false /* countFailedValues */)
+ }
- def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
- this(initialValue, param, None)
+ def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
+ this(initialValue, param, name, false /* internal */)
- val id: Long = Accumulators.newId
+ def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None)
- @volatile @transient private var value_ : R = initialValue // Current value on master
- val zero = param.zero(initialValue) // Zero value to be passed to workers
+ @volatile @transient private var value_ : R = initialValue // Current value on driver
+ val zero = param.zero(initialValue) // Zero value to be passed to executors
private var deserialized = false
- Accumulators.register(this)
+ // In many places we create internal accumulators without access to the active context cleaner,
+ // so if we register them here then we may never unregister these accumulators. To avoid memory
+ // leaks, we require the caller to explicitly register internal accumulators elsewhere.
+ if (!internal) {
+ Accumulators.register(this)
+ }
/**
* If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver
@@ -78,6 +105,17 @@ class Accumulable[R, T] private[spark] (
private[spark] def isInternal: Boolean = internal
/**
+ * Return a copy of this [[Accumulable]].
+ *
+ * The copy will have the same ID as the original and will not be registered with
+ * [[Accumulators]] again. This method exists so that the caller can avoid passing the
+ * same mutable instance around.
+ */
+ private[spark] def copy(): Accumulable[R, T] = {
+ new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues)
+ }
+
+ /**
* Add more data to this accumulator / accumulable
* @param term the data to add
*/
@@ -106,7 +144,7 @@ class Accumulable[R, T] private[spark] (
def merge(term: R) { value_ = param.addInPlace(value_, term)}
/**
- * Access the accumulator's current value; only allowed on master.
+ * Access the accumulator's current value; only allowed on driver.
*/
def value: R = {
if (!deserialized) {
@@ -128,7 +166,7 @@ class Accumulable[R, T] private[spark] (
def localValue: R = value_
/**
- * Set the accumulator's value; only allowed on master.
+ * Set the accumulator's value; only allowed on driver.
*/
def value_= (newValue: R) {
if (!deserialized) {
@@ -139,22 +177,24 @@ class Accumulable[R, T] private[spark] (
}
/**
- * Set the accumulator's value; only allowed on master
+ * Set the accumulator's value. For internal use only.
*/
- def setValue(newValue: R) {
- this.value = newValue
- }
+ def setValue(newValue: R): Unit = { value_ = newValue }
+
+ /**
+ * Set the accumulator's value. For internal use only.
+ */
+ private[spark] def setValueAny(newValue: Any): Unit = { setValue(newValue.asInstanceOf[R]) }
// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
value_ = zero
deserialized = true
+
// Automatically register the accumulator when it is deserialized with the task closure.
- //
- // Note internal accumulators sent with task are deserialized before the TaskContext is created
- // and are registered in the TaskContext constructor. Other internal accumulators, such SQL
- // metrics, still need to register here.
+ // This is for external accumulators and internal ones that do not represent task level
+ // metrics, e.g. internal SQL metrics, which are per-operator.
val taskContext = TaskContext.get()
if (taskContext != null) {
taskContext.registerAccumulator(this)
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 007136e6ae..558bd447e2 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -17,9 +17,14 @@
package org.apache.spark
-import scala.collection.{mutable, Map}
+import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
import scala.ref.WeakReference
+import org.apache.spark.storage.{BlockId, BlockStatus}
+
/**
* A simpler value of [[Accumulable]] where the result type being accumulated is the same
@@ -49,14 +54,18 @@ import scala.ref.WeakReference
*
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `T`
+ * @param name human-readable name associated with this accumulator
+ * @param internal whether this accumulator is used internally within Spark only
+ * @param countFailedValues whether to accumulate values from failed tasks
* @tparam T result type
*/
class Accumulator[T] private[spark] (
@transient private[spark] val initialValue: T,
param: AccumulatorParam[T],
name: Option[String],
- internal: Boolean)
- extends Accumulable[T, T](initialValue, param, name, internal) {
+ internal: Boolean,
+ override val countFailedValues: Boolean = false)
+ extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) {
def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
this(initialValue, param, name, false)
@@ -75,43 +84,63 @@ private[spark] object Accumulators extends Logging {
* This global map holds the original accumulator objects that are created on the driver.
* It keeps weak references to these objects so that accumulators can be garbage-collected
* once the RDDs and user-code that reference them are cleaned up.
+ * TODO: Don't use a global map; these should be tied to a SparkContext at the very least.
*/
+ @GuardedBy("Accumulators")
val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
- private var lastId: Long = 0
+ private val nextId = new AtomicLong(0L)
- def newId(): Long = synchronized {
- lastId += 1
- lastId
- }
+ /**
+ * Return a globally unique ID for a new [[Accumulable]].
+ * Note: Once you copy the [[Accumulable]] the ID is no longer unique.
+ */
+ def newId(): Long = nextId.getAndIncrement
+ /**
+ * Register an [[Accumulable]] created on the driver such that it can be used on the executors.
+ *
+ * All accumulators registered here can later be used as a container for accumulating partial
+ * values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does.
+ * Note: if an accumulator is registered here, it should also be registered with the active
+ * context cleaner for cleanup so as to avoid memory leaks.
+ *
+ * If an [[Accumulable]] with the same ID was already registered, this does nothing instead
+ * of overwriting it. This happens when we copy accumulators, e.g. when we reconstruct
+ * [[org.apache.spark.executor.TaskMetrics]] from accumulator updates.
+ */
def register(a: Accumulable[_, _]): Unit = synchronized {
- originals(a.id) = new WeakReference[Accumulable[_, _]](a)
+ if (!originals.contains(a.id)) {
+ originals(a.id) = new WeakReference[Accumulable[_, _]](a)
+ }
}
- def remove(accId: Long) {
- synchronized {
- originals.remove(accId)
- }
+ /**
+ * Unregister the [[Accumulable]] with the given ID, if any.
+ */
+ def remove(accId: Long): Unit = synchronized {
+ originals.remove(accId)
}
- // Add values to the original accumulators with some given IDs
- def add(values: Map[Long, Any]): Unit = synchronized {
- for ((id, value) <- values) {
- if (originals.contains(id)) {
- // Since we are now storing weak references, we must check whether the underlying data
- // is valid.
- originals(id).get match {
- case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
- case None =>
- throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
- }
- } else {
- logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
+ /**
+ * Return the [[Accumulable]] registered with the given ID, if any.
+ */
+ def get(id: Long): Option[Accumulable[_, _]] = synchronized {
+ originals.get(id).map { weakRef =>
+ // Since we are storing weak references, we must check whether the underlying data is valid.
+ weakRef.get.getOrElse {
+ throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id")
}
}
}
+ /**
+ * Clear all registered [[Accumulable]]s. For testing only.
+ */
+ def clear(): Unit = synchronized {
+ originals.clear()
+ }
+
}
@@ -156,5 +185,23 @@ object AccumulatorParam {
def zero(initialValue: Float): Float = 0f
}
- // TODO: Add AccumulatorParams for other types, e.g. lists and strings
+ // Note: when merging values, this param just adopts the newer value. This is used only
+ // internally for things that shouldn't really be accumulated across tasks, like input
+ // read method, which should be the same across all tasks in the same stage.
+ private[spark] object StringAccumulatorParam extends AccumulatorParam[String] {
+ def addInPlace(t1: String, t2: String): String = t2
+ def zero(initialValue: String): String = ""
+ }
+
+ // Note: this is expensive as it makes a copy of the list every time the caller adds an item.
+ // A better way to use this is to first accumulate the values yourself then them all at once.
+ private[spark] class ListAccumulatorParam[T] extends AccumulatorParam[Seq[T]] {
+ def addInPlace(t1: Seq[T], t2: Seq[T]): Seq[T] = t1 ++ t2
+ def zero(initialValue: Seq[T]): Seq[T] = Seq.empty[T]
+ }
+
+ // For the internal metric that records what blocks are updated in a particular task
+ private[spark] object UpdatedBlockStatusesAccumulatorParam
+ extends ListAccumulatorParam[(BlockId, BlockStatus)]
+
}
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 62629000cf..e493d9a3cf 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -57,8 +57,7 @@ case class Aggregator[K, V, C] (
Option(context).foreach { c =>
c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
- c.internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
+ c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index e03977828b..45b20c0e8d 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
*/
private[spark] case class Heartbeat(
executorId: String,
- taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
+ accumUpdates: Array[(Long, Seq[AccumulableInfo])], // taskId -> accum updates
blockManagerId: BlockManagerId)
/**
@@ -119,14 +119,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
context.reply(true)
// Messages received from executors
- case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
+ case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
if (scheduler != null) {
if (executorLastSeen.contains(executorId)) {
executorLastSeen(executorId) = clock.getTimeMillis()
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
- executorId, taskMetrics, blockManagerId)
+ executorId, accumUpdates, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
index 6ea997c079..c191122c06 100644
--- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -17,42 +17,193 @@
package org.apache.spark
+import org.apache.spark.storage.{BlockId, BlockStatus}
-// This is moved to its own file because many more things will be added to it in SPARK-10620.
+
+/**
+ * A collection of fields and methods concerned with internal accumulators that represent
+ * task level metrics.
+ */
private[spark] object InternalAccumulator {
- val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
- val TEST_ACCUMULATOR = "testAccumulator"
-
- // For testing only.
- // This needs to be a def since we don't want to reuse the same accumulator across stages.
- private def maybeTestAccumulator: Option[Accumulator[Long]] = {
- if (sys.props.contains("spark.testing")) {
- Some(new Accumulator(
- 0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true))
- } else {
- None
+
+ import AccumulatorParam._
+
+ // Prefixes used in names of internal task level metrics
+ val METRICS_PREFIX = "internal.metrics."
+ val SHUFFLE_READ_METRICS_PREFIX = METRICS_PREFIX + "shuffle.read."
+ val SHUFFLE_WRITE_METRICS_PREFIX = METRICS_PREFIX + "shuffle.write."
+ val OUTPUT_METRICS_PREFIX = METRICS_PREFIX + "output."
+ val INPUT_METRICS_PREFIX = METRICS_PREFIX + "input."
+
+ // Names of internal task level metrics
+ val EXECUTOR_DESERIALIZE_TIME = METRICS_PREFIX + "executorDeserializeTime"
+ val EXECUTOR_RUN_TIME = METRICS_PREFIX + "executorRunTime"
+ val RESULT_SIZE = METRICS_PREFIX + "resultSize"
+ val JVM_GC_TIME = METRICS_PREFIX + "jvmGCTime"
+ val RESULT_SERIALIZATION_TIME = METRICS_PREFIX + "resultSerializationTime"
+ val MEMORY_BYTES_SPILLED = METRICS_PREFIX + "memoryBytesSpilled"
+ val DISK_BYTES_SPILLED = METRICS_PREFIX + "diskBytesSpilled"
+ val PEAK_EXECUTION_MEMORY = METRICS_PREFIX + "peakExecutionMemory"
+ val UPDATED_BLOCK_STATUSES = METRICS_PREFIX + "updatedBlockStatuses"
+ val TEST_ACCUM = METRICS_PREFIX + "testAccumulator"
+
+ // scalastyle:off
+
+ // Names of shuffle read metrics
+ object shuffleRead {
+ val REMOTE_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "remoteBlocksFetched"
+ val LOCAL_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "localBlocksFetched"
+ val REMOTE_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "remoteBytesRead"
+ val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead"
+ val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
+ val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
+ }
+
+ // Names of shuffle write metrics
+ object shuffleWrite {
+ val BYTES_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "bytesWritten"
+ val RECORDS_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "recordsWritten"
+ val WRITE_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "writeTime"
+ }
+
+ // Names of output metrics
+ object output {
+ val WRITE_METHOD = OUTPUT_METRICS_PREFIX + "writeMethod"
+ val BYTES_WRITTEN = OUTPUT_METRICS_PREFIX + "bytesWritten"
+ val RECORDS_WRITTEN = OUTPUT_METRICS_PREFIX + "recordsWritten"
+ }
+
+ // Names of input metrics
+ object input {
+ val READ_METHOD = INPUT_METRICS_PREFIX + "readMethod"
+ val BYTES_READ = INPUT_METRICS_PREFIX + "bytesRead"
+ val RECORDS_READ = INPUT_METRICS_PREFIX + "recordsRead"
+ }
+
+ // scalastyle:on
+
+ /**
+ * Create an internal [[Accumulator]] by name, which must begin with [[METRICS_PREFIX]].
+ */
+ def create(name: String): Accumulator[_] = {
+ require(name.startsWith(METRICS_PREFIX),
+ s"internal accumulator name must start with '$METRICS_PREFIX': $name")
+ getParam(name) match {
+ case p @ LongAccumulatorParam => newMetric[Long](0L, name, p)
+ case p @ IntAccumulatorParam => newMetric[Int](0, name, p)
+ case p @ StringAccumulatorParam => newMetric[String]("", name, p)
+ case p @ UpdatedBlockStatusesAccumulatorParam =>
+ newMetric[Seq[(BlockId, BlockStatus)]](Seq(), name, p)
+ case p => throw new IllegalArgumentException(
+ s"unsupported accumulator param '${p.getClass.getSimpleName}' for metric '$name'.")
+ }
+ }
+
+ /**
+ * Get the [[AccumulatorParam]] associated with the internal metric name,
+ * which must begin with [[METRICS_PREFIX]].
+ */
+ def getParam(name: String): AccumulatorParam[_] = {
+ require(name.startsWith(METRICS_PREFIX),
+ s"internal accumulator name must start with '$METRICS_PREFIX': $name")
+ name match {
+ case UPDATED_BLOCK_STATUSES => UpdatedBlockStatusesAccumulatorParam
+ case shuffleRead.LOCAL_BLOCKS_FETCHED => IntAccumulatorParam
+ case shuffleRead.REMOTE_BLOCKS_FETCHED => IntAccumulatorParam
+ case input.READ_METHOD => StringAccumulatorParam
+ case output.WRITE_METHOD => StringAccumulatorParam
+ case _ => LongAccumulatorParam
}
}
/**
* Accumulators for tracking internal metrics.
+ */
+ def create(): Seq[Accumulator[_]] = {
+ Seq[String](
+ EXECUTOR_DESERIALIZE_TIME,
+ EXECUTOR_RUN_TIME,
+ RESULT_SIZE,
+ JVM_GC_TIME,
+ RESULT_SERIALIZATION_TIME,
+ MEMORY_BYTES_SPILLED,
+ DISK_BYTES_SPILLED,
+ PEAK_EXECUTION_MEMORY,
+ UPDATED_BLOCK_STATUSES).map(create) ++
+ createShuffleReadAccums() ++
+ createShuffleWriteAccums() ++
+ createInputAccums() ++
+ createOutputAccums() ++
+ sys.props.get("spark.testing").map(_ => create(TEST_ACCUM)).toSeq
+ }
+
+ /**
+ * Accumulators for tracking shuffle read metrics.
+ */
+ def createShuffleReadAccums(): Seq[Accumulator[_]] = {
+ Seq[String](
+ shuffleRead.REMOTE_BLOCKS_FETCHED,
+ shuffleRead.LOCAL_BLOCKS_FETCHED,
+ shuffleRead.REMOTE_BYTES_READ,
+ shuffleRead.LOCAL_BYTES_READ,
+ shuffleRead.FETCH_WAIT_TIME,
+ shuffleRead.RECORDS_READ).map(create)
+ }
+
+ /**
+ * Accumulators for tracking shuffle write metrics.
+ */
+ def createShuffleWriteAccums(): Seq[Accumulator[_]] = {
+ Seq[String](
+ shuffleWrite.BYTES_WRITTEN,
+ shuffleWrite.RECORDS_WRITTEN,
+ shuffleWrite.WRITE_TIME).map(create)
+ }
+
+ /**
+ * Accumulators for tracking input metrics.
+ */
+ def createInputAccums(): Seq[Accumulator[_]] = {
+ Seq[String](
+ input.READ_METHOD,
+ input.BYTES_READ,
+ input.RECORDS_READ).map(create)
+ }
+
+ /**
+ * Accumulators for tracking output metrics.
+ */
+ def createOutputAccums(): Seq[Accumulator[_]] = {
+ Seq[String](
+ output.WRITE_METHOD,
+ output.BYTES_WRITTEN,
+ output.RECORDS_WRITTEN).map(create)
+ }
+
+ /**
+ * Accumulators for tracking internal metrics.
*
* These accumulators are created with the stage such that all tasks in the stage will
* add to the same set of accumulators. We do this to report the distribution of accumulator
* values across all tasks within each stage.
*/
- def create(sc: SparkContext): Seq[Accumulator[Long]] = {
- val internalAccumulators = Seq(
- // Execution memory refers to the memory used by internal data structures created
- // during shuffles, aggregations and joins. The value of this accumulator should be
- // approximately the sum of the peak sizes across all such data structures created
- // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
- new Accumulator(
- 0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
- ) ++ maybeTestAccumulator.toSeq
- internalAccumulators.foreach { accumulator =>
- sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
+ def create(sc: SparkContext): Seq[Accumulator[_]] = {
+ val accums = create()
+ accums.foreach { accum =>
+ Accumulators.register(accum)
+ sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
}
- internalAccumulators
+ accums
+ }
+
+ /**
+ * Create a new accumulator representing an internal task metric.
+ */
+ private def newMetric[T](
+ initialValue: T,
+ name: String,
+ param: AccumulatorParam[T]): Accumulator[T] = {
+ new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true)
}
+
}
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 7704abc134..9f49cf1c4c 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -64,7 +64,7 @@ object TaskContext {
* An empty task context that does not represent an actual task.
*/
private[spark] def empty(): TaskContextImpl = {
- new TaskContextImpl(0, 0, 0, 0, null, null, Seq.empty)
+ new TaskContextImpl(0, 0, 0, 0, null, null)
}
}
@@ -138,7 +138,6 @@ abstract class TaskContext extends Serializable {
*/
def taskAttemptId(): Long
- /** ::DeveloperApi:: */
@DeveloperApi
def taskMetrics(): TaskMetrics
@@ -161,20 +160,4 @@ abstract class TaskContext extends Serializable {
*/
private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit
- /**
- * Return the local values of internal accumulators that belong to this task. The key of the Map
- * is the accumulator id and the value of the Map is the latest accumulator local value.
- */
- private[spark] def collectInternalAccumulators(): Map[Long, Any]
-
- /**
- * Return the local values of accumulators that belong to this task. The key of the Map is the
- * accumulator id and the value of the Map is the latest accumulator local value.
- */
- private[spark] def collectAccumulators(): Map[Long, Any]
-
- /**
- * Accumulators for tracking internal metrics indexed by the name.
- */
- private[spark] val internalMetricsToAccumulators: Map[String, Accumulator[Long]]
}
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 94ff884b74..27ca46f73d 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -17,7 +17,7 @@
package org.apache.spark
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.ArrayBuffer
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.TaskMemoryManager
@@ -32,11 +32,15 @@ private[spark] class TaskContextImpl(
override val attemptNumber: Int,
override val taskMemoryManager: TaskMemoryManager,
@transient private val metricsSystem: MetricsSystem,
- internalAccumulators: Seq[Accumulator[Long]],
- val taskMetrics: TaskMetrics = TaskMetrics.empty)
+ initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.create())
extends TaskContext
with Logging {
+ /**
+ * Metrics associated with this task.
+ */
+ override val taskMetrics: TaskMetrics = new TaskMetrics(initialAccumulators)
+
// List of callback functions to execute when the task completes.
@transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]
@@ -91,24 +95,8 @@ private[spark] class TaskContextImpl(
override def getMetricsSources(sourceName: String): Seq[Source] =
metricsSystem.getSourcesByName(sourceName)
- @transient private val accumulators = new HashMap[Long, Accumulable[_, _]]
-
- private[spark] override def registerAccumulator(a: Accumulable[_, _]): Unit = synchronized {
- accumulators(a.id) = a
- }
-
- private[spark] override def collectInternalAccumulators(): Map[Long, Any] = synchronized {
- accumulators.filter(_._2.isInternal).mapValues(_.localValue).toMap
+ private[spark] override def registerAccumulator(a: Accumulable[_, _]): Unit = {
+ taskMetrics.registerAccumulator(a)
}
- private[spark] override def collectAccumulators(): Map[Long, Any] = synchronized {
- accumulators.mapValues(_.localValue).toMap
- }
-
- private[spark] override val internalMetricsToAccumulators: Map[String, Accumulator[Long]] = {
- // Explicitly register internal accumulators here because these are
- // not captured in the task closure and are already deserialized
- internalAccumulators.foreach(registerAccumulator)
- internalAccumulators.map { a => (a.name.get, a) }.toMap
- }
}
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 13241b77bf..68340cc704 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -19,8 +19,11 @@ package org.apache.spark
import java.io.{ObjectInputStream, ObjectOutputStream}
+import scala.util.Try
+
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils
@@ -115,22 +118,34 @@ case class ExceptionFailure(
description: String,
stackTrace: Array[StackTraceElement],
fullStackTrace: String,
- metrics: Option[TaskMetrics],
- private val exceptionWrapper: Option[ThrowableSerializationWrapper])
+ exceptionWrapper: Option[ThrowableSerializationWrapper],
+ accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo])
extends TaskFailedReason {
+ @deprecated("use accumUpdates instead", "2.0.0")
+ val metrics: Option[TaskMetrics] = {
+ if (accumUpdates.nonEmpty) {
+ Try(TaskMetrics.fromAccumulatorUpdates(accumUpdates)).toOption
+ } else {
+ None
+ }
+ }
+
/**
* `preserveCause` is used to keep the exception itself so it is available to the
* driver. This may be set to `false` in the event that the exception is not in fact
* serializable.
*/
- private[spark] def this(e: Throwable, metrics: Option[TaskMetrics], preserveCause: Boolean) {
- this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), metrics,
- if (preserveCause) Some(new ThrowableSerializationWrapper(e)) else None)
+ private[spark] def this(
+ e: Throwable,
+ accumUpdates: Seq[AccumulableInfo],
+ preserveCause: Boolean) {
+ this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e),
+ if (preserveCause) Some(new ThrowableSerializationWrapper(e)) else None, accumUpdates)
}
- private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) {
- this(e, metrics, preserveCause = true)
+ private[spark] def this(e: Throwable, accumUpdates: Seq[AccumulableInfo]) {
+ this(e, accumUpdates, preserveCause = true)
}
def exception: Option[Throwable] = exceptionWrapper.flatMap {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 8ba3f5e241..06b5101b1f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -370,6 +370,14 @@ object SparkHadoopUtil {
val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
+ /**
+ * Number of records to update input metrics when reading from HadoopRDDs.
+ *
+ * Each update is potentially expensive because we need to use reflection to access the
+ * Hadoop FileSystem API of interest (only available in 2.5), so we should do this sparingly.
+ */
+ private[spark] val UPDATE_INPUT_METRICS_INTERVAL_RECORDS = 1000
+
def get: SparkHadoopUtil = {
// Check each time to support changing to/from YARN
val yarnMode = java.lang.Boolean.valueOf(
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 030ae41db4..51c000ea5c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -31,7 +31,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rpc.RpcTimeout
-import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task}
+import org.apache.spark.scheduler.{AccumulableInfo, DirectTaskResult, IndirectTaskResult, Task}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util._
@@ -210,7 +210,7 @@ private[spark] class Executor(
// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
var threwException = true
- val (value, accumUpdates) = try {
+ val value = try {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
@@ -249,10 +249,11 @@ private[spark] class Executor(
m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
m.setJvmGCTime(computeTotalGcTime() - startGCTime)
m.setResultSerializationTime(afterSerialization - beforeSerialization)
- m.updateAccumulators()
}
- val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
+ // Note: accumulator updates must be collected after TaskMetrics is updated
+ val accumUpdates = task.collectAccumulatorUpdates()
+ val directResult = new DirectTaskResult(valueBytes, accumUpdates)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit
@@ -297,21 +298,25 @@ private[spark] class Executor(
// the default uncaught exception handler, which will terminate the Executor.
logError(s"Exception in $taskName (TID $taskId)", t)
- val metrics: Option[TaskMetrics] = Option(task).flatMap { task =>
- task.metrics.map { m =>
+ // Collect latest accumulator values to report back to the driver
+ val accumulatorUpdates: Seq[AccumulableInfo] =
+ if (task != null) {
+ task.metrics.foreach { m =>
m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
m.setJvmGCTime(computeTotalGcTime() - startGCTime)
- m.updateAccumulators()
- m
}
+ task.collectAccumulatorUpdates(taskFailed = true)
+ } else {
+ Seq.empty[AccumulableInfo]
}
+
val serializedTaskEndReason = {
try {
- ser.serialize(new ExceptionFailure(t, metrics))
+ ser.serialize(new ExceptionFailure(t, accumulatorUpdates))
} catch {
case _: NotSerializableException =>
// t is not serializable so just send the stacktrace
- ser.serialize(new ExceptionFailure(t, metrics, false))
+ ser.serialize(new ExceptionFailure(t, accumulatorUpdates, preserveCause = false))
}
}
execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
@@ -418,33 +423,21 @@ private[spark] class Executor(
/** Reports heartbeat and metrics for active tasks to the driver. */
private def reportHeartBeat(): Unit = {
- // list of (task id, metrics) to send back to the driver
- val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
+ // list of (task id, accumUpdates) to send back to the driver
+ val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulableInfo])]()
val curGCTime = computeTotalGcTime()
for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
taskRunner.task.metrics.foreach { metrics =>
metrics.mergeShuffleReadMetrics()
- metrics.updateInputMetrics()
metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
- metrics.updateAccumulators()
-
- if (isLocal) {
- // JobProgressListener will hold an reference of it during
- // onExecutorMetricsUpdate(), then JobProgressListener can not see
- // the changes of metrics any more, so make a deep copy of it
- val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
- tasksMetrics += ((taskRunner.taskId, copiedMetrics))
- } else {
- // It will be copied by serialization
- tasksMetrics += ((taskRunner.taskId, metrics))
- }
+ accumUpdates += ((taskRunner.taskId, metrics.accumulatorUpdates()))
}
}
}
- val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
+ val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
try {
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 8f1d7f89a4..ed9e157ce7 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -17,13 +17,15 @@
package org.apache.spark.executor
+import org.apache.spark.{Accumulator, InternalAccumulator}
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
- * Method by which input data was read. Network means that the data was read over the network
+ * Method by which input data was read. Network means that the data was read over the network
* from a remote block manager (which may have stored the data on-disk or in-memory).
+ * Operations are not thread-safe.
*/
@DeveloperApi
object DataReadMethod extends Enumeration with Serializable {
@@ -34,44 +36,75 @@ object DataReadMethod extends Enumeration with Serializable {
/**
* :: DeveloperApi ::
- * Metrics about reading input data.
+ * A collection of accumulators that represents metrics about reading data from external systems.
*/
@DeveloperApi
-case class InputMetrics(readMethod: DataReadMethod.Value) {
+class InputMetrics private (
+ _bytesRead: Accumulator[Long],
+ _recordsRead: Accumulator[Long],
+ _readMethod: Accumulator[String])
+ extends Serializable {
+
+ private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
+ this(
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.BYTES_READ),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.RECORDS_READ),
+ TaskMetrics.getAccum[String](accumMap, InternalAccumulator.input.READ_METHOD))
+ }
/**
- * This is volatile so that it is visible to the updater thread.
+ * Create a new [[InputMetrics]] that is not associated with any particular task.
+ *
+ * This mainly exists because of SPARK-5225, where we are forced to use a dummy [[InputMetrics]]
+ * because we want to ignore metrics from a second read method. In the future, we should revisit
+ * whether this is needed.
+ *
+ * A better alternative is [[TaskMetrics.registerInputMetrics]].
*/
- @volatile @transient var bytesReadCallback: Option[() => Long] = None
+ private[executor] def this() {
+ this(InternalAccumulator.createInputAccums()
+ .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]])
+ }
/**
- * Total bytes read.
+ * Total number of bytes read.
*/
- private var _bytesRead: Long = _
- def bytesRead: Long = _bytesRead
- def incBytesRead(bytes: Long): Unit = _bytesRead += bytes
+ def bytesRead: Long = _bytesRead.localValue
/**
- * Total records read.
+ * Total number of records read.
*/
- private var _recordsRead: Long = _
- def recordsRead: Long = _recordsRead
- def incRecordsRead(records: Long): Unit = _recordsRead += records
+ def recordsRead: Long = _recordsRead.localValue
/**
- * Invoke the bytesReadCallback and mutate bytesRead.
+ * The source from which this task reads its input.
*/
- def updateBytesRead() {
- bytesReadCallback.foreach { c =>
- _bytesRead = c()
- }
+ def readMethod: DataReadMethod.Value = DataReadMethod.withName(_readMethod.localValue)
+
+ @deprecated("incrementing input metrics is for internal use only", "2.0.0")
+ def incBytesRead(v: Long): Unit = _bytesRead.add(v)
+ @deprecated("incrementing input metrics is for internal use only", "2.0.0")
+ def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
+ private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
+ private[spark] def setReadMethod(v: DataReadMethod.Value): Unit =
+ _readMethod.setValue(v.toString)
+
+}
+
+/**
+ * Deprecated methods to preserve case class matching behavior before Spark 2.0.
+ */
+object InputMetrics {
+
+ @deprecated("matching on InputMetrics will not be supported in the future", "2.0.0")
+ def apply(readMethod: DataReadMethod.Value): InputMetrics = {
+ val im = new InputMetrics
+ im.setReadMethod(readMethod)
+ im
}
- /**
- * Register a function that can be called to get up-to-date information on how many bytes the task
- * has read from an input source.
- */
- def setBytesReadCallback(f: Option[() => Long]) {
- bytesReadCallback = f
+ @deprecated("matching on InputMetrics will not be supported in the future", "2.0.0")
+ def unapply(input: InputMetrics): Option[DataReadMethod.Value] = {
+ Some(input.readMethod)
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
index ad132d004c..0b37d559c7 100644
--- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -17,12 +17,14 @@
package org.apache.spark.executor
+import org.apache.spark.{Accumulator, InternalAccumulator}
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
* Method by which output data was written.
+ * Operations are not thread-safe.
*/
@DeveloperApi
object DataWriteMethod extends Enumeration with Serializable {
@@ -33,21 +35,70 @@ object DataWriteMethod extends Enumeration with Serializable {
/**
* :: DeveloperApi ::
- * Metrics about writing output data.
+ * A collection of accumulators that represents metrics about writing data to external systems.
*/
@DeveloperApi
-case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
+class OutputMetrics private (
+ _bytesWritten: Accumulator[Long],
+ _recordsWritten: Accumulator[Long],
+ _writeMethod: Accumulator[String])
+ extends Serializable {
+
+ private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
+ this(
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.BYTES_WRITTEN),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.RECORDS_WRITTEN),
+ TaskMetrics.getAccum[String](accumMap, InternalAccumulator.output.WRITE_METHOD))
+ }
+
+ /**
+ * Create a new [[OutputMetrics]] that is not associated with any particular task.
+ *
+ * This is only used for preserving matching behavior on [[OutputMetrics]], which used to be
+ * a case class before Spark 2.0. Once we remove support for matching on [[OutputMetrics]]
+ * we can remove this constructor as well.
+ */
+ private[executor] def this() {
+ this(InternalAccumulator.createOutputAccums()
+ .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]])
+ }
+
+ /**
+ * Total number of bytes written.
+ */
+ def bytesWritten: Long = _bytesWritten.localValue
+
/**
- * Total bytes written
+ * Total number of records written.
*/
- private var _bytesWritten: Long = _
- def bytesWritten: Long = _bytesWritten
- private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = value
+ def recordsWritten: Long = _recordsWritten.localValue
/**
- * Total records written
+ * The source to which this task writes its output.
*/
- private var _recordsWritten: Long = 0L
- def recordsWritten: Long = _recordsWritten
- private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value
+ def writeMethod: DataWriteMethod.Value = DataWriteMethod.withName(_writeMethod.localValue)
+
+ private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v)
+ private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v)
+ private[spark] def setWriteMethod(v: DataWriteMethod.Value): Unit =
+ _writeMethod.setValue(v.toString)
+
+}
+
+/**
+ * Deprecated methods to preserve case class matching behavior before Spark 2.0.
+ */
+object OutputMetrics {
+
+ @deprecated("matching on OutputMetrics will not be supported in the future", "2.0.0")
+ def apply(writeMethod: DataWriteMethod.Value): OutputMetrics = {
+ val om = new OutputMetrics
+ om.setWriteMethod(writeMethod)
+ om
+ }
+
+ @deprecated("matching on OutputMetrics will not be supported in the future", "2.0.0")
+ def unapply(output: OutputMetrics): Option[DataWriteMethod.Value] = {
+ Some(output.writeMethod)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index e985b35ace..50bb645d97 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -17,71 +17,103 @@
package org.apache.spark.executor
+import org.apache.spark.{Accumulator, InternalAccumulator}
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
- * Metrics pertaining to shuffle data read in a given task.
+ * A collection of accumulators that represent metrics about reading shuffle data.
+ * Operations are not thread-safe.
*/
@DeveloperApi
-class ShuffleReadMetrics extends Serializable {
+class ShuffleReadMetrics private (
+ _remoteBlocksFetched: Accumulator[Int],
+ _localBlocksFetched: Accumulator[Int],
+ _remoteBytesRead: Accumulator[Long],
+ _localBytesRead: Accumulator[Long],
+ _fetchWaitTime: Accumulator[Long],
+ _recordsRead: Accumulator[Long])
+ extends Serializable {
+
+ private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
+ this(
+ TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.REMOTE_BLOCKS_FETCHED),
+ TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.LOCAL_BLOCKS_FETCHED),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.REMOTE_BYTES_READ),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.LOCAL_BYTES_READ),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.FETCH_WAIT_TIME),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.RECORDS_READ))
+ }
+
/**
- * Number of remote blocks fetched in this shuffle by this task
+ * Create a new [[ShuffleReadMetrics]] that is not associated with any particular task.
+ *
+ * This mainly exists for legacy reasons, because we use dummy [[ShuffleReadMetrics]] in
+ * many places only to merge their values together later. In the future, we should revisit
+ * whether this is needed.
+ *
+ * A better alternative is [[TaskMetrics.registerTempShuffleReadMetrics]] followed by
+ * [[TaskMetrics.mergeShuffleReadMetrics]].
*/
- private var _remoteBlocksFetched: Int = _
- def remoteBlocksFetched: Int = _remoteBlocksFetched
- private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
- private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
+ private[spark] def this() {
+ this(InternalAccumulator.createShuffleReadAccums().map { a => (a.name.get, a) }.toMap)
+ }
/**
- * Number of local blocks fetched in this shuffle by this task
+ * Number of remote blocks fetched in this shuffle by this task.
*/
- private var _localBlocksFetched: Int = _
- def localBlocksFetched: Int = _localBlocksFetched
- private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
- private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
+ def remoteBlocksFetched: Int = _remoteBlocksFetched.localValue
/**
- * Time the task spent waiting for remote shuffle blocks. This only includes the time
- * blocking on shuffle input data. For instance if block B is being fetched while the task is
- * still not finished processing block A, it is not considered to be blocking on block B.
+ * Number of local blocks fetched in this shuffle by this task.
*/
- private var _fetchWaitTime: Long = _
- def fetchWaitTime: Long = _fetchWaitTime
- private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
- private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
+ def localBlocksFetched: Int = _localBlocksFetched.localValue
/**
- * Total number of remote bytes read from the shuffle by this task
+ * Total number of remote bytes read from the shuffle by this task.
*/
- private var _remoteBytesRead: Long = _
- def remoteBytesRead: Long = _remoteBytesRead
- private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
- private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
+ def remoteBytesRead: Long = _remoteBytesRead.localValue
/**
* Shuffle data that was read from the local disk (as opposed to from a remote executor).
*/
- private var _localBytesRead: Long = _
- def localBytesRead: Long = _localBytesRead
- private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
+ def localBytesRead: Long = _localBytesRead.localValue
/**
- * Total bytes fetched in the shuffle by this task (both remote and local).
+ * Time the task spent waiting for remote shuffle blocks. This only includes the time
+ * blocking on shuffle input data. For instance if block B is being fetched while the task is
+ * still not finished processing block A, it is not considered to be blocking on block B.
+ */
+ def fetchWaitTime: Long = _fetchWaitTime.localValue
+
+ /**
+ * Total number of records read from the shuffle by this task.
*/
- def totalBytesRead: Long = _remoteBytesRead + _localBytesRead
+ def recordsRead: Long = _recordsRead.localValue
/**
- * Number of blocks fetched in this shuffle by this task (remote or local)
+ * Total bytes fetched in the shuffle by this task (both remote and local).
*/
- def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched
+ def totalBytesRead: Long = remoteBytesRead + localBytesRead
/**
- * Total number of records read from the shuffle by this task
+ * Number of blocks fetched in this shuffle by this task (remote or local).
*/
- private var _recordsRead: Long = _
- def recordsRead: Long = _recordsRead
- private[spark] def incRecordsRead(value: Long) = _recordsRead += value
- private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
+ def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
+
+ private[spark] def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.add(v)
+ private[spark] def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.add(v)
+ private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v)
+ private[spark] def incLocalBytesRead(v: Long): Unit = _localBytesRead.add(v)
+ private[spark] def incFetchWaitTime(v: Long): Unit = _fetchWaitTime.add(v)
+ private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
+
+ private[spark] def setRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.setValue(v)
+ private[spark] def setLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.setValue(v)
+ private[spark] def setRemoteBytesRead(v: Long): Unit = _remoteBytesRead.setValue(v)
+ private[spark] def setLocalBytesRead(v: Long): Unit = _localBytesRead.setValue(v)
+ private[spark] def setFetchWaitTime(v: Long): Unit = _fetchWaitTime.setValue(v)
+ private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
+
}
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
index 24795f8600..c7aaabb561 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
@@ -17,40 +17,66 @@
package org.apache.spark.executor
+import org.apache.spark.{Accumulator, InternalAccumulator}
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
- * Metrics pertaining to shuffle data written in a given task.
+ * A collection of accumulators that represent metrics about writing shuffle data.
+ * Operations are not thread-safe.
*/
@DeveloperApi
-class ShuffleWriteMetrics extends Serializable {
+class ShuffleWriteMetrics private (
+ _bytesWritten: Accumulator[Long],
+ _recordsWritten: Accumulator[Long],
+ _writeTime: Accumulator[Long])
+ extends Serializable {
+
+ private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
+ this(
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.BYTES_WRITTEN),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.RECORDS_WRITTEN),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.WRITE_TIME))
+ }
/**
- * Number of bytes written for the shuffle by this task
+ * Create a new [[ShuffleWriteMetrics]] that is not associated with any particular task.
+ *
+ * This mainly exists for legacy reasons, because we use dummy [[ShuffleWriteMetrics]] in
+ * many places only to merge their values together later. In the future, we should revisit
+ * whether this is needed.
+ *
+ * A better alternative is [[TaskMetrics.registerShuffleWriteMetrics]].
*/
- @volatile private var _bytesWritten: Long = _
- def bytesWritten: Long = _bytesWritten
- private[spark] def incBytesWritten(value: Long) = _bytesWritten += value
- private[spark] def decBytesWritten(value: Long) = _bytesWritten -= value
+ private[spark] def this() {
+ this(InternalAccumulator.createShuffleWriteAccums().map { a => (a.name.get, a) }.toMap)
+ }
/**
- * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
+ * Number of bytes written for the shuffle by this task.
*/
- @volatile private var _writeTime: Long = _
- def writeTime: Long = _writeTime
- private[spark] def incWriteTime(value: Long) = _writeTime += value
- private[spark] def decWriteTime(value: Long) = _writeTime -= value
+ def bytesWritten: Long = _bytesWritten.localValue
/**
- * Total number of records written to the shuffle by this task
+ * Total number of records written to the shuffle by this task.
*/
- @volatile private var _recordsWritten: Long = _
- def recordsWritten: Long = _recordsWritten
- private[spark] def incRecordsWritten(value: Long) = _recordsWritten += value
- private[spark] def decRecordsWritten(value: Long) = _recordsWritten -= value
- private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value
+ def recordsWritten: Long = _recordsWritten.localValue
+
+ /**
+ * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds.
+ */
+ def writeTime: Long = _writeTime.localValue
+
+ private[spark] def incBytesWritten(v: Long): Unit = _bytesWritten.add(v)
+ private[spark] def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v)
+ private[spark] def incWriteTime(v: Long): Unit = _writeTime.add(v)
+ private[spark] def decBytesWritten(v: Long): Unit = {
+ _bytesWritten.setValue(bytesWritten - v)
+ }
+ private[spark] def decRecordsWritten(v: Long): Unit = {
+ _recordsWritten.setValue(recordsWritten - v)
+ }
// Legacy methods for backward compatibility.
// TODO: remove these once we make this class private.
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 32ef5a9b56..8d10bf588e 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,90 +17,161 @@
package org.apache.spark.executor
-import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentHashMap
-
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.DataReadMethod.DataReadMethod
+import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.{BlockId, BlockStatus}
-import org.apache.spark.util.Utils
/**
* :: DeveloperApi ::
* Metrics tracked during the execution of a task.
*
- * This class is used to house metrics both for in-progress and completed tasks. In executors,
- * both the task thread and the heartbeat thread write to the TaskMetrics. The heartbeat thread
- * reads it to send in-progress metrics, and the task thread reads it to send metrics along with
- * the completed task.
+ * This class is wrapper around a collection of internal accumulators that represent metrics
+ * associated with a task. The local values of these accumulators are sent from the executor
+ * to the driver when the task completes. These values are then merged into the corresponding
+ * accumulator previously registered on the driver.
+ *
+ * The accumulator updates are also sent to the driver periodically (on executor heartbeat)
+ * and when the task failed with an exception. The [[TaskMetrics]] object itself should never
+ * be sent to the driver.
*
- * So, when adding new fields, take into consideration that the whole object can be serialized for
- * shipping off at any time to consumers of the SparkListener interface.
+ * @param initialAccums the initial set of accumulators that this [[TaskMetrics]] depends on.
+ * Each accumulator in this initial set must be uniquely named and marked
+ * as internal. Additional accumulators registered later need not satisfy
+ * these requirements.
*/
@DeveloperApi
-class TaskMetrics extends Serializable {
+class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
+
+ import InternalAccumulator._
+
+ // Needed for Java tests
+ def this() {
+ this(InternalAccumulator.create())
+ }
+
+ /**
+ * All accumulators registered with this task.
+ */
+ private val accums = new ArrayBuffer[Accumulable[_, _]]
+ accums ++= initialAccums
+
+ /**
+ * A map for quickly accessing the initial set of accumulators by name.
+ */
+ private val initialAccumsMap: Map[String, Accumulator[_]] = {
+ val map = new mutable.HashMap[String, Accumulator[_]]
+ initialAccums.foreach { a =>
+ val name = a.name.getOrElse {
+ throw new IllegalArgumentException(
+ "initial accumulators passed to TaskMetrics must be named")
+ }
+ require(a.isInternal,
+ s"initial accumulator '$name' passed to TaskMetrics must be marked as internal")
+ require(!map.contains(name),
+ s"detected duplicate accumulator name '$name' when constructing TaskMetrics")
+ map(name) = a
+ }
+ map.toMap
+ }
+
+ // Each metric is internally represented as an accumulator
+ private val _executorDeserializeTime = getAccum(EXECUTOR_DESERIALIZE_TIME)
+ private val _executorRunTime = getAccum(EXECUTOR_RUN_TIME)
+ private val _resultSize = getAccum(RESULT_SIZE)
+ private val _jvmGCTime = getAccum(JVM_GC_TIME)
+ private val _resultSerializationTime = getAccum(RESULT_SERIALIZATION_TIME)
+ private val _memoryBytesSpilled = getAccum(MEMORY_BYTES_SPILLED)
+ private val _diskBytesSpilled = getAccum(DISK_BYTES_SPILLED)
+ private val _peakExecutionMemory = getAccum(PEAK_EXECUTION_MEMORY)
+ private val _updatedBlockStatuses =
+ TaskMetrics.getAccum[Seq[(BlockId, BlockStatus)]](initialAccumsMap, UPDATED_BLOCK_STATUSES)
+
/**
- * Host's name the task runs on
+ * Time taken on the executor to deserialize this task.
*/
- private var _hostname: String = _
- def hostname: String = _hostname
- private[spark] def setHostname(value: String) = _hostname = value
+ def executorDeserializeTime: Long = _executorDeserializeTime.localValue
/**
- * Time taken on the executor to deserialize this task
+ * Time the executor spends actually running the task (including fetching shuffle data).
*/
- private var _executorDeserializeTime: Long = _
- def executorDeserializeTime: Long = _executorDeserializeTime
- private[spark] def setExecutorDeserializeTime(value: Long) = _executorDeserializeTime = value
+ def executorRunTime: Long = _executorRunTime.localValue
+ /**
+ * The number of bytes this task transmitted back to the driver as the TaskResult.
+ */
+ def resultSize: Long = _resultSize.localValue
/**
- * Time the executor spends actually running the task (including fetching shuffle data)
+ * Amount of time the JVM spent in garbage collection while executing this task.
*/
- private var _executorRunTime: Long = _
- def executorRunTime: Long = _executorRunTime
- private[spark] def setExecutorRunTime(value: Long) = _executorRunTime = value
+ def jvmGCTime: Long = _jvmGCTime.localValue
/**
- * The number of bytes this task transmitted back to the driver as the TaskResult
+ * Amount of time spent serializing the task result.
*/
- private var _resultSize: Long = _
- def resultSize: Long = _resultSize
- private[spark] def setResultSize(value: Long) = _resultSize = value
+ def resultSerializationTime: Long = _resultSerializationTime.localValue
+ /**
+ * The number of in-memory bytes spilled by this task.
+ */
+ def memoryBytesSpilled: Long = _memoryBytesSpilled.localValue
/**
- * Amount of time the JVM spent in garbage collection while executing this task
+ * The number of on-disk bytes spilled by this task.
*/
- private var _jvmGCTime: Long = _
- def jvmGCTime: Long = _jvmGCTime
- private[spark] def setJvmGCTime(value: Long) = _jvmGCTime = value
+ def diskBytesSpilled: Long = _diskBytesSpilled.localValue
/**
- * Amount of time spent serializing the task result
+ * Peak memory used by internal data structures created during shuffles, aggregations and
+ * joins. The value of this accumulator should be approximately the sum of the peak sizes
+ * across all such data structures created in this task. For SQL jobs, this only tracks all
+ * unsafe operators and ExternalSort.
*/
- private var _resultSerializationTime: Long = _
- def resultSerializationTime: Long = _resultSerializationTime
- private[spark] def setResultSerializationTime(value: Long) = _resultSerializationTime = value
+ def peakExecutionMemory: Long = _peakExecutionMemory.localValue
/**
- * The number of in-memory bytes spilled by this task
+ * Storage statuses of any blocks that have been updated as a result of this task.
*/
- private var _memoryBytesSpilled: Long = _
- def memoryBytesSpilled: Long = _memoryBytesSpilled
- private[spark] def incMemoryBytesSpilled(value: Long): Unit = _memoryBytesSpilled += value
- private[spark] def decMemoryBytesSpilled(value: Long): Unit = _memoryBytesSpilled -= value
+ def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses.localValue
+
+ @deprecated("use updatedBlockStatuses instead", "2.0.0")
+ def updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = {
+ if (updatedBlockStatuses.nonEmpty) Some(updatedBlockStatuses) else None
+ }
+
+ // Setters and increment-ers
+ private[spark] def setExecutorDeserializeTime(v: Long): Unit =
+ _executorDeserializeTime.setValue(v)
+ private[spark] def setExecutorRunTime(v: Long): Unit = _executorRunTime.setValue(v)
+ private[spark] def setResultSize(v: Long): Unit = _resultSize.setValue(v)
+ private[spark] def setJvmGCTime(v: Long): Unit = _jvmGCTime.setValue(v)
+ private[spark] def setResultSerializationTime(v: Long): Unit =
+ _resultSerializationTime.setValue(v)
+ private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v)
+ private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v)
+ private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v)
+ private[spark] def incUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit =
+ _updatedBlockStatuses.add(v)
+ private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit =
+ _updatedBlockStatuses.setValue(v)
/**
- * The number of on-disk bytes spilled by this task
+ * Get a Long accumulator from the given map by name, assuming it exists.
+ * Note: this only searches the initial set of accumulators passed into the constructor.
*/
- private var _diskBytesSpilled: Long = _
- def diskBytesSpilled: Long = _diskBytesSpilled
- private[spark] def incDiskBytesSpilled(value: Long): Unit = _diskBytesSpilled += value
- private[spark] def decDiskBytesSpilled(value: Long): Unit = _diskBytesSpilled -= value
+ private[spark] def getAccum(name: String): Accumulator[Long] = {
+ TaskMetrics.getAccum[Long](initialAccumsMap, name)
+ }
+
+
+ /* ========================== *
+ | INPUT METRICS |
+ * ========================== */
private var _inputMetrics: Option[InputMetrics] = None
@@ -116,7 +187,8 @@ class TaskMetrics extends Serializable {
private[spark] def registerInputMetrics(readMethod: DataReadMethod.Value): InputMetrics = {
synchronized {
val metrics = _inputMetrics.getOrElse {
- val metrics = new InputMetrics(readMethod)
+ val metrics = new InputMetrics(initialAccumsMap)
+ metrics.setReadMethod(readMethod)
_inputMetrics = Some(metrics)
metrics
}
@@ -128,18 +200,17 @@ class TaskMetrics extends Serializable {
if (metrics.readMethod == readMethod) {
metrics
} else {
- new InputMetrics(readMethod)
+ val m = new InputMetrics
+ m.setReadMethod(readMethod)
+ m
}
}
}
- /**
- * This should only be used when recreating TaskMetrics, not when updating input metrics in
- * executors
- */
- private[spark] def setInputMetrics(inputMetrics: Option[InputMetrics]) {
- _inputMetrics = inputMetrics
- }
+
+ /* ============================ *
+ | OUTPUT METRICS |
+ * ============================ */
private var _outputMetrics: Option[OutputMetrics] = None
@@ -149,23 +220,24 @@ class TaskMetrics extends Serializable {
*/
def outputMetrics: Option[OutputMetrics] = _outputMetrics
- @deprecated("setting OutputMetrics is for internal use only", "2.0.0")
- def outputMetrics_=(om: Option[OutputMetrics]): Unit = {
- _outputMetrics = om
- }
-
/**
* Get or create a new [[OutputMetrics]] associated with this task.
*/
private[spark] def registerOutputMetrics(
writeMethod: DataWriteMethod.Value): OutputMetrics = synchronized {
_outputMetrics.getOrElse {
- val metrics = new OutputMetrics(writeMethod)
+ val metrics = new OutputMetrics(initialAccumsMap)
+ metrics.setWriteMethod(writeMethod)
_outputMetrics = Some(metrics)
metrics
}
}
+
+ /* ================================== *
+ | SHUFFLE READ METRICS |
+ * ================================== */
+
private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
/**
@@ -175,20 +247,12 @@ class TaskMetrics extends Serializable {
def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics
/**
- * This should only be used when recreating TaskMetrics, not when updating read metrics in
- * executors.
- */
- private[spark] def setShuffleReadMetrics(shuffleReadMetrics: Option[ShuffleReadMetrics]) {
- _shuffleReadMetrics = shuffleReadMetrics
- }
-
- /**
* Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency.
*
* A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
* issues from readers in different threads, in-progress tasks use a [[ShuffleReadMetrics]] for
* each dependency and merge these metrics before reporting them to the driver.
- */
+ */
@transient private lazy val tempShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics]
/**
@@ -210,19 +274,21 @@ class TaskMetrics extends Serializable {
*/
private[spark] def mergeShuffleReadMetrics(): Unit = synchronized {
if (tempShuffleReadMetrics.nonEmpty) {
- val merged = new ShuffleReadMetrics
- for (depMetrics <- tempShuffleReadMetrics) {
- merged.incFetchWaitTime(depMetrics.fetchWaitTime)
- merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
- merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
- merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
- merged.incLocalBytesRead(depMetrics.localBytesRead)
- merged.incRecordsRead(depMetrics.recordsRead)
- }
- _shuffleReadMetrics = Some(merged)
+ val metrics = new ShuffleReadMetrics(initialAccumsMap)
+ metrics.setRemoteBlocksFetched(tempShuffleReadMetrics.map(_.remoteBlocksFetched).sum)
+ metrics.setLocalBlocksFetched(tempShuffleReadMetrics.map(_.localBlocksFetched).sum)
+ metrics.setFetchWaitTime(tempShuffleReadMetrics.map(_.fetchWaitTime).sum)
+ metrics.setRemoteBytesRead(tempShuffleReadMetrics.map(_.remoteBytesRead).sum)
+ metrics.setLocalBytesRead(tempShuffleReadMetrics.map(_.localBytesRead).sum)
+ metrics.setRecordsRead(tempShuffleReadMetrics.map(_.recordsRead).sum)
+ _shuffleReadMetrics = Some(metrics)
}
}
+ /* =================================== *
+ | SHUFFLE WRITE METRICS |
+ * =================================== */
+
private var _shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None
/**
@@ -230,86 +296,120 @@ class TaskMetrics extends Serializable {
*/
def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics
- @deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0")
- def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = {
- _shuffleWriteMetrics = swm
- }
-
/**
* Get or create a new [[ShuffleWriteMetrics]] associated with this task.
*/
private[spark] def registerShuffleWriteMetrics(): ShuffleWriteMetrics = synchronized {
_shuffleWriteMetrics.getOrElse {
- val metrics = new ShuffleWriteMetrics
+ val metrics = new ShuffleWriteMetrics(initialAccumsMap)
_shuffleWriteMetrics = Some(metrics)
metrics
}
}
- private var _updatedBlockStatuses: Seq[(BlockId, BlockStatus)] =
- Seq.empty[(BlockId, BlockStatus)]
- /**
- * Storage statuses of any blocks that have been updated as a result of this task.
- */
- def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses
+ /* ========================== *
+ | OTHER THINGS |
+ * ========================== */
- @deprecated("setting updated blocks is for internal use only", "2.0.0")
- def updatedBlocks_=(ub: Option[Seq[(BlockId, BlockStatus)]]): Unit = {
- _updatedBlockStatuses = ub.getOrElse(Seq.empty[(BlockId, BlockStatus)])
+ private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
+ accums += a
}
- private[spark] def incUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = {
- _updatedBlockStatuses ++= v
+ /**
+ * Return the latest updates of accumulators in this task.
+ *
+ * The [[AccumulableInfo.update]] field is always defined and the [[AccumulableInfo.value]]
+ * field is always empty, since this represents the partial updates recorded in this task,
+ * not the aggregated value across multiple tasks.
+ */
+ def accumulatorUpdates(): Seq[AccumulableInfo] = accums.map { a =>
+ new AccumulableInfo(a.id, a.name, Some(a.localValue), None, a.isInternal, a.countFailedValues)
}
- private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = {
- _updatedBlockStatuses = v
+ // If we are reconstructing this TaskMetrics on the driver, some metrics may already be set.
+ // If so, initialize all relevant metrics classes so listeners can access them downstream.
+ {
+ var (hasShuffleRead, hasShuffleWrite, hasInput, hasOutput) = (false, false, false, false)
+ initialAccums
+ .filter { a => a.localValue != a.zero }
+ .foreach { a =>
+ a.name.get match {
+ case sr if sr.startsWith(SHUFFLE_READ_METRICS_PREFIX) => hasShuffleRead = true
+ case sw if sw.startsWith(SHUFFLE_WRITE_METRICS_PREFIX) => hasShuffleWrite = true
+ case in if in.startsWith(INPUT_METRICS_PREFIX) => hasInput = true
+ case out if out.startsWith(OUTPUT_METRICS_PREFIX) => hasOutput = true
+ case _ =>
+ }
+ }
+ if (hasShuffleRead) { _shuffleReadMetrics = Some(new ShuffleReadMetrics(initialAccumsMap)) }
+ if (hasShuffleWrite) { _shuffleWriteMetrics = Some(new ShuffleWriteMetrics(initialAccumsMap)) }
+ if (hasInput) { _inputMetrics = Some(new InputMetrics(initialAccumsMap)) }
+ if (hasOutput) { _outputMetrics = Some(new OutputMetrics(initialAccumsMap)) }
}
- @deprecated("use updatedBlockStatuses instead", "2.0.0")
- def updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = {
- if (_updatedBlockStatuses.nonEmpty) Some(_updatedBlockStatuses) else None
- }
+}
- private[spark] def updateInputMetrics(): Unit = synchronized {
- inputMetrics.foreach(_.updateBytesRead())
- }
+private[spark] object TaskMetrics extends Logging {
- @throws(classOf[IOException])
- private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
- in.defaultReadObject()
- // Get the hostname from cached data, since hostname is the order of number of nodes in
- // cluster, so using cached hostname will decrease the object number and alleviate the GC
- // overhead.
- _hostname = TaskMetrics.getCachedHostName(_hostname)
- }
-
- private var _accumulatorUpdates: Map[Long, Any] = Map.empty
- @transient private var _accumulatorsUpdater: () => Map[Long, Any] = null
+ def empty: TaskMetrics = new TaskMetrics
- private[spark] def updateAccumulators(): Unit = synchronized {
- _accumulatorUpdates = _accumulatorsUpdater()
+ /**
+ * Get an accumulator from the given map by name, assuming it exists.
+ */
+ def getAccum[T](accumMap: Map[String, Accumulator[_]], name: String): Accumulator[T] = {
+ require(accumMap.contains(name), s"metric '$name' is missing")
+ val accum = accumMap(name)
+ try {
+ // Note: we can't do pattern matching here because types are erased by compile time
+ accum.asInstanceOf[Accumulator[T]]
+ } catch {
+ case e: ClassCastException =>
+ throw new SparkException(s"accumulator $name was of unexpected type", e)
+ }
}
/**
- * Return the latest updates of accumulators in this task.
+ * Construct a [[TaskMetrics]] object from a list of accumulator updates, called on driver only.
+ *
+ * Executors only send accumulator updates back to the driver, not [[TaskMetrics]]. However, we
+ * need the latter to post task end events to listeners, so we need to reconstruct the metrics
+ * on the driver.
+ *
+ * This assumes the provided updates contain the initial set of accumulators representing
+ * internal task level metrics.
*/
- def accumulatorUpdates(): Map[Long, Any] = _accumulatorUpdates
-
- private[spark] def setAccumulatorsUpdater(accumulatorsUpdater: () => Map[Long, Any]): Unit = {
- _accumulatorsUpdater = accumulatorsUpdater
+ def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = {
+ // Initial accumulators are passed into the TaskMetrics constructor first because these
+ // are required to be uniquely named. The rest of the accumulators from this task are
+ // registered later because they need not satisfy this requirement.
+ val (initialAccumInfos, otherAccumInfos) = accumUpdates
+ .filter { info => info.update.isDefined }
+ .partition { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) }
+ val initialAccums = initialAccumInfos.map { info =>
+ val accum = InternalAccumulator.create(info.name.get)
+ accum.setValueAny(info.update.get)
+ accum
+ }
+ // We don't know the types of the rest of the accumulators, so we try to find the same ones
+ // that were previously registered here on the driver and make copies of them. It is important
+ // that we copy the accumulators here since they are used across many tasks and we want to
+ // maintain a snapshot of their local task values when we post them to listeners downstream.
+ val otherAccums = otherAccumInfos.flatMap { info =>
+ val id = info.id
+ val acc = Accumulators.get(id).map { a =>
+ val newAcc = a.copy()
+ newAcc.setValueAny(info.update.get)
+ newAcc
+ }
+ if (acc.isEmpty) {
+ logWarning(s"encountered unregistered accumulator $id when reconstructing task metrics.")
+ }
+ acc
+ }
+ val metrics = new TaskMetrics(initialAccums)
+ otherAccums.foreach(metrics.registerAccumulator)
+ metrics
}
-}
-
-private[spark] object TaskMetrics {
- private val hostNameCache = new ConcurrentHashMap[String, String]()
-
- def empty: TaskMetrics = new TaskMetrics
-
- def getCachedHostName(host: String): String = {
- val canonicalHost = hostNameCache.putIfAbsent(host, host)
- if (canonicalHost != null) canonicalHost else host
- }
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 3587e7eb1a..d9b0824b38 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -153,8 +153,7 @@ class CoGroupedRDD[K: ClassTag](
}
context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
- context.internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
+ context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
new InterruptibleIterator(context,
map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index a79ab86d49..3204e6adce 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -212,6 +212,8 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()
+ // TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD
+
val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
// Sets the thread local variable for the file's name
@@ -222,14 +224,17 @@ class HadoopRDD[K, V](
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
- split.inputSplit.value match {
- case _: FileSplit | _: CombineFileSplit =>
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
- case _ => None
+ val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
+ case _: FileSplit | _: CombineFileSplit =>
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+ case _ => None
+ }
+
+ def updateBytesRead(): Unit = {
+ getBytesReadCallback.foreach { getBytesRead =>
+ inputMetrics.setBytesRead(getBytesRead())
}
}
- inputMetrics.setBytesReadCallback(bytesReadCallback)
var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf)
@@ -252,6 +257,9 @@ class HadoopRDD[K, V](
if (!finished) {
inputMetrics.incRecordsRead(1)
}
+ if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+ updateBytesRead()
+ }
(key, value)
}
@@ -272,8 +280,8 @@ class HadoopRDD[K, V](
} finally {
reader = null
}
- if (bytesReadCallback.isDefined) {
- inputMetrics.updateBytesRead()
+ if (getBytesReadCallback.isDefined) {
+ updateBytesRead()
} else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 5cc9c81cc6..4d2816e335 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -133,14 +133,17 @@ class NewHadoopRDD[K, V](
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
- split.serializableHadoopSplit.value match {
- case _: FileSplit | _: CombineFileSplit =>
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
- case _ => None
+ val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match {
+ case _: FileSplit | _: CombineFileSplit =>
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+ case _ => None
+ }
+
+ def updateBytesRead(): Unit = {
+ getBytesReadCallback.foreach { getBytesRead =>
+ inputMetrics.setBytesRead(getBytesRead())
}
}
- inputMetrics.setBytesReadCallback(bytesReadCallback)
val format = inputFormatClass.newInstance
format match {
@@ -182,6 +185,9 @@ class NewHadoopRDD[K, V](
if (!finished) {
inputMetrics.incRecordsRead(1)
}
+ if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+ updateBytesRead()
+ }
(reader.getCurrentKey, reader.getCurrentValue)
}
@@ -201,8 +207,8 @@ class NewHadoopRDD[K, V](
} finally {
reader = null
}
- if (bytesReadCallback.isDefined) {
- inputMetrics.updateBytesRead()
+ if (getBytesReadCallback.isDefined) {
+ updateBytesRead()
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
index 146cfb9ba8..9d45fff921 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -19,47 +19,58 @@ package org.apache.spark.scheduler
import org.apache.spark.annotation.DeveloperApi
+
/**
* :: DeveloperApi ::
* Information about an [[org.apache.spark.Accumulable]] modified during a task or stage.
+ *
+ * Note: once this is JSON serialized the types of `update` and `value` will be lost and be
+ * cast to strings. This is because the user can define an accumulator of any type and it will
+ * be difficult to preserve the type in consumers of the event log. This does not apply to
+ * internal accumulators that represent task level metrics.
+ *
+ * @param id accumulator ID
+ * @param name accumulator name
+ * @param update partial value from a task, may be None if used on driver to describe a stage
+ * @param value total accumulated value so far, maybe None if used on executors to describe a task
+ * @param internal whether this accumulator was internal
+ * @param countFailedValues whether to count this accumulator's partial value if the task failed
*/
@DeveloperApi
-class AccumulableInfo private[spark] (
- val id: Long,
- val name: String,
- val update: Option[String], // represents a partial update within a task
- val value: String,
- val internal: Boolean) {
-
- override def equals(other: Any): Boolean = other match {
- case acc: AccumulableInfo =>
- this.id == acc.id && this.name == acc.name &&
- this.update == acc.update && this.value == acc.value &&
- this.internal == acc.internal
- case _ => false
- }
+case class AccumulableInfo private[spark] (
+ id: Long,
+ name: Option[String],
+ update: Option[Any], // represents a partial update within a task
+ value: Option[Any],
+ private[spark] val internal: Boolean,
+ private[spark] val countFailedValues: Boolean)
- override def hashCode(): Int = {
- val state = Seq(id, name, update, value, internal)
- state.map(_.hashCode).reduceLeft(31 * _ + _)
- }
-}
+/**
+ * A collection of deprecated constructors. This will be removed soon.
+ */
object AccumulableInfo {
+
+ @deprecated("do not create AccumulableInfo", "2.0.0")
def apply(
id: Long,
name: String,
update: Option[String],
value: String,
internal: Boolean): AccumulableInfo = {
- new AccumulableInfo(id, name, update, value, internal)
+ new AccumulableInfo(
+ id, Option(name), update, Option(value), internal, countFailedValues = false)
}
+ @deprecated("do not create AccumulableInfo", "2.0.0")
def apply(id: Long, name: String, update: Option[String], value: String): AccumulableInfo = {
- new AccumulableInfo(id, name, update, value, internal = false)
+ new AccumulableInfo(
+ id, Option(name), update, Option(value), internal = false, countFailedValues = false)
}
+ @deprecated("do not create AccumulableInfo", "2.0.0")
def apply(id: Long, name: String, value: String): AccumulableInfo = {
- new AccumulableInfo(id, name, None, value, internal = false)
+ new AccumulableInfo(
+ id, Option(name), None, Option(value), internal = false, countFailedValues = false)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6b01a10fc1..897479b500 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -208,11 +208,10 @@ class DAGScheduler(
task: Task[_],
reason: TaskEndReason,
result: Any,
- accumUpdates: Map[Long, Any],
- taskInfo: TaskInfo,
- taskMetrics: TaskMetrics): Unit = {
+ accumUpdates: Seq[AccumulableInfo],
+ taskInfo: TaskInfo): Unit = {
eventProcessLoop.post(
- CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
+ CompletionEvent(task, reason, result, accumUpdates, taskInfo))
}
/**
@@ -222,9 +221,10 @@ class DAGScheduler(
*/
def executorHeartbeatReceived(
execId: String,
- taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stateAttempt, metrics)
+ // (taskId, stageId, stageAttemptId, accumUpdates)
+ accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId): Boolean = {
- listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
+ listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
blockManagerMaster.driverEndpoint.askWithRetry[Boolean](
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}
@@ -1074,39 +1074,43 @@ class DAGScheduler(
}
}
- /** Merge updates from a task to our local accumulator values */
+ /**
+ * Merge local values from a task into the corresponding accumulators previously registered
+ * here on the driver.
+ *
+ * Although accumulators themselves are not thread-safe, this method is called only from one
+ * thread, the one that runs the scheduling loop. This means we only handle one task
+ * completion event at a time so we don't need to worry about locking the accumulators.
+ * This still doesn't stop the caller from updating the accumulator outside the scheduler,
+ * but that's not our problem since there's nothing we can do about that.
+ */
private def updateAccumulators(event: CompletionEvent): Unit = {
val task = event.task
val stage = stageIdToStage(task.stageId)
- if (event.accumUpdates != null) {
- try {
- Accumulators.add(event.accumUpdates)
-
- event.accumUpdates.foreach { case (id, partialValue) =>
- // In this instance, although the reference in Accumulators.originals is a WeakRef,
- // it's guaranteed to exist since the event.accumUpdates Map exists
-
- val acc = Accumulators.originals(id).get match {
- case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]]
- case None => throw new NullPointerException("Non-existent reference to Accumulator")
- }
-
- // To avoid UI cruft, ignore cases where value wasn't updated
- if (acc.name.isDefined && partialValue != acc.zero) {
- val name = acc.name.get
- val value = s"${acc.value}"
- stage.latestInfo.accumulables(id) =
- new AccumulableInfo(id, name, None, value, acc.isInternal)
- event.taskInfo.accumulables +=
- new AccumulableInfo(id, name, Some(s"$partialValue"), value, acc.isInternal)
- }
+ try {
+ event.accumUpdates.foreach { ainfo =>
+ assert(ainfo.update.isDefined, "accumulator from task should have a partial value")
+ val id = ainfo.id
+ val partialValue = ainfo.update.get
+ // Find the corresponding accumulator on the driver and update it
+ val acc: Accumulable[Any, Any] = Accumulators.get(id) match {
+ case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]]
+ case None =>
+ throw new SparkException(s"attempted to access non-existent accumulator $id")
+ }
+ acc ++= partialValue
+ // To avoid UI cruft, ignore cases where value wasn't updated
+ if (acc.name.isDefined && partialValue != acc.zero) {
+ val name = acc.name
+ stage.latestInfo.accumulables(id) = new AccumulableInfo(
+ id, name, None, Some(acc.value), acc.isInternal, acc.countFailedValues)
+ event.taskInfo.accumulables += new AccumulableInfo(
+ id, name, Some(partialValue), Some(acc.value), acc.isInternal, acc.countFailedValues)
}
- } catch {
- // If we see an exception during accumulator update, just log the
- // error and move on.
- case e: Exception =>
- logError(s"Failed to update accumulators for $task", e)
}
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Failed to update accumulators for task ${task.partitionId}", e)
}
}
@@ -1116,6 +1120,7 @@ class DAGScheduler(
*/
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
val task = event.task
+ val taskId = event.taskInfo.id
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)
@@ -1125,12 +1130,26 @@ class DAGScheduler(
event.taskInfo.attemptNumber, // this is a task attempt number
event.reason)
- // The success case is dealt with separately below, since we need to compute accumulator
- // updates before posting.
+ // Reconstruct task metrics. Note: this may be null if the task has failed.
+ val taskMetrics: TaskMetrics =
+ if (event.accumUpdates.nonEmpty) {
+ try {
+ TaskMetrics.fromAccumulatorUpdates(event.accumUpdates)
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Error when attempting to reconstruct metrics for task $taskId", e)
+ null
+ }
+ } else {
+ null
+ }
+
+ // The success case is dealt with separately below.
+ // TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here.
if (event.reason != Success) {
val attemptId = task.stageAttemptId
- listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason,
- event.taskInfo, event.taskMetrics))
+ listenerBus.post(SparkListenerTaskEnd(
+ stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics))
}
if (!stageIdToStage.contains(task.stageId)) {
@@ -1142,7 +1161,7 @@ class DAGScheduler(
event.reason match {
case Success =>
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
- event.reason, event.taskInfo, event.taskMetrics))
+ event.reason, event.taskInfo, taskMetrics))
stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
@@ -1291,7 +1310,8 @@ class DAGScheduler(
// Do nothing here, left up to the TaskScheduler to decide how to handle denied commits
case exceptionFailure: ExceptionFailure =>
- // Do nothing here, left up to the TaskScheduler to decide how to handle user failures
+ // Tasks failed with exceptions might still have accumulator updates.
+ updateAccumulators(event)
case TaskResultLost =>
// Do nothing here; the TaskScheduler handles these failures and resubmits the task.
@@ -1637,7 +1657,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
- case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
+ case completion: CompletionEvent =>
dagScheduler.handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason, exception) =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index dda3b6cc7f..d5cd2da7a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -73,9 +73,8 @@ private[scheduler] case class CompletionEvent(
task: Task[_],
reason: TaskEndReason,
result: Any,
- accumUpdates: Map[Long, Any],
- taskInfo: TaskInfo,
- taskMetrics: TaskMetrics)
+ accumUpdates: Seq[AccumulableInfo],
+ taskInfo: TaskInfo)
extends DAGSchedulerEvent
private[scheduler] case class ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 6590cf6ffd..885f70e89f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
* See [[Task]] for more information.
*
* @param stageId id of the stage this task belongs to
+ * @param stageAttemptId attempt id of the stage this task belongs to
* @param taskBinary broadcasted version of the serialized RDD and the function to apply on each
* partition of the given RDD. Once deserialized, the type should be
* (RDD[T], (TaskContext, Iterator[T]) => U).
@@ -37,6 +38,9 @@ import org.apache.spark.rdd.RDD
* @param locs preferred task execution locations for locality scheduling
* @param outputId index of the task in this job (a job can launch tasks on only a subset of the
* input RDD's partitions).
+ * @param _initialAccums initial set of accumulators to be used in this task for tracking
+ * internal metrics. Other accumulators will be registered later when
+ * they are deserialized on the executors.
*/
private[spark] class ResultTask[T, U](
stageId: Int,
@@ -45,8 +49,8 @@ private[spark] class ResultTask[T, U](
partition: Partition,
locs: Seq[TaskLocation],
val outputId: Int,
- internalAccumulators: Seq[Accumulator[Long]])
- extends Task[U](stageId, stageAttemptId, partition.index, internalAccumulators)
+ _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.create())
+ extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums)
with Serializable {
@transient private[this] val preferredLocs: Seq[TaskLocation] = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index ea97ef0e74..89207dd175 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -33,10 +33,14 @@ import org.apache.spark.shuffle.ShuffleWriter
* See [[org.apache.spark.scheduler.Task]] for more information.
*
* @param stageId id of the stage this task belongs to
+ * @param stageAttemptId attempt id of the stage this task belongs to
* @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
* the type should be (RDD[_], ShuffleDependency[_, _, _]).
* @param partition partition of the RDD this task is associated with
* @param locs preferred task execution locations for locality scheduling
+ * @param _initialAccums initial set of accumulators to be used in this task for tracking
+ * internal metrics. Other accumulators will be registered later when
+ * they are deserialized on the executors.
*/
private[spark] class ShuffleMapTask(
stageId: Int,
@@ -44,8 +48,8 @@ private[spark] class ShuffleMapTask(
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
@transient private var locs: Seq[TaskLocation],
- internalAccumulators: Seq[Accumulator[Long]])
- extends Task[MapStatus](stageId, stageAttemptId, partition.index, internalAccumulators)
+ _initialAccums: Seq[Accumulator[_]])
+ extends Task[MapStatus](stageId, stageAttemptId, partition.index, _initialAccums)
with Logging {
/** A constructor used only in test suites. This does not require passing in an RDD. */
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 6c6883d703..ed3adbd81c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler
import java.util.Properties
+import javax.annotation.Nullable
import scala.collection.Map
import scala.collection.mutable
@@ -60,7 +61,7 @@ case class SparkListenerTaskEnd(
taskType: String,
reason: TaskEndReason,
taskInfo: TaskInfo,
- taskMetrics: TaskMetrics)
+ @Nullable taskMetrics: TaskMetrics)
extends SparkListenerEvent
@DeveloperApi
@@ -111,12 +112,12 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
/**
* Periodic updates from executors.
* @param execId executor id
- * @param taskMetrics sequence of (task id, stage id, stage attempt, metrics)
+ * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates)
*/
@DeveloperApi
case class SparkListenerExecutorMetricsUpdate(
execId: String,
- taskMetrics: Seq[(Long, Int, Int, TaskMetrics)])
+ accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
extends SparkListenerEvent
@DeveloperApi
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 7ea24a217b..c1c8b47128 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -74,10 +74,10 @@ private[scheduler] abstract class Stage(
val name: String = callSite.shortForm
val details: String = callSite.longForm
- private var _internalAccumulators: Seq[Accumulator[Long]] = Seq.empty
+ private var _internalAccumulators: Seq[Accumulator[_]] = Seq.empty
/** Internal accumulators shared across all tasks in this stage. */
- def internalAccumulators: Seq[Accumulator[Long]] = _internalAccumulators
+ def internalAccumulators: Seq[Accumulator[_]] = _internalAccumulators
/**
* Re-initialize the internal accumulators associated with this stage.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index fca57928ec..a49f3716e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler
-import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream}
+import java.io.{DataInputStream, DataOutputStream}
import java.nio.ByteBuffer
import scala.collection.mutable.HashMap
@@ -41,32 +41,29 @@ import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Uti
* and divides the task output to multiple buckets (based on the task's partitioner).
*
* @param stageId id of the stage this task belongs to
+ * @param stageAttemptId attempt id of the stage this task belongs to
* @param partitionId index of the number in the RDD
+ * @param initialAccumulators initial set of accumulators to be used in this task for tracking
+ * internal metrics. Other accumulators will be registered later when
+ * they are deserialized on the executors.
*/
private[spark] abstract class Task[T](
val stageId: Int,
val stageAttemptId: Int,
val partitionId: Int,
- internalAccumulators: Seq[Accumulator[Long]]) extends Serializable {
+ val initialAccumulators: Seq[Accumulator[_]]) extends Serializable {
/**
- * The key of the Map is the accumulator id and the value of the Map is the latest accumulator
- * local value.
- */
- type AccumulatorUpdates = Map[Long, Any]
-
- /**
- * Called by [[Executor]] to run this task.
+ * Called by [[org.apache.spark.executor.Executor]] to run this task.
*
* @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
* @param attemptNumber how many times this task has been attempted (0 for the first attempt)
* @return the result of the task along with updates of Accumulators.
*/
final def run(
- taskAttemptId: Long,
- attemptNumber: Int,
- metricsSystem: MetricsSystem)
- : (T, AccumulatorUpdates) = {
+ taskAttemptId: Long,
+ attemptNumber: Int,
+ metricsSystem: MetricsSystem): T = {
context = new TaskContextImpl(
stageId,
partitionId,
@@ -74,16 +71,14 @@ private[spark] abstract class Task[T](
attemptNumber,
taskMemoryManager,
metricsSystem,
- internalAccumulators)
+ initialAccumulators)
TaskContext.setTaskContext(context)
- context.taskMetrics.setHostname(Utils.localHostName())
- context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
taskThread = Thread.currentThread()
if (_killed) {
kill(interruptThread = false)
}
try {
- (runTask(context), context.collectAccumulators())
+ runTask(context)
} finally {
context.markTaskCompleted()
try {
@@ -141,6 +136,18 @@ private[spark] abstract class Task[T](
def executorDeserializeTime: Long = _executorDeserializeTime
/**
+ * Collect the latest values of accumulators used in this task. If the task failed,
+ * filter out the accumulators whose values should not be included on failures.
+ */
+ def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulableInfo] = {
+ if (context != null) {
+ context.taskMetrics.accumulatorUpdates().filter { a => !taskFailed || a.countFailedValues }
+ } else {
+ Seq.empty[AccumulableInfo]
+ }
+ }
+
+ /**
* Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
* code and user code to properly handle the flag. This function should be idempotent so it can
* be called multiple times.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index b82c7f3fa5..03135e63d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -20,11 +20,9 @@ package org.apache.spark.scheduler
import java.io._
import java.nio.ByteBuffer
-import scala.collection.Map
-import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkEnv
-import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockId
import org.apache.spark.util.Utils
@@ -36,31 +34,24 @@ private[spark] case class IndirectTaskResult[T](blockId: BlockId, size: Int)
extends TaskResult[T] with Serializable
/** A TaskResult that contains the task's return value and accumulator updates. */
-private[spark]
-class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any],
- var metrics: TaskMetrics)
+private[spark] class DirectTaskResult[T](
+ var valueBytes: ByteBuffer,
+ var accumUpdates: Seq[AccumulableInfo])
extends TaskResult[T] with Externalizable {
private var valueObjectDeserialized = false
private var valueObject: T = _
- def this() = this(null.asInstanceOf[ByteBuffer], null, null)
+ def this() = this(null.asInstanceOf[ByteBuffer], null)
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
-
- out.writeInt(valueBytes.remaining);
+ out.writeInt(valueBytes.remaining)
Utils.writeByteBuffer(valueBytes, out)
-
out.writeInt(accumUpdates.size)
- for ((key, value) <- accumUpdates) {
- out.writeLong(key)
- out.writeObject(value)
- }
- out.writeObject(metrics)
+ accumUpdates.foreach(out.writeObject)
}
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
-
val blen = in.readInt()
val byteVal = new Array[Byte](blen)
in.readFully(byteVal)
@@ -70,13 +61,12 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
if (numUpdates == 0) {
accumUpdates = null
} else {
- val _accumUpdates = mutable.Map[Long, Any]()
+ val _accumUpdates = new ArrayBuffer[AccumulableInfo]
for (i <- 0 until numUpdates) {
- _accumUpdates(in.readLong()) = in.readObject()
+ _accumUpdates += in.readObject.asInstanceOf[AccumulableInfo]
}
accumUpdates = _accumUpdates
}
- metrics = in.readObject().asInstanceOf[TaskMetrics]
valueObjectDeserialized = false
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index f4965994d8..c94c4f55e9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -18,7 +18,7 @@
package org.apache.spark.scheduler
import java.nio.ByteBuffer
-import java.util.concurrent.RejectedExecutionException
+import java.util.concurrent.{ExecutorService, RejectedExecutionException}
import scala.language.existentials
import scala.util.control.NonFatal
@@ -35,9 +35,12 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
extends Logging {
private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
- private val getTaskResultExecutor = ThreadUtils.newDaemonFixedThreadPool(
- THREADS, "task-result-getter")
+ // Exposed for testing.
+ protected val getTaskResultExecutor: ExecutorService =
+ ThreadUtils.newDaemonFixedThreadPool(THREADS, "task-result-getter")
+
+ // Exposed for testing.
protected val serializer = new ThreadLocal[SerializerInstance] {
override def initialValue(): SerializerInstance = {
sparkEnv.closureSerializer.newInstance()
@@ -45,7 +48,9 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
}
def enqueueSuccessfulTask(
- taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
+ taskSetManager: TaskSetManager,
+ tid: Long,
+ serializedData: ByteBuffer): Unit = {
getTaskResultExecutor.execute(new Runnable {
override def run(): Unit = Utils.logUncaughtExceptions {
try {
@@ -82,7 +87,19 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
(deserializedResult, size)
}
- result.metrics.setResultSize(size)
+ // Set the task result size in the accumulator updates received from the executors.
+ // We need to do this here on the driver because if we did this on the executors then
+ // we would have to serialize the result again after updating the size.
+ result.accumUpdates = result.accumUpdates.map { a =>
+ if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
+ assert(a.update == Some(0L),
+ "task result size should not have been set on the executors")
+ a.copy(update = Some(size.toLong))
+ } else {
+ a
+ }
+ }
+
scheduler.handleSuccessfulTask(taskSetManager, tid, result)
} catch {
case cnf: ClassNotFoundException =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 7c0b007db7..fccd6e0699 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -65,8 +65,10 @@ private[spark] trait TaskScheduler {
* alive. Return true if the driver knows about the given block manager. Otherwise, return false,
* indicating that the block manager should re-register.
*/
- def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
- blockManagerId: BlockManagerId): Boolean
+ def executorHeartbeatReceived(
+ execId: String,
+ accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+ blockManagerId: BlockManagerId): Boolean
/**
* Get an application ID associated with the job.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 6e3ef0e54f..29341dfe30 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -30,7 +30,6 @@ import scala.util.Random
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
-import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.storage.BlockManagerId
@@ -380,17 +379,17 @@ private[spark] class TaskSchedulerImpl(
*/
override def executorHeartbeatReceived(
execId: String,
- taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
+ accumUpdates: Array[(Long, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId): Boolean = {
-
- val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {
- taskMetrics.flatMap { case (id, metrics) =>
+ // (taskId, stageId, stageAttemptId, accumUpdates)
+ val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
+ accumUpdates.flatMap { case (id, updates) =>
taskIdToTaskSetManager.get(id).map { taskSetMgr =>
- (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, metrics)
+ (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, updates)
}
}
}
- dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
+ dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId)
}
def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index aa39b59d8c..cf97877476 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -621,8 +621,7 @@ private[spark] class TaskSetManager(
// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
// Note: "result.value()" only deserializes the value when it's called at the first time, so
// here "result.value()" just returns the value and won't block other threads.
- sched.dagScheduler.taskEnded(
- tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
+ sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
if (!successful(index)) {
tasksSuccessful += 1
logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
@@ -653,8 +652,7 @@ private[spark] class TaskSetManager(
info.markFailed()
val index = info.index
copiesRunning(index) -= 1
- var taskMetrics : TaskMetrics = null
-
+ var accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo]
val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " +
reason.asInstanceOf[TaskFailedReason].toErrorString
val failureException: Option[Throwable] = reason match {
@@ -669,7 +667,8 @@ private[spark] class TaskSetManager(
None
case ef: ExceptionFailure =>
- taskMetrics = ef.metrics.orNull
+ // ExceptionFailure's might have accumulator updates
+ accumUpdates = ef.accumUpdates
if (ef.className == classOf[NotSerializableException].getName) {
// If the task result wasn't serializable, there's no point in trying to re-execute it.
logError("Task %s in stage %s (TID %d) had a not serializable result: %s; not retrying"
@@ -721,7 +720,7 @@ private[spark] class TaskSetManager(
// always add to failed executors
failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
put(info.executorId, clock.getTimeMillis())
- sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
+ sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
addPendingTask(index)
if (!isZombie && state != TaskState.KILLED
&& reason.isInstanceOf[TaskFailedReason]
@@ -793,7 +792,8 @@ private[spark] class TaskSetManager(
addPendingTask(index)
// Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
// stage finishes when a total of tasks.size tasks finish.
- sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null)
+ sched.dagScheduler.taskEnded(
+ tasks(index), Resubmitted, null, Seq.empty[AccumulableInfo], info)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index a57e5b0bfb..acbe16001f 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -103,8 +103,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
sorter.insertAll(aggregatedIter)
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
- context.internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
+ context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
case None =>
aggregatedIter
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 078718ba11..9c92a50150 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -237,7 +237,8 @@ private[v1] object AllStagesResource {
}
def convertAccumulableInfo(acc: InternalAccumulableInfo): AccumulableInfo = {
- new AccumulableInfo(acc.id, acc.name, acc.update, acc.value)
+ new AccumulableInfo(
+ acc.id, acc.name.orNull, acc.update.map(_.toString), acc.value.map(_.toString).orNull)
}
def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 4a9f8b3052..b2aa8bfbe7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -325,12 +325,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val taskInfo = taskStart.taskInfo
if (taskInfo != null) {
+ val metrics = new TaskMetrics
val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
logWarning("Task start for unknown stage " + taskStart.stageId)
new StageUIData
})
stageData.numActiveTasks += 1
- stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo))
+ stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo, Some(metrics)))
}
for (
activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId);
@@ -387,9 +388,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
(Some(e.toErrorString), None)
}
- if (!metrics.isEmpty) {
+ metrics.foreach { m =>
val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.taskMetrics)
- updateAggregateMetrics(stageData, info.executorId, metrics.get, oldMetrics)
+ updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
}
val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info))
@@ -489,19 +490,18 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
}
override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) {
- for ((taskId, sid, sAttempt, taskMetrics) <- executorMetricsUpdate.taskMetrics) {
+ for ((taskId, sid, sAttempt, accumUpdates) <- executorMetricsUpdate.accumUpdates) {
val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), {
logWarning("Metrics update for task in unknown stage " + sid)
new StageUIData
})
val taskData = stageData.taskData.get(taskId)
- taskData.map { t =>
+ val metrics = TaskMetrics.fromAccumulatorUpdates(accumUpdates)
+ taskData.foreach { t =>
if (!t.taskInfo.finished) {
- updateAggregateMetrics(stageData, executorMetricsUpdate.execId, taskMetrics,
- t.taskMetrics)
-
+ updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.taskMetrics)
// Overwrite task metrics
- t.taskMetrics = Some(taskMetrics)
+ t.taskMetrics = Some(metrics)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 914f6183cc..29c5ff0b5c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -271,8 +271,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
}
val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value")
- def accumulableRow(acc: AccumulableInfo): Elem =
- <tr><td>{acc.name}</td><td>{acc.value}</td></tr>
+ def accumulableRow(acc: AccumulableInfo): Seq[Node] = {
+ (acc.name, acc.value) match {
+ case (Some(name), Some(value)) => <tr><td>{name}</td><td>{value}</td></tr>
+ case _ => Seq.empty[Node]
+ }
+ }
val accumulableTable = UIUtils.listingTable(
accumulableHeaders,
accumulableRow,
@@ -404,13 +408,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedTimeQuantiles(gettingResultTimes)
- val peakExecutionMemory = validTasks.map { case TaskUIData(info, _, _) =>
- info.accumulables
- .find { acc => acc.name == InternalAccumulator.PEAK_EXECUTION_MEMORY }
- .map { acc => acc.update.getOrElse("0").toLong }
- .getOrElse(0L)
- .toDouble
- }
+ val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.peakExecutionMemory.toDouble
+ }
val peakExecutionMemoryQuantiles = {
<td>
<span data-toggle="tooltip"
@@ -891,15 +891,15 @@ private[ui] class TaskDataSource(
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
val gettingResultTime = getGettingResultTime(info, currentTime)
- val (taskInternalAccumulables, taskExternalAccumulables) =
- info.accumulables.partition(_.internal)
- val externalAccumulableReadable = taskExternalAccumulables.map { acc =>
- StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}")
- }
- val peakExecutionMemoryUsed = taskInternalAccumulables
- .find { acc => acc.name == InternalAccumulator.PEAK_EXECUTION_MEMORY }
- .map { acc => acc.update.getOrElse("0").toLong }
- .getOrElse(0L)
+ val externalAccumulableReadable = info.accumulables
+ .filterNot(_.internal)
+ .flatMap { a =>
+ (a.name, a.update) match {
+ case (Some(name), Some(update)) => Some(StringEscapeUtils.escapeHtml4(s"$name: $update"))
+ case _ => None
+ }
+ }
+ val peakExecutionMemoryUsed = metrics.map(_.peakExecutionMemory).getOrElse(0L)
val maybeInput = metrics.flatMap(_.inputMetrics)
val inputSortable = maybeInput.map(_.bytesRead).getOrElse(0L)
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index efa22b9993..dc8070cf8a 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -233,14 +233,14 @@ private[spark] object JsonProtocol {
def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
val execId = metricsUpdate.execId
- val taskMetrics = metricsUpdate.taskMetrics
+ val accumUpdates = metricsUpdate.accumUpdates
("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
("Executor ID" -> execId) ~
- ("Metrics Updated" -> taskMetrics.map { case (taskId, stageId, stageAttemptId, metrics) =>
+ ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
("Task ID" -> taskId) ~
("Stage ID" -> stageId) ~
("Stage Attempt ID" -> stageAttemptId) ~
- ("Task Metrics" -> taskMetricsToJson(metrics))
+ ("Accumulator Updates" -> JArray(updates.map(accumulableInfoToJson).toList))
})
}
@@ -265,7 +265,7 @@ private[spark] object JsonProtocol {
("Completion Time" -> completionTime) ~
("Failure Reason" -> failureReason) ~
("Accumulables" -> JArray(
- stageInfo.accumulables.values.map(accumulableInfoToJson).toList))
+ stageInfo.accumulables.values.map(accumulableInfoToJson).toList))
}
def taskInfoToJson(taskInfo: TaskInfo): JValue = {
@@ -284,11 +284,44 @@ private[spark] object JsonProtocol {
}
def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {
+ val name = accumulableInfo.name
("ID" -> accumulableInfo.id) ~
- ("Name" -> accumulableInfo.name) ~
- ("Update" -> accumulableInfo.update.map(new JString(_)).getOrElse(JNothing)) ~
- ("Value" -> accumulableInfo.value) ~
- ("Internal" -> accumulableInfo.internal)
+ ("Name" -> name) ~
+ ("Update" -> accumulableInfo.update.map { v => accumValueToJson(name, v) }) ~
+ ("Value" -> accumulableInfo.value.map { v => accumValueToJson(name, v) }) ~
+ ("Internal" -> accumulableInfo.internal) ~
+ ("Count Failed Values" -> accumulableInfo.countFailedValues)
+ }
+
+ /**
+ * Serialize the value of an accumulator to JSON.
+ *
+ * For accumulators representing internal task metrics, this looks up the relevant
+ * [[AccumulatorParam]] to serialize the value accordingly. For all other accumulators,
+ * this will simply serialize the value as a string.
+ *
+ * The behavior here must match that of [[accumValueFromJson]]. Exposed for testing.
+ */
+ private[util] def accumValueToJson(name: Option[String], value: Any): JValue = {
+ import AccumulatorParam._
+ if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
+ (value, InternalAccumulator.getParam(name.get)) match {
+ case (v: Int, IntAccumulatorParam) => JInt(v)
+ case (v: Long, LongAccumulatorParam) => JInt(v)
+ case (v: String, StringAccumulatorParam) => JString(v)
+ case (v, UpdatedBlockStatusesAccumulatorParam) =>
+ JArray(v.asInstanceOf[Seq[(BlockId, BlockStatus)]].toList.map { case (id, status) =>
+ ("Block ID" -> id.toString) ~
+ ("Status" -> blockStatusToJson(status))
+ })
+ case (v, p) =>
+ throw new IllegalArgumentException(s"unexpected combination of accumulator value " +
+ s"type (${v.getClass.getName}) and param (${p.getClass.getName}) in '${name.get}'")
+ }
+ } else {
+ // For all external accumulators, just use strings
+ JString(value.toString)
+ }
}
def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
@@ -303,9 +336,9 @@ private[spark] object JsonProtocol {
}.getOrElse(JNothing)
val shuffleWriteMetrics: JValue =
taskMetrics.shuffleWriteMetrics.map { wm =>
- ("Shuffle Bytes Written" -> wm.shuffleBytesWritten) ~
- ("Shuffle Write Time" -> wm.shuffleWriteTime) ~
- ("Shuffle Records Written" -> wm.shuffleRecordsWritten)
+ ("Shuffle Bytes Written" -> wm.bytesWritten) ~
+ ("Shuffle Write Time" -> wm.writeTime) ~
+ ("Shuffle Records Written" -> wm.recordsWritten)
}.getOrElse(JNothing)
val inputMetrics: JValue =
taskMetrics.inputMetrics.map { im =>
@@ -324,7 +357,6 @@ private[spark] object JsonProtocol {
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
})
- ("Host Name" -> taskMetrics.hostname) ~
("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
("Executor Run Time" -> taskMetrics.executorRunTime) ~
("Result Size" -> taskMetrics.resultSize) ~
@@ -352,12 +384,12 @@ private[spark] object JsonProtocol {
("Message" -> fetchFailed.message)
case exceptionFailure: ExceptionFailure =>
val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
- val metrics = exceptionFailure.metrics.map(taskMetricsToJson).getOrElse(JNothing)
+ val accumUpdates = JArray(exceptionFailure.accumUpdates.map(accumulableInfoToJson).toList)
("Class Name" -> exceptionFailure.className) ~
("Description" -> exceptionFailure.description) ~
("Stack Trace" -> stackTrace) ~
("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
- ("Metrics" -> metrics)
+ ("Accumulator Updates" -> accumUpdates)
case taskCommitDenied: TaskCommitDenied =>
("Job ID" -> taskCommitDenied.jobID) ~
("Partition ID" -> taskCommitDenied.partitionID) ~
@@ -619,14 +651,15 @@ private[spark] object JsonProtocol {
def executorMetricsUpdateFromJson(json: JValue): SparkListenerExecutorMetricsUpdate = {
val execInfo = (json \ "Executor ID").extract[String]
- val taskMetrics = (json \ "Metrics Updated").extract[List[JValue]].map { json =>
+ val accumUpdates = (json \ "Metrics Updated").extract[List[JValue]].map { json =>
val taskId = (json \ "Task ID").extract[Long]
val stageId = (json \ "Stage ID").extract[Int]
val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
- val metrics = taskMetricsFromJson(json \ "Task Metrics")
- (taskId, stageId, stageAttemptId, metrics)
+ val updates =
+ (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson)
+ (taskId, stageId, stageAttemptId, updates)
}
- SparkListenerExecutorMetricsUpdate(execInfo, taskMetrics)
+ SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates)
}
/** --------------------------------------------------------------------- *
@@ -647,7 +680,7 @@ private[spark] object JsonProtocol {
val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
val accumulatedValues = (json \ "Accumulables").extractOpt[List[JValue]] match {
- case Some(values) => values.map(accumulableInfoFromJson(_))
+ case Some(values) => values.map(accumulableInfoFromJson)
case None => Seq[AccumulableInfo]()
}
@@ -675,7 +708,7 @@ private[spark] object JsonProtocol {
val finishTime = (json \ "Finish Time").extract[Long]
val failed = (json \ "Failed").extract[Boolean]
val accumulables = (json \ "Accumulables").extractOpt[Seq[JValue]] match {
- case Some(values) => values.map(accumulableInfoFromJson(_))
+ case Some(values) => values.map(accumulableInfoFromJson)
case None => Seq[AccumulableInfo]()
}
@@ -690,11 +723,43 @@ private[spark] object JsonProtocol {
def accumulableInfoFromJson(json: JValue): AccumulableInfo = {
val id = (json \ "ID").extract[Long]
- val name = (json \ "Name").extract[String]
- val update = Utils.jsonOption(json \ "Update").map(_.extract[String])
- val value = (json \ "Value").extract[String]
+ val name = (json \ "Name").extractOpt[String]
+ val update = Utils.jsonOption(json \ "Update").map { v => accumValueFromJson(name, v) }
+ val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) }
val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
- AccumulableInfo(id, name, update, value, internal)
+ val countFailedValues = (json \ "Count Failed Values").extractOpt[Boolean].getOrElse(false)
+ new AccumulableInfo(id, name, update, value, internal, countFailedValues)
+ }
+
+ /**
+ * Deserialize the value of an accumulator from JSON.
+ *
+ * For accumulators representing internal task metrics, this looks up the relevant
+ * [[AccumulatorParam]] to deserialize the value accordingly. For all other
+ * accumulators, this will simply deserialize the value as a string.
+ *
+ * The behavior here must match that of [[accumValueToJson]]. Exposed for testing.
+ */
+ private[util] def accumValueFromJson(name: Option[String], value: JValue): Any = {
+ import AccumulatorParam._
+ if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
+ (value, InternalAccumulator.getParam(name.get)) match {
+ case (JInt(v), IntAccumulatorParam) => v.toInt
+ case (JInt(v), LongAccumulatorParam) => v.toLong
+ case (JString(v), StringAccumulatorParam) => v
+ case (JArray(v), UpdatedBlockStatusesAccumulatorParam) =>
+ v.map { blockJson =>
+ val id = BlockId((blockJson \ "Block ID").extract[String])
+ val status = blockStatusFromJson(blockJson \ "Status")
+ (id, status)
+ }
+ case (v, p) =>
+ throw new IllegalArgumentException(s"unexpected combination of accumulator " +
+ s"value in JSON ($v) and accumulator param (${p.getClass.getName}) in '${name.get}'")
+ }
+ } else {
+ value.extract[String]
+ }
}
def taskMetricsFromJson(json: JValue): TaskMetrics = {
@@ -702,7 +767,6 @@ private[spark] object JsonProtocol {
return TaskMetrics.empty
}
val metrics = new TaskMetrics
- metrics.setHostname((json \ "Host Name").extract[String])
metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long])
metrics.setResultSize((json \ "Result Size").extract[Long])
@@ -787,10 +851,12 @@ private[spark] object JsonProtocol {
val className = (json \ "Class Name").extract[String]
val description = (json \ "Description").extract[String]
val stackTrace = stackTraceFromJson(json \ "Stack Trace")
- val fullStackTrace = Utils.jsonOption(json \ "Full Stack Trace").
- map(_.extract[String]).orNull
- val metrics = Utils.jsonOption(json \ "Metrics").map(taskMetricsFromJson)
- ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics, None)
+ val fullStackTrace = (json \ "Full Stack Trace").extractOpt[String].orNull
+ // Fallback on getting accumulator updates from TaskMetrics, which was logged in Spark 1.x
+ val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
+ .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
+ .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulatorUpdates())
+ ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled
case `taskCommitDenied` =>
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index df9e0502e7..5afd6d6e22 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -682,8 +682,7 @@ private[spark] class ExternalSorter[K, V, C](
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
- context.internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemoryUsedBytes)
+ context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
lengths
}
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 625fdd57eb..876c3a2283 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -191,8 +191,6 @@ public class UnsafeShuffleWriterSuite {
});
when(taskContext.taskMetrics()).thenReturn(taskMetrics);
- when(taskContext.internalMetricsToAccumulators()).thenReturn(null);
-
when(shuffleDep.serializer()).thenReturn(Option.<Serializer>apply(serializer));
when(shuffleDep.partitioner()).thenReturn(hashPartitioner);
}
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 5b84acf40b..11c97d7d9a 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -17,18 +17,22 @@
package org.apache.spark
+import javax.annotation.concurrent.GuardedBy
+
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.ref.WeakReference
+import scala.util.control.NonFatal
import org.scalatest.Matchers
import org.scalatest.exceptions.TestFailedException
import org.apache.spark.scheduler._
+import org.apache.spark.serializer.JavaSerializer
class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext {
- import InternalAccumulator._
+ import AccumulatorParam._
implicit def setAccum[A]: AccumulableParam[mutable.Set[A], A] =
new AccumulableParam[mutable.Set[A], A] {
@@ -59,7 +63,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
longAcc.value should be (210L + maxInt * 20)
}
- test ("value not assignable from tasks") {
+ test("value not assignable from tasks") {
sc = new SparkContext("local", "test")
val acc : Accumulator[Int] = sc.accumulator(0)
@@ -84,7 +88,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
}
}
- test ("value not readable in tasks") {
+ test("value not readable in tasks") {
val maxI = 1000
for (nThreads <- List(1, 10)) { // test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -159,193 +163,157 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
assert(!Accumulators.originals.get(accId).isDefined)
}
- test("internal accumulators in TaskContext") {
+ test("get accum") {
sc = new SparkContext("local", "test")
- val accums = InternalAccumulator.create(sc)
- val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null, accums)
- val internalMetricsToAccums = taskContext.internalMetricsToAccumulators
- val collectedInternalAccums = taskContext.collectInternalAccumulators()
- val collectedAccums = taskContext.collectAccumulators()
- assert(internalMetricsToAccums.size > 0)
- assert(internalMetricsToAccums.values.forall(_.isInternal))
- assert(internalMetricsToAccums.contains(TEST_ACCUMULATOR))
- val testAccum = internalMetricsToAccums(TEST_ACCUMULATOR)
- assert(collectedInternalAccums.size === internalMetricsToAccums.size)
- assert(collectedInternalAccums.size === collectedAccums.size)
- assert(collectedInternalAccums.contains(testAccum.id))
- assert(collectedAccums.contains(testAccum.id))
- }
+ // Don't register with SparkContext for cleanup
+ var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true)
+ val accId = acc.id
+ val ref = WeakReference(acc)
+ assert(ref.get.isDefined)
+ Accumulators.register(ref.get.get)
- test("internal accumulators in a stage") {
- val listener = new SaveInfoListener
- val numPartitions = 10
- sc = new SparkContext("local", "test")
- sc.addSparkListener(listener)
- // Have each task add 1 to the internal accumulator
- val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
- TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
- iter
- }
- // Register asserts in job completion callback to avoid flakiness
- listener.registerJobCompletionCallback { _ =>
- val stageInfos = listener.getCompletedStageInfos
- val taskInfos = listener.getCompletedTaskInfos
- assert(stageInfos.size === 1)
- assert(taskInfos.size === numPartitions)
- // The accumulator values should be merged in the stage
- val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
- assert(stageAccum.value.toLong === numPartitions)
- // The accumulator should be updated locally on each task
- val taskAccumValues = taskInfos.map { taskInfo =>
- val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
- assert(taskAccum.update.isDefined)
- assert(taskAccum.update.get.toLong === 1)
- taskAccum.value.toLong
- }
- // Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions
- assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+ // Remove the explicit reference to it and allow weak reference to get garbage collected
+ acc = null
+ System.gc()
+ assert(ref.get.isEmpty)
+
+ // Getting a garbage collected accum should throw error
+ intercept[IllegalAccessError] {
+ Accumulators.get(accId)
}
- rdd.count()
+
+ // Getting a normal accumulator. Note: this has to be separate because referencing an
+ // accumulator above in an `assert` would keep it from being garbage collected.
+ val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true, true)
+ Accumulators.register(acc2)
+ assert(Accumulators.get(acc2.id) === Some(acc2))
+
+ // Getting an accumulator that does not exist should return None
+ assert(Accumulators.get(100000).isEmpty)
}
- test("internal accumulators in multiple stages") {
- val listener = new SaveInfoListener
- val numPartitions = 10
- sc = new SparkContext("local", "test")
- sc.addSparkListener(listener)
- // Each stage creates its own set of internal accumulators so the
- // values for the same metric should not be mixed up across stages
- val rdd = sc.parallelize(1 to 100, numPartitions)
- .map { i => (i, i) }
- .mapPartitions { iter =>
- TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
- iter
- }
- .reduceByKey { case (x, y) => x + y }
- .mapPartitions { iter =>
- TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 10
- iter
- }
- .repartition(numPartitions * 2)
- .mapPartitions { iter =>
- TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 100
- iter
- }
- // Register asserts in job completion callback to avoid flakiness
- listener.registerJobCompletionCallback { _ =>
- // We ran 3 stages, and the accumulator values should be distinct
- val stageInfos = listener.getCompletedStageInfos
- assert(stageInfos.size === 3)
- val (firstStageAccum, secondStageAccum, thirdStageAccum) =
- (findAccumulableInfo(stageInfos(0).accumulables.values, TEST_ACCUMULATOR),
- findAccumulableInfo(stageInfos(1).accumulables.values, TEST_ACCUMULATOR),
- findAccumulableInfo(stageInfos(2).accumulables.values, TEST_ACCUMULATOR))
- assert(firstStageAccum.value.toLong === numPartitions)
- assert(secondStageAccum.value.toLong === numPartitions * 10)
- assert(thirdStageAccum.value.toLong === numPartitions * 2 * 100)
- }
- rdd.count()
+ test("only external accums are automatically registered") {
+ val accEx = new Accumulator(0, IntAccumulatorParam, Some("external"), internal = false)
+ val accIn = new Accumulator(0, IntAccumulatorParam, Some("internal"), internal = true)
+ assert(!accEx.isInternal)
+ assert(accIn.isInternal)
+ assert(Accumulators.get(accEx.id).isDefined)
+ assert(Accumulators.get(accIn.id).isEmpty)
}
- test("internal accumulators in fully resubmitted stages") {
- testInternalAccumulatorsWithFailedTasks((i: Int) => true) // fail all tasks
+ test("copy") {
+ val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), true, false)
+ val acc2 = acc1.copy()
+ assert(acc1.id === acc2.id)
+ assert(acc1.value === acc2.value)
+ assert(acc1.name === acc2.name)
+ assert(acc1.isInternal === acc2.isInternal)
+ assert(acc1.countFailedValues === acc2.countFailedValues)
+ assert(acc1 !== acc2)
+ // Modifying one does not affect the other
+ acc1.add(44L)
+ assert(acc1.value === 500L)
+ assert(acc2.value === 456L)
+ acc2.add(144L)
+ assert(acc1.value === 500L)
+ assert(acc2.value === 600L)
}
- test("internal accumulators in partially resubmitted stages") {
- testInternalAccumulatorsWithFailedTasks((i: Int) => i % 2 == 0) // fail a subset
+ test("register multiple accums with same ID") {
+ // Make sure these are internal accums so we don't automatically register them already
+ val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true)
+ val acc2 = acc1.copy()
+ assert(acc1 !== acc2)
+ assert(acc1.id === acc2.id)
+ assert(Accumulators.originals.isEmpty)
+ assert(Accumulators.get(acc1.id).isEmpty)
+ Accumulators.register(acc1)
+ Accumulators.register(acc2)
+ // The second one does not override the first one
+ assert(Accumulators.originals.size === 1)
+ assert(Accumulators.get(acc1.id) === Some(acc1))
}
- /**
- * Return the accumulable info that matches the specified name.
- */
- private def findAccumulableInfo(
- accums: Iterable[AccumulableInfo],
- name: String): AccumulableInfo = {
- accums.find { a => a.name == name }.getOrElse {
- throw new TestFailedException(s"internal accumulator '$name' not found", 0)
- }
+ test("string accumulator param") {
+ val acc = new Accumulator("", StringAccumulatorParam, Some("darkness"))
+ assert(acc.value === "")
+ acc.setValue("feeds")
+ assert(acc.value === "feeds")
+ acc.add("your")
+ assert(acc.value === "your") // value is overwritten, not concatenated
+ acc += "soul"
+ assert(acc.value === "soul")
+ acc ++= "with"
+ assert(acc.value === "with")
+ acc.merge("kindness")
+ assert(acc.value === "kindness")
}
- /**
- * Test whether internal accumulators are merged properly if some tasks fail.
- */
- private def testInternalAccumulatorsWithFailedTasks(failCondition: (Int => Boolean)): Unit = {
- val listener = new SaveInfoListener
- val numPartitions = 10
- val numFailedPartitions = (0 until numPartitions).count(failCondition)
- // This says use 1 core and retry tasks up to 2 times
- sc = new SparkContext("local[1, 2]", "test")
- sc.addSparkListener(listener)
- val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitionsWithIndex { case (i, iter) =>
- val taskContext = TaskContext.get()
- taskContext.internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
- // Fail the first attempts of a subset of the tasks
- if (failCondition(i) && taskContext.attemptNumber() == 0) {
- throw new Exception("Failing a task intentionally.")
- }
- iter
- }
- // Register asserts in job completion callback to avoid flakiness
- listener.registerJobCompletionCallback { _ =>
- val stageInfos = listener.getCompletedStageInfos
- val taskInfos = listener.getCompletedTaskInfos
- assert(stageInfos.size === 1)
- assert(taskInfos.size === numPartitions + numFailedPartitions)
- val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
- // We should not double count values in the merged accumulator
- assert(stageAccum.value.toLong === numPartitions)
- val taskAccumValues = taskInfos.flatMap { taskInfo =>
- if (!taskInfo.failed) {
- // If a task succeeded, its update value should always be 1
- val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
- assert(taskAccum.update.isDefined)
- assert(taskAccum.update.get.toLong === 1)
- Some(taskAccum.value.toLong)
- } else {
- // If a task failed, we should not get its accumulator values
- assert(taskInfo.accumulables.isEmpty)
- None
- }
- }
- assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
- }
- rdd.count()
+ test("list accumulator param") {
+ val acc = new Accumulator(Seq.empty[Int], new ListAccumulatorParam[Int], Some("numbers"))
+ assert(acc.value === Seq.empty[Int])
+ acc.add(Seq(1, 2))
+ assert(acc.value === Seq(1, 2))
+ acc += Seq(3, 4)
+ assert(acc.value === Seq(1, 2, 3, 4))
+ acc ++= Seq(5, 6)
+ assert(acc.value === Seq(1, 2, 3, 4, 5, 6))
+ acc.merge(Seq(7, 8))
+ assert(acc.value === Seq(1, 2, 3, 4, 5, 6, 7, 8))
+ acc.setValue(Seq(9, 10))
+ assert(acc.value === Seq(9, 10))
+ }
+
+ test("value is reset on the executors") {
+ val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false)
+ val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false)
+ val externalAccums = Seq(acc1, acc2)
+ val internalAccums = InternalAccumulator.create()
+ // Set some values; these should not be observed later on the "executors"
+ acc1.setValue(10)
+ acc2.setValue(20L)
+ internalAccums
+ .find(_.name == Some(InternalAccumulator.TEST_ACCUM))
+ .get.asInstanceOf[Accumulator[Long]]
+ .setValue(30L)
+ // Simulate the task being serialized and sent to the executors.
+ val dummyTask = new DummyTask(internalAccums, externalAccums)
+ val serInstance = new JavaSerializer(new SparkConf).newInstance()
+ val taskSer = Task.serializeWithDependencies(
+ dummyTask, mutable.HashMap(), mutable.HashMap(), serInstance)
+ // Now we're on the executors.
+ // Deserialize the task and assert that its accumulators are zero'ed out.
+ val (_, _, taskBytes) = Task.deserializeWithDependencies(taskSer)
+ val taskDeser = serInstance.deserialize[DummyTask](
+ taskBytes, Thread.currentThread.getContextClassLoader)
+ // Assert that executors see only zeros
+ taskDeser.externalAccums.foreach { a => assert(a.localValue == a.zero) }
+ taskDeser.internalAccums.foreach { a => assert(a.localValue == a.zero) }
}
}
private[spark] object AccumulatorSuite {
+ import InternalAccumulator._
+
/**
- * Run one or more Spark jobs and verify that the peak execution memory accumulator
- * is updated afterwards.
+ * Run one or more Spark jobs and verify that in at least one job the peak execution memory
+ * accumulator is updated afterwards.
*/
def verifyPeakExecutionMemorySet(
sc: SparkContext,
testName: String)(testBody: => Unit): Unit = {
val listener = new SaveInfoListener
sc.addSparkListener(listener)
- // Register asserts in job completion callback to avoid flakiness
- listener.registerJobCompletionCallback { jobId =>
- if (jobId == 0) {
- // The first job is a dummy one to verify that the accumulator does not already exist
- val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
- assert(!accums.exists(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY))
- } else {
- // In the subsequent jobs, verify that peak execution memory is updated
- val accum = listener.getCompletedStageInfos
- .flatMap(_.accumulables.values)
- .find(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY)
- .getOrElse {
- throw new TestFailedException(
- s"peak execution memory accumulator not set in '$testName'", 0)
- }
- assert(accum.value.toLong > 0)
- }
- }
- // Run the jobs
- sc.parallelize(1 to 10).count()
testBody
+ val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
+ val isSet = accums.exists { a =>
+ a.name == Some(PEAK_EXECUTION_MEMORY) && a.value.exists(_.asInstanceOf[Long] > 0L)
+ }
+ if (!isSet) {
+ throw new TestFailedException(s"peak execution memory accumulator not set in '$testName'", 0)
+ }
}
}
@@ -357,6 +325,10 @@ private class SaveInfoListener extends SparkListener {
private val completedTaskInfos: ArrayBuffer[TaskInfo] = new ArrayBuffer[TaskInfo]
private var jobCompletionCallback: (Int => Unit) = null // parameter is job ID
+ // Accesses must be synchronized to ensure failures in `jobCompletionCallback` are propagated
+ @GuardedBy("this")
+ private var exception: Throwable = null
+
def getCompletedStageInfos: Seq[StageInfo] = completedStageInfos.toArray.toSeq
def getCompletedTaskInfos: Seq[TaskInfo] = completedTaskInfos.toArray.toSeq
@@ -365,9 +337,20 @@ private class SaveInfoListener extends SparkListener {
jobCompletionCallback = callback
}
- override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+ /** Throw a stored exception, if any. */
+ def maybeThrowException(): Unit = synchronized {
+ if (exception != null) { throw exception }
+ }
+
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
if (jobCompletionCallback != null) {
- jobCompletionCallback(jobEnd.jobId)
+ try {
+ jobCompletionCallback(jobEnd.jobId)
+ } catch {
+ // Store any exception thrown here so we can throw them later in the main thread.
+ // Otherwise, if `jobCompletionCallback` threw something it wouldn't fail the test.
+ case NonFatal(e) => exception = e
+ }
}
}
@@ -379,3 +362,14 @@ private class SaveInfoListener extends SparkListener {
completedTaskInfos += taskEnd.taskInfo
}
}
+
+
+/**
+ * A dummy [[Task]] that contains internal and external [[Accumulator]]s.
+ */
+private[spark] class DummyTask(
+ val internalAccums: Seq[Accumulator[_]],
+ val externalAccums: Seq[Accumulator[_]])
+ extends Task[Int](0, 0, 0, internalAccums) {
+ override def runTask(c: TaskContext): Int = 1
+}
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 4e678fbac6..80a1de6065 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -801,7 +801,7 @@ class ExecutorAllocationManagerSuite
assert(maxNumExecutorsNeeded(manager) === 1)
// If the task is failed, we expect it to be resubmitted later.
- val taskEndReason = ExceptionFailure(null, null, null, null, null, None)
+ val taskEndReason = ExceptionFailure(null, null, null, null, None)
sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null))
assert(maxNumExecutorsNeeded(manager) === 1)
}
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index c7f629a14b..3777d77f8f 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -215,14 +215,16 @@ class HeartbeatReceiverSuite
val metrics = new TaskMetrics
val blockManagerId = BlockManagerId(executorId, "localhost", 12345)
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
- Heartbeat(executorId, Array(1L -> metrics), blockManagerId))
+ Heartbeat(executorId, Array(1L -> metrics.accumulatorUpdates()), blockManagerId))
if (executorShouldReregister) {
assert(response.reregisterBlockManager)
} else {
assert(!response.reregisterBlockManager)
// Additionally verify that the scheduler callback is called with the correct parameters
verify(scheduler).executorHeartbeatReceived(
- Matchers.eq(executorId), Matchers.eq(Array(1L -> metrics)), Matchers.eq(blockManagerId))
+ Matchers.eq(executorId),
+ Matchers.eq(Array(1L -> metrics.accumulatorUpdates())),
+ Matchers.eq(blockManagerId))
}
}
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
new file mode 100644
index 0000000000..630b46f828
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.scheduler.AccumulableInfo
+import org.apache.spark.storage.{BlockId, BlockStatus}
+
+
+class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
+ import InternalAccumulator._
+ import AccumulatorParam._
+
+ test("get param") {
+ assert(getParam(EXECUTOR_DESERIALIZE_TIME) === LongAccumulatorParam)
+ assert(getParam(EXECUTOR_RUN_TIME) === LongAccumulatorParam)
+ assert(getParam(RESULT_SIZE) === LongAccumulatorParam)
+ assert(getParam(JVM_GC_TIME) === LongAccumulatorParam)
+ assert(getParam(RESULT_SERIALIZATION_TIME) === LongAccumulatorParam)
+ assert(getParam(MEMORY_BYTES_SPILLED) === LongAccumulatorParam)
+ assert(getParam(DISK_BYTES_SPILLED) === LongAccumulatorParam)
+ assert(getParam(PEAK_EXECUTION_MEMORY) === LongAccumulatorParam)
+ assert(getParam(UPDATED_BLOCK_STATUSES) === UpdatedBlockStatusesAccumulatorParam)
+ assert(getParam(TEST_ACCUM) === LongAccumulatorParam)
+ // shuffle read
+ assert(getParam(shuffleRead.REMOTE_BLOCKS_FETCHED) === IntAccumulatorParam)
+ assert(getParam(shuffleRead.LOCAL_BLOCKS_FETCHED) === IntAccumulatorParam)
+ assert(getParam(shuffleRead.REMOTE_BYTES_READ) === LongAccumulatorParam)
+ assert(getParam(shuffleRead.LOCAL_BYTES_READ) === LongAccumulatorParam)
+ assert(getParam(shuffleRead.FETCH_WAIT_TIME) === LongAccumulatorParam)
+ assert(getParam(shuffleRead.RECORDS_READ) === LongAccumulatorParam)
+ // shuffle write
+ assert(getParam(shuffleWrite.BYTES_WRITTEN) === LongAccumulatorParam)
+ assert(getParam(shuffleWrite.RECORDS_WRITTEN) === LongAccumulatorParam)
+ assert(getParam(shuffleWrite.WRITE_TIME) === LongAccumulatorParam)
+ // input
+ assert(getParam(input.READ_METHOD) === StringAccumulatorParam)
+ assert(getParam(input.RECORDS_READ) === LongAccumulatorParam)
+ assert(getParam(input.BYTES_READ) === LongAccumulatorParam)
+ // output
+ assert(getParam(output.WRITE_METHOD) === StringAccumulatorParam)
+ assert(getParam(output.RECORDS_WRITTEN) === LongAccumulatorParam)
+ assert(getParam(output.BYTES_WRITTEN) === LongAccumulatorParam)
+ // default to Long
+ assert(getParam(METRICS_PREFIX + "anything") === LongAccumulatorParam)
+ intercept[IllegalArgumentException] {
+ getParam("something that does not start with the right prefix")
+ }
+ }
+
+ test("create by name") {
+ val executorRunTime = create(EXECUTOR_RUN_TIME)
+ val updatedBlockStatuses = create(UPDATED_BLOCK_STATUSES)
+ val shuffleRemoteBlocksRead = create(shuffleRead.REMOTE_BLOCKS_FETCHED)
+ val inputReadMethod = create(input.READ_METHOD)
+ assert(executorRunTime.name === Some(EXECUTOR_RUN_TIME))
+ assert(updatedBlockStatuses.name === Some(UPDATED_BLOCK_STATUSES))
+ assert(shuffleRemoteBlocksRead.name === Some(shuffleRead.REMOTE_BLOCKS_FETCHED))
+ assert(inputReadMethod.name === Some(input.READ_METHOD))
+ assert(executorRunTime.value.isInstanceOf[Long])
+ assert(updatedBlockStatuses.value.isInstanceOf[Seq[_]])
+ // We cannot assert the type of the value directly since the type parameter is erased.
+ // Instead, try casting a `Seq` of expected type and see if it fails in run time.
+ updatedBlockStatuses.setValueAny(Seq.empty[(BlockId, BlockStatus)])
+ assert(shuffleRemoteBlocksRead.value.isInstanceOf[Int])
+ assert(inputReadMethod.value.isInstanceOf[String])
+ // default to Long
+ val anything = create(METRICS_PREFIX + "anything")
+ assert(anything.value.isInstanceOf[Long])
+ }
+
+ test("create") {
+ val accums = create()
+ val shuffleReadAccums = createShuffleReadAccums()
+ val shuffleWriteAccums = createShuffleWriteAccums()
+ val inputAccums = createInputAccums()
+ val outputAccums = createOutputAccums()
+ // assert they're all internal
+ assert(accums.forall(_.isInternal))
+ assert(shuffleReadAccums.forall(_.isInternal))
+ assert(shuffleWriteAccums.forall(_.isInternal))
+ assert(inputAccums.forall(_.isInternal))
+ assert(outputAccums.forall(_.isInternal))
+ // assert they all count on failures
+ assert(accums.forall(_.countFailedValues))
+ assert(shuffleReadAccums.forall(_.countFailedValues))
+ assert(shuffleWriteAccums.forall(_.countFailedValues))
+ assert(inputAccums.forall(_.countFailedValues))
+ assert(outputAccums.forall(_.countFailedValues))
+ // assert they all have names
+ assert(accums.forall(_.name.isDefined))
+ assert(shuffleReadAccums.forall(_.name.isDefined))
+ assert(shuffleWriteAccums.forall(_.name.isDefined))
+ assert(inputAccums.forall(_.name.isDefined))
+ assert(outputAccums.forall(_.name.isDefined))
+ // assert `accums` is a strict superset of the others
+ val accumNames = accums.map(_.name.get).toSet
+ val shuffleReadAccumNames = shuffleReadAccums.map(_.name.get).toSet
+ val shuffleWriteAccumNames = shuffleWriteAccums.map(_.name.get).toSet
+ val inputAccumNames = inputAccums.map(_.name.get).toSet
+ val outputAccumNames = outputAccums.map(_.name.get).toSet
+ assert(shuffleReadAccumNames.subsetOf(accumNames))
+ assert(shuffleWriteAccumNames.subsetOf(accumNames))
+ assert(inputAccumNames.subsetOf(accumNames))
+ assert(outputAccumNames.subsetOf(accumNames))
+ }
+
+ test("naming") {
+ val accums = create()
+ val shuffleReadAccums = createShuffleReadAccums()
+ val shuffleWriteAccums = createShuffleWriteAccums()
+ val inputAccums = createInputAccums()
+ val outputAccums = createOutputAccums()
+ // assert that prefixes are properly namespaced
+ assert(SHUFFLE_READ_METRICS_PREFIX.startsWith(METRICS_PREFIX))
+ assert(SHUFFLE_WRITE_METRICS_PREFIX.startsWith(METRICS_PREFIX))
+ assert(INPUT_METRICS_PREFIX.startsWith(METRICS_PREFIX))
+ assert(OUTPUT_METRICS_PREFIX.startsWith(METRICS_PREFIX))
+ assert(accums.forall(_.name.get.startsWith(METRICS_PREFIX)))
+ // assert they all start with the expected prefixes
+ assert(shuffleReadAccums.forall(_.name.get.startsWith(SHUFFLE_READ_METRICS_PREFIX)))
+ assert(shuffleWriteAccums.forall(_.name.get.startsWith(SHUFFLE_WRITE_METRICS_PREFIX)))
+ assert(inputAccums.forall(_.name.get.startsWith(INPUT_METRICS_PREFIX)))
+ assert(outputAccums.forall(_.name.get.startsWith(OUTPUT_METRICS_PREFIX)))
+ }
+
+ test("internal accumulators in TaskContext") {
+ val taskContext = TaskContext.empty()
+ val accumUpdates = taskContext.taskMetrics.accumulatorUpdates()
+ assert(accumUpdates.size > 0)
+ assert(accumUpdates.forall(_.internal))
+ val testAccum = taskContext.taskMetrics.getAccum(TEST_ACCUM)
+ assert(accumUpdates.exists(_.id == testAccum.id))
+ }
+
+ test("internal accumulators in a stage") {
+ val listener = new SaveInfoListener
+ val numPartitions = 10
+ sc = new SparkContext("local", "test")
+ sc.addSparkListener(listener)
+ // Have each task add 1 to the internal accumulator
+ val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
+ TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 1
+ iter
+ }
+ // Register asserts in job completion callback to avoid flakiness
+ listener.registerJobCompletionCallback { _ =>
+ val stageInfos = listener.getCompletedStageInfos
+ val taskInfos = listener.getCompletedTaskInfos
+ assert(stageInfos.size === 1)
+ assert(taskInfos.size === numPartitions)
+ // The accumulator values should be merged in the stage
+ val stageAccum = findTestAccum(stageInfos.head.accumulables.values)
+ assert(stageAccum.value.get.asInstanceOf[Long] === numPartitions)
+ // The accumulator should be updated locally on each task
+ val taskAccumValues = taskInfos.map { taskInfo =>
+ val taskAccum = findTestAccum(taskInfo.accumulables)
+ assert(taskAccum.update.isDefined)
+ assert(taskAccum.update.get.asInstanceOf[Long] === 1L)
+ taskAccum.value.get.asInstanceOf[Long]
+ }
+ // Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions
+ assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+ }
+ rdd.count()
+ }
+
+ test("internal accumulators in multiple stages") {
+ val listener = new SaveInfoListener
+ val numPartitions = 10
+ sc = new SparkContext("local", "test")
+ sc.addSparkListener(listener)
+ // Each stage creates its own set of internal accumulators so the
+ // values for the same metric should not be mixed up across stages
+ val rdd = sc.parallelize(1 to 100, numPartitions)
+ .map { i => (i, i) }
+ .mapPartitions { iter =>
+ TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 1
+ iter
+ }
+ .reduceByKey { case (x, y) => x + y }
+ .mapPartitions { iter =>
+ TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 10
+ iter
+ }
+ .repartition(numPartitions * 2)
+ .mapPartitions { iter =>
+ TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 100
+ iter
+ }
+ // Register asserts in job completion callback to avoid flakiness
+ listener.registerJobCompletionCallback { _ =>
+ // We ran 3 stages, and the accumulator values should be distinct
+ val stageInfos = listener.getCompletedStageInfos
+ assert(stageInfos.size === 3)
+ val (firstStageAccum, secondStageAccum, thirdStageAccum) =
+ (findTestAccum(stageInfos(0).accumulables.values),
+ findTestAccum(stageInfos(1).accumulables.values),
+ findTestAccum(stageInfos(2).accumulables.values))
+ assert(firstStageAccum.value.get.asInstanceOf[Long] === numPartitions)
+ assert(secondStageAccum.value.get.asInstanceOf[Long] === numPartitions * 10)
+ assert(thirdStageAccum.value.get.asInstanceOf[Long] === numPartitions * 2 * 100)
+ }
+ rdd.count()
+ }
+
+ // TODO: these two tests are incorrect; they don't actually trigger stage retries.
+ ignore("internal accumulators in fully resubmitted stages") {
+ testInternalAccumulatorsWithFailedTasks((i: Int) => true) // fail all tasks
+ }
+
+ ignore("internal accumulators in partially resubmitted stages") {
+ testInternalAccumulatorsWithFailedTasks((i: Int) => i % 2 == 0) // fail a subset
+ }
+
+ test("internal accumulators are registered for cleanups") {
+ sc = new SparkContext("local", "test") {
+ private val myCleaner = new SaveAccumContextCleaner(this)
+ override def cleaner: Option[ContextCleaner] = Some(myCleaner)
+ }
+ assert(Accumulators.originals.isEmpty)
+ sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
+ val internalAccums = InternalAccumulator.create()
+ // We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
+ assert(Accumulators.originals.size === internalAccums.size * 2)
+ val accumsRegistered = sc.cleaner match {
+ case Some(cleaner: SaveAccumContextCleaner) => cleaner.accumsRegisteredForCleanup
+ case _ => Seq.empty[Long]
+ }
+ // Make sure the same set of accumulators is registered for cleanup
+ assert(accumsRegistered.size === internalAccums.size * 2)
+ assert(accumsRegistered.toSet === Accumulators.originals.keys.toSet)
+ }
+
+ /**
+ * Return the accumulable info that matches the specified name.
+ */
+ private def findTestAccum(accums: Iterable[AccumulableInfo]): AccumulableInfo = {
+ accums.find { a => a.name == Some(TEST_ACCUM) }.getOrElse {
+ fail(s"unable to find internal accumulator called $TEST_ACCUM")
+ }
+ }
+
+ /**
+ * Test whether internal accumulators are merged properly if some tasks fail.
+ * TODO: make this actually retry the stage.
+ */
+ private def testInternalAccumulatorsWithFailedTasks(failCondition: (Int => Boolean)): Unit = {
+ val listener = new SaveInfoListener
+ val numPartitions = 10
+ val numFailedPartitions = (0 until numPartitions).count(failCondition)
+ // This says use 1 core and retry tasks up to 2 times
+ sc = new SparkContext("local[1, 2]", "test")
+ sc.addSparkListener(listener)
+ val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitionsWithIndex { case (i, iter) =>
+ val taskContext = TaskContext.get()
+ taskContext.taskMetrics().getAccum(TEST_ACCUM) += 1
+ // Fail the first attempts of a subset of the tasks
+ if (failCondition(i) && taskContext.attemptNumber() == 0) {
+ throw new Exception("Failing a task intentionally.")
+ }
+ iter
+ }
+ // Register asserts in job completion callback to avoid flakiness
+ listener.registerJobCompletionCallback { _ =>
+ val stageInfos = listener.getCompletedStageInfos
+ val taskInfos = listener.getCompletedTaskInfos
+ assert(stageInfos.size === 1)
+ assert(taskInfos.size === numPartitions + numFailedPartitions)
+ val stageAccum = findTestAccum(stageInfos.head.accumulables.values)
+ // If all partitions failed, then we would resubmit the whole stage again and create a
+ // fresh set of internal accumulators. Otherwise, these internal accumulators do count
+ // failed values, so we must include the failed values.
+ val expectedAccumValue =
+ if (numPartitions == numFailedPartitions) {
+ numPartitions
+ } else {
+ numPartitions + numFailedPartitions
+ }
+ assert(stageAccum.value.get.asInstanceOf[Long] === expectedAccumValue)
+ val taskAccumValues = taskInfos.flatMap { taskInfo =>
+ if (!taskInfo.failed) {
+ // If a task succeeded, its update value should always be 1
+ val taskAccum = findTestAccum(taskInfo.accumulables)
+ assert(taskAccum.update.isDefined)
+ assert(taskAccum.update.get.asInstanceOf[Long] === 1L)
+ assert(taskAccum.value.isDefined)
+ Some(taskAccum.value.get.asInstanceOf[Long])
+ } else {
+ // If a task failed, we should not get its accumulator values
+ assert(taskInfo.accumulables.isEmpty)
+ None
+ }
+ }
+ assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+ }
+ rdd.count()
+ listener.maybeThrowException()
+ }
+
+ /**
+ * A special [[ContextCleaner]] that saves the IDs of the accumulators registered for cleanup.
+ */
+ private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) {
+ private val accumsRegistered = new ArrayBuffer[Long]
+
+ override def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = {
+ accumsRegistered += a.id
+ super.registerAccumulatorForCleanup(a)
+ }
+
+ def accumsRegisteredForCleanup: Seq[Long] = accumsRegistered.toArray
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 9be9db01c7..d3359c7406 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -42,6 +42,8 @@ private[spark] abstract class SparkFunSuite extends FunSuite with Logging {
test()
} finally {
logInfo(s"\n\n===== FINISHED $shortSuiteName: '$testName' =====\n")
+ // Avoid leaking map entries in tests that use accumulators without SparkContext
+ Accumulators.clear()
}
}
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index e5ec2aa1be..15be0b194e 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -17,12 +17,542 @@
package org.apache.spark.executor
-import org.apache.spark.SparkFunSuite
+import org.scalatest.Assertions
+
+import org.apache.spark._
+import org.apache.spark.scheduler.AccumulableInfo
+import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel, TestBlockId}
+
class TaskMetricsSuite extends SparkFunSuite {
- test("[SPARK-5701] updateShuffleReadMetrics: ShuffleReadMetrics not added when no shuffle deps") {
- val taskMetrics = new TaskMetrics()
- taskMetrics.mergeShuffleReadMetrics()
- assert(taskMetrics.shuffleReadMetrics.isEmpty)
+ import AccumulatorParam._
+ import InternalAccumulator._
+ import StorageLevel._
+ import TaskMetricsSuite._
+
+ test("create") {
+ val internalAccums = InternalAccumulator.create()
+ val tm1 = new TaskMetrics
+ val tm2 = new TaskMetrics(internalAccums)
+ assert(tm1.accumulatorUpdates().size === internalAccums.size)
+ assert(tm1.shuffleReadMetrics.isEmpty)
+ assert(tm1.shuffleWriteMetrics.isEmpty)
+ assert(tm1.inputMetrics.isEmpty)
+ assert(tm1.outputMetrics.isEmpty)
+ assert(tm2.accumulatorUpdates().size === internalAccums.size)
+ assert(tm2.shuffleReadMetrics.isEmpty)
+ assert(tm2.shuffleWriteMetrics.isEmpty)
+ assert(tm2.inputMetrics.isEmpty)
+ assert(tm2.outputMetrics.isEmpty)
+ // TaskMetrics constructor expects minimal set of initial accumulators
+ intercept[IllegalArgumentException] { new TaskMetrics(Seq.empty[Accumulator[_]]) }
+ }
+
+ test("create with unnamed accum") {
+ intercept[IllegalArgumentException] {
+ new TaskMetrics(
+ InternalAccumulator.create() ++ Seq(
+ new Accumulator(0, IntAccumulatorParam, None, internal = true)))
+ }
+ }
+
+ test("create with duplicate name accum") {
+ intercept[IllegalArgumentException] {
+ new TaskMetrics(
+ InternalAccumulator.create() ++ Seq(
+ new Accumulator(0, IntAccumulatorParam, Some(RESULT_SIZE), internal = true)))
+ }
+ }
+
+ test("create with external accum") {
+ intercept[IllegalArgumentException] {
+ new TaskMetrics(
+ InternalAccumulator.create() ++ Seq(
+ new Accumulator(0, IntAccumulatorParam, Some("x"))))
+ }
+ }
+
+ test("create shuffle read metrics") {
+ import shuffleRead._
+ val accums = InternalAccumulator.createShuffleReadAccums()
+ .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
+ accums(REMOTE_BLOCKS_FETCHED).setValueAny(1)
+ accums(LOCAL_BLOCKS_FETCHED).setValueAny(2)
+ accums(REMOTE_BYTES_READ).setValueAny(3L)
+ accums(LOCAL_BYTES_READ).setValueAny(4L)
+ accums(FETCH_WAIT_TIME).setValueAny(5L)
+ accums(RECORDS_READ).setValueAny(6L)
+ val sr = new ShuffleReadMetrics(accums)
+ assert(sr.remoteBlocksFetched === 1)
+ assert(sr.localBlocksFetched === 2)
+ assert(sr.remoteBytesRead === 3L)
+ assert(sr.localBytesRead === 4L)
+ assert(sr.fetchWaitTime === 5L)
+ assert(sr.recordsRead === 6L)
+ }
+
+ test("create shuffle write metrics") {
+ import shuffleWrite._
+ val accums = InternalAccumulator.createShuffleWriteAccums()
+ .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
+ accums(BYTES_WRITTEN).setValueAny(1L)
+ accums(RECORDS_WRITTEN).setValueAny(2L)
+ accums(WRITE_TIME).setValueAny(3L)
+ val sw = new ShuffleWriteMetrics(accums)
+ assert(sw.bytesWritten === 1L)
+ assert(sw.recordsWritten === 2L)
+ assert(sw.writeTime === 3L)
+ }
+
+ test("create input metrics") {
+ import input._
+ val accums = InternalAccumulator.createInputAccums()
+ .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
+ accums(BYTES_READ).setValueAny(1L)
+ accums(RECORDS_READ).setValueAny(2L)
+ accums(READ_METHOD).setValueAny(DataReadMethod.Hadoop.toString)
+ val im = new InputMetrics(accums)
+ assert(im.bytesRead === 1L)
+ assert(im.recordsRead === 2L)
+ assert(im.readMethod === DataReadMethod.Hadoop)
+ }
+
+ test("create output metrics") {
+ import output._
+ val accums = InternalAccumulator.createOutputAccums()
+ .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
+ accums(BYTES_WRITTEN).setValueAny(1L)
+ accums(RECORDS_WRITTEN).setValueAny(2L)
+ accums(WRITE_METHOD).setValueAny(DataWriteMethod.Hadoop.toString)
+ val om = new OutputMetrics(accums)
+ assert(om.bytesWritten === 1L)
+ assert(om.recordsWritten === 2L)
+ assert(om.writeMethod === DataWriteMethod.Hadoop)
+ }
+
+ test("mutating values") {
+ val accums = InternalAccumulator.create()
+ val tm = new TaskMetrics(accums)
+ // initial values
+ assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 0L)
+ assertValueEquals(tm, _.executorRunTime, accums, EXECUTOR_RUN_TIME, 0L)
+ assertValueEquals(tm, _.resultSize, accums, RESULT_SIZE, 0L)
+ assertValueEquals(tm, _.jvmGCTime, accums, JVM_GC_TIME, 0L)
+ assertValueEquals(tm, _.resultSerializationTime, accums, RESULT_SERIALIZATION_TIME, 0L)
+ assertValueEquals(tm, _.memoryBytesSpilled, accums, MEMORY_BYTES_SPILLED, 0L)
+ assertValueEquals(tm, _.diskBytesSpilled, accums, DISK_BYTES_SPILLED, 0L)
+ assertValueEquals(tm, _.peakExecutionMemory, accums, PEAK_EXECUTION_MEMORY, 0L)
+ assertValueEquals(tm, _.updatedBlockStatuses, accums, UPDATED_BLOCK_STATUSES,
+ Seq.empty[(BlockId, BlockStatus)])
+ // set or increment values
+ tm.setExecutorDeserializeTime(100L)
+ tm.setExecutorDeserializeTime(1L) // overwrite
+ tm.setExecutorRunTime(200L)
+ tm.setExecutorRunTime(2L)
+ tm.setResultSize(300L)
+ tm.setResultSize(3L)
+ tm.setJvmGCTime(400L)
+ tm.setJvmGCTime(4L)
+ tm.setResultSerializationTime(500L)
+ tm.setResultSerializationTime(5L)
+ tm.incMemoryBytesSpilled(600L)
+ tm.incMemoryBytesSpilled(6L) // add
+ tm.incDiskBytesSpilled(700L)
+ tm.incDiskBytesSpilled(7L)
+ tm.incPeakExecutionMemory(800L)
+ tm.incPeakExecutionMemory(8L)
+ val block1 = (TestBlockId("a"), BlockStatus(MEMORY_ONLY, 1L, 2L))
+ val block2 = (TestBlockId("b"), BlockStatus(MEMORY_ONLY, 3L, 4L))
+ tm.incUpdatedBlockStatuses(Seq(block1))
+ tm.incUpdatedBlockStatuses(Seq(block2))
+ // assert new values exist
+ assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 1L)
+ assertValueEquals(tm, _.executorRunTime, accums, EXECUTOR_RUN_TIME, 2L)
+ assertValueEquals(tm, _.resultSize, accums, RESULT_SIZE, 3L)
+ assertValueEquals(tm, _.jvmGCTime, accums, JVM_GC_TIME, 4L)
+ assertValueEquals(tm, _.resultSerializationTime, accums, RESULT_SERIALIZATION_TIME, 5L)
+ assertValueEquals(tm, _.memoryBytesSpilled, accums, MEMORY_BYTES_SPILLED, 606L)
+ assertValueEquals(tm, _.diskBytesSpilled, accums, DISK_BYTES_SPILLED, 707L)
+ assertValueEquals(tm, _.peakExecutionMemory, accums, PEAK_EXECUTION_MEMORY, 808L)
+ assertValueEquals(tm, _.updatedBlockStatuses, accums, UPDATED_BLOCK_STATUSES,
+ Seq(block1, block2))
+ }
+
+ test("mutating shuffle read metrics values") {
+ import shuffleRead._
+ val accums = InternalAccumulator.create()
+ val tm = new TaskMetrics(accums)
+ def assertValEquals[T](tmValue: ShuffleReadMetrics => T, name: String, value: T): Unit = {
+ assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics.get), accums, name, value)
+ }
+ // create shuffle read metrics
+ assert(tm.shuffleReadMetrics.isEmpty)
+ tm.registerTempShuffleReadMetrics()
+ tm.mergeShuffleReadMetrics()
+ assert(tm.shuffleReadMetrics.isDefined)
+ val sr = tm.shuffleReadMetrics.get
+ // initial values
+ assertValEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 0)
+ assertValEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 0)
+ assertValEquals(_.remoteBytesRead, REMOTE_BYTES_READ, 0L)
+ assertValEquals(_.localBytesRead, LOCAL_BYTES_READ, 0L)
+ assertValEquals(_.fetchWaitTime, FETCH_WAIT_TIME, 0L)
+ assertValEquals(_.recordsRead, RECORDS_READ, 0L)
+ // set and increment values
+ sr.setRemoteBlocksFetched(100)
+ sr.setRemoteBlocksFetched(10)
+ sr.incRemoteBlocksFetched(1) // 10 + 1
+ sr.incRemoteBlocksFetched(1) // 10 + 1 + 1
+ sr.setLocalBlocksFetched(200)
+ sr.setLocalBlocksFetched(20)
+ sr.incLocalBlocksFetched(2)
+ sr.incLocalBlocksFetched(2)
+ sr.setRemoteBytesRead(300L)
+ sr.setRemoteBytesRead(30L)
+ sr.incRemoteBytesRead(3L)
+ sr.incRemoteBytesRead(3L)
+ sr.setLocalBytesRead(400L)
+ sr.setLocalBytesRead(40L)
+ sr.incLocalBytesRead(4L)
+ sr.incLocalBytesRead(4L)
+ sr.setFetchWaitTime(500L)
+ sr.setFetchWaitTime(50L)
+ sr.incFetchWaitTime(5L)
+ sr.incFetchWaitTime(5L)
+ sr.setRecordsRead(600L)
+ sr.setRecordsRead(60L)
+ sr.incRecordsRead(6L)
+ sr.incRecordsRead(6L)
+ // assert new values exist
+ assertValEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 12)
+ assertValEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 24)
+ assertValEquals(_.remoteBytesRead, REMOTE_BYTES_READ, 36L)
+ assertValEquals(_.localBytesRead, LOCAL_BYTES_READ, 48L)
+ assertValEquals(_.fetchWaitTime, FETCH_WAIT_TIME, 60L)
+ assertValEquals(_.recordsRead, RECORDS_READ, 72L)
+ }
+
+ test("mutating shuffle write metrics values") {
+ import shuffleWrite._
+ val accums = InternalAccumulator.create()
+ val tm = new TaskMetrics(accums)
+ def assertValEquals[T](tmValue: ShuffleWriteMetrics => T, name: String, value: T): Unit = {
+ assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics.get), accums, name, value)
+ }
+ // create shuffle write metrics
+ assert(tm.shuffleWriteMetrics.isEmpty)
+ tm.registerShuffleWriteMetrics()
+ assert(tm.shuffleWriteMetrics.isDefined)
+ val sw = tm.shuffleWriteMetrics.get
+ // initial values
+ assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
+ assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
+ assertValEquals(_.writeTime, WRITE_TIME, 0L)
+ // increment and decrement values
+ sw.incBytesWritten(100L)
+ sw.incBytesWritten(10L) // 100 + 10
+ sw.decBytesWritten(1L) // 100 + 10 - 1
+ sw.decBytesWritten(1L) // 100 + 10 - 1 - 1
+ sw.incRecordsWritten(200L)
+ sw.incRecordsWritten(20L)
+ sw.decRecordsWritten(2L)
+ sw.decRecordsWritten(2L)
+ sw.incWriteTime(300L)
+ sw.incWriteTime(30L)
+ // assert new values exist
+ assertValEquals(_.bytesWritten, BYTES_WRITTEN, 108L)
+ assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 216L)
+ assertValEquals(_.writeTime, WRITE_TIME, 330L)
+ }
+
+ test("mutating input metrics values") {
+ import input._
+ val accums = InternalAccumulator.create()
+ val tm = new TaskMetrics(accums)
+ def assertValEquals(tmValue: InputMetrics => Any, name: String, value: Any): Unit = {
+ assertValueEquals(tm, tm => tmValue(tm.inputMetrics.get), accums, name, value,
+ (x: Any, y: Any) => assert(x.toString === y.toString))
+ }
+ // create input metrics
+ assert(tm.inputMetrics.isEmpty)
+ tm.registerInputMetrics(DataReadMethod.Memory)
+ assert(tm.inputMetrics.isDefined)
+ val in = tm.inputMetrics.get
+ // initial values
+ assertValEquals(_.bytesRead, BYTES_READ, 0L)
+ assertValEquals(_.recordsRead, RECORDS_READ, 0L)
+ assertValEquals(_.readMethod, READ_METHOD, DataReadMethod.Memory)
+ // set and increment values
+ in.setBytesRead(1L)
+ in.setBytesRead(2L)
+ in.incRecordsRead(1L)
+ in.incRecordsRead(2L)
+ in.setReadMethod(DataReadMethod.Disk)
+ // assert new values exist
+ assertValEquals(_.bytesRead, BYTES_READ, 2L)
+ assertValEquals(_.recordsRead, RECORDS_READ, 3L)
+ assertValEquals(_.readMethod, READ_METHOD, DataReadMethod.Disk)
+ }
+
+ test("mutating output metrics values") {
+ import output._
+ val accums = InternalAccumulator.create()
+ val tm = new TaskMetrics(accums)
+ def assertValEquals(tmValue: OutputMetrics => Any, name: String, value: Any): Unit = {
+ assertValueEquals(tm, tm => tmValue(tm.outputMetrics.get), accums, name, value,
+ (x: Any, y: Any) => assert(x.toString === y.toString))
+ }
+ // create input metrics
+ assert(tm.outputMetrics.isEmpty)
+ tm.registerOutputMetrics(DataWriteMethod.Hadoop)
+ assert(tm.outputMetrics.isDefined)
+ val out = tm.outputMetrics.get
+ // initial values
+ assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
+ assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
+ assertValEquals(_.writeMethod, WRITE_METHOD, DataWriteMethod.Hadoop)
+ // set values
+ out.setBytesWritten(1L)
+ out.setBytesWritten(2L)
+ out.setRecordsWritten(3L)
+ out.setRecordsWritten(4L)
+ out.setWriteMethod(DataWriteMethod.Hadoop)
+ // assert new values exist
+ assertValEquals(_.bytesWritten, BYTES_WRITTEN, 2L)
+ assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 4L)
+ // Note: this doesn't actually test anything, but there's only one DataWriteMethod
+ // so we can't set it to anything else
+ assertValEquals(_.writeMethod, WRITE_METHOD, DataWriteMethod.Hadoop)
+ }
+
+ test("merging multiple shuffle read metrics") {
+ val tm = new TaskMetrics
+ assert(tm.shuffleReadMetrics.isEmpty)
+ val sr1 = tm.registerTempShuffleReadMetrics()
+ val sr2 = tm.registerTempShuffleReadMetrics()
+ val sr3 = tm.registerTempShuffleReadMetrics()
+ assert(tm.shuffleReadMetrics.isEmpty)
+ sr1.setRecordsRead(10L)
+ sr2.setRecordsRead(10L)
+ sr1.setFetchWaitTime(1L)
+ sr2.setFetchWaitTime(2L)
+ sr3.setFetchWaitTime(3L)
+ tm.mergeShuffleReadMetrics()
+ assert(tm.shuffleReadMetrics.isDefined)
+ val sr = tm.shuffleReadMetrics.get
+ assert(sr.remoteBlocksFetched === 0L)
+ assert(sr.recordsRead === 20L)
+ assert(sr.fetchWaitTime === 6L)
+
+ // SPARK-5701: calling merge without any shuffle deps does nothing
+ val tm2 = new TaskMetrics
+ tm2.mergeShuffleReadMetrics()
+ assert(tm2.shuffleReadMetrics.isEmpty)
+ }
+
+ test("register multiple shuffle write metrics") {
+ val tm = new TaskMetrics
+ val sw1 = tm.registerShuffleWriteMetrics()
+ val sw2 = tm.registerShuffleWriteMetrics()
+ assert(sw1 === sw2)
+ assert(tm.shuffleWriteMetrics === Some(sw1))
+ }
+
+ test("register multiple input metrics") {
+ val tm = new TaskMetrics
+ val im1 = tm.registerInputMetrics(DataReadMethod.Memory)
+ val im2 = tm.registerInputMetrics(DataReadMethod.Memory)
+ // input metrics with a different read method than the one already registered are ignored
+ val im3 = tm.registerInputMetrics(DataReadMethod.Hadoop)
+ assert(im1 === im2)
+ assert(im1 !== im3)
+ assert(tm.inputMetrics === Some(im1))
+ im2.setBytesRead(50L)
+ im3.setBytesRead(100L)
+ assert(tm.inputMetrics.get.bytesRead === 50L)
+ }
+
+ test("register multiple output metrics") {
+ val tm = new TaskMetrics
+ val om1 = tm.registerOutputMetrics(DataWriteMethod.Hadoop)
+ val om2 = tm.registerOutputMetrics(DataWriteMethod.Hadoop)
+ assert(om1 === om2)
+ assert(tm.outputMetrics === Some(om1))
+ }
+
+ test("additional accumulables") {
+ val internalAccums = InternalAccumulator.create()
+ val tm = new TaskMetrics(internalAccums)
+ assert(tm.accumulatorUpdates().size === internalAccums.size)
+ val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
+ val acc2 = new Accumulator(0, IntAccumulatorParam, Some("b"))
+ val acc3 = new Accumulator(0, IntAccumulatorParam, Some("c"))
+ val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"),
+ internal = true, countFailedValues = true)
+ tm.registerAccumulator(acc1)
+ tm.registerAccumulator(acc2)
+ tm.registerAccumulator(acc3)
+ tm.registerAccumulator(acc4)
+ acc1 += 1
+ acc2 += 2
+ val newUpdates = tm.accumulatorUpdates().map { a => (a.id, a) }.toMap
+ assert(newUpdates.contains(acc1.id))
+ assert(newUpdates.contains(acc2.id))
+ assert(newUpdates.contains(acc3.id))
+ assert(newUpdates.contains(acc4.id))
+ assert(newUpdates(acc1.id).name === Some("a"))
+ assert(newUpdates(acc2.id).name === Some("b"))
+ assert(newUpdates(acc3.id).name === Some("c"))
+ assert(newUpdates(acc4.id).name === Some("d"))
+ assert(newUpdates(acc1.id).update === Some(1))
+ assert(newUpdates(acc2.id).update === Some(2))
+ assert(newUpdates(acc3.id).update === Some(0))
+ assert(newUpdates(acc4.id).update === Some(0))
+ assert(!newUpdates(acc3.id).internal)
+ assert(!newUpdates(acc3.id).countFailedValues)
+ assert(newUpdates(acc4.id).internal)
+ assert(newUpdates(acc4.id).countFailedValues)
+ assert(newUpdates.values.map(_.update).forall(_.isDefined))
+ assert(newUpdates.values.map(_.value).forall(_.isEmpty))
+ assert(newUpdates.size === internalAccums.size + 4)
+ }
+
+ test("existing values in shuffle read accums") {
+ // set shuffle read accum before passing it into TaskMetrics
+ val accums = InternalAccumulator.create()
+ val srAccum = accums.find(_.name === Some(shuffleRead.FETCH_WAIT_TIME))
+ assert(srAccum.isDefined)
+ srAccum.get.asInstanceOf[Accumulator[Long]] += 10L
+ val tm = new TaskMetrics(accums)
+ assert(tm.shuffleReadMetrics.isDefined)
+ assert(tm.shuffleWriteMetrics.isEmpty)
+ assert(tm.inputMetrics.isEmpty)
+ assert(tm.outputMetrics.isEmpty)
+ }
+
+ test("existing values in shuffle write accums") {
+ // set shuffle write accum before passing it into TaskMetrics
+ val accums = InternalAccumulator.create()
+ val swAccum = accums.find(_.name === Some(shuffleWrite.RECORDS_WRITTEN))
+ assert(swAccum.isDefined)
+ swAccum.get.asInstanceOf[Accumulator[Long]] += 10L
+ val tm = new TaskMetrics(accums)
+ assert(tm.shuffleReadMetrics.isEmpty)
+ assert(tm.shuffleWriteMetrics.isDefined)
+ assert(tm.inputMetrics.isEmpty)
+ assert(tm.outputMetrics.isEmpty)
+ }
+
+ test("existing values in input accums") {
+ // set input accum before passing it into TaskMetrics
+ val accums = InternalAccumulator.create()
+ val inAccum = accums.find(_.name === Some(input.RECORDS_READ))
+ assert(inAccum.isDefined)
+ inAccum.get.asInstanceOf[Accumulator[Long]] += 10L
+ val tm = new TaskMetrics(accums)
+ assert(tm.shuffleReadMetrics.isEmpty)
+ assert(tm.shuffleWriteMetrics.isEmpty)
+ assert(tm.inputMetrics.isDefined)
+ assert(tm.outputMetrics.isEmpty)
}
+
+ test("existing values in output accums") {
+ // set output accum before passing it into TaskMetrics
+ val accums = InternalAccumulator.create()
+ val outAccum = accums.find(_.name === Some(output.RECORDS_WRITTEN))
+ assert(outAccum.isDefined)
+ outAccum.get.asInstanceOf[Accumulator[Long]] += 10L
+ val tm4 = new TaskMetrics(accums)
+ assert(tm4.shuffleReadMetrics.isEmpty)
+ assert(tm4.shuffleWriteMetrics.isEmpty)
+ assert(tm4.inputMetrics.isEmpty)
+ assert(tm4.outputMetrics.isDefined)
+ }
+
+ test("from accumulator updates") {
+ val accumUpdates1 = InternalAccumulator.create().map { a =>
+ AccumulableInfo(a.id, a.name, Some(3L), None, a.isInternal, a.countFailedValues)
+ }
+ val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
+ assertUpdatesEquals(metrics1.accumulatorUpdates(), accumUpdates1)
+ // Test this with additional accumulators. Only the ones registered with `Accumulators`
+ // will show up in the reconstructed TaskMetrics. In practice, all accumulators created
+ // on the driver, internal or not, should be registered with `Accumulators` at some point.
+ // Here we show that reconstruction will succeed even if there are unregistered accumulators.
+ val param = IntAccumulatorParam
+ val registeredAccums = Seq(
+ new Accumulator(0, param, Some("a"), internal = true, countFailedValues = true),
+ new Accumulator(0, param, Some("b"), internal = true, countFailedValues = false),
+ new Accumulator(0, param, Some("c"), internal = false, countFailedValues = true),
+ new Accumulator(0, param, Some("d"), internal = false, countFailedValues = false))
+ val unregisteredAccums = Seq(
+ new Accumulator(0, param, Some("e"), internal = true, countFailedValues = true),
+ new Accumulator(0, param, Some("f"), internal = true, countFailedValues = false))
+ registeredAccums.foreach(Accumulators.register)
+ registeredAccums.foreach { a => assert(Accumulators.originals.contains(a.id)) }
+ unregisteredAccums.foreach { a => assert(!Accumulators.originals.contains(a.id)) }
+ // set some values in these accums
+ registeredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
+ unregisteredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
+ val registeredAccumInfos = registeredAccums.map(makeInfo)
+ val unregisteredAccumInfos = unregisteredAccums.map(makeInfo)
+ val accumUpdates2 = accumUpdates1 ++ registeredAccumInfos ++ unregisteredAccumInfos
+ val metrics2 = TaskMetrics.fromAccumulatorUpdates(accumUpdates2)
+ // accumulators that were not registered with `Accumulators` will not show up
+ assertUpdatesEquals(metrics2.accumulatorUpdates(), accumUpdates1 ++ registeredAccumInfos)
+ }
+}
+
+
+private[spark] object TaskMetricsSuite extends Assertions {
+
+ /**
+ * Assert that the following three things are equal to `value`:
+ * (1) TaskMetrics value
+ * (2) TaskMetrics accumulator update value
+ * (3) Original accumulator value
+ */
+ def assertValueEquals(
+ tm: TaskMetrics,
+ tmValue: TaskMetrics => Any,
+ accums: Seq[Accumulator[_]],
+ metricName: String,
+ value: Any,
+ assertEquals: (Any, Any) => Unit = (x: Any, y: Any) => assert(x === y)): Unit = {
+ assertEquals(tmValue(tm), value)
+ val accum = accums.find(_.name == Some(metricName))
+ assert(accum.isDefined)
+ assertEquals(accum.get.value, value)
+ val accumUpdate = tm.accumulatorUpdates().find(_.name == Some(metricName))
+ assert(accumUpdate.isDefined)
+ assert(accumUpdate.get.value === None)
+ assertEquals(accumUpdate.get.update, Some(value))
+ }
+
+ /**
+ * Assert that two lists of accumulator updates are equal.
+ * Note: this does NOT check accumulator ID equality.
+ */
+ def assertUpdatesEquals(
+ updates1: Seq[AccumulableInfo],
+ updates2: Seq[AccumulableInfo]): Unit = {
+ assert(updates1.size === updates2.size)
+ updates1.zip(updates2).foreach { case (info1, info2) =>
+ // do not assert ID equals here
+ assert(info1.name === info2.name)
+ assert(info1.update === info2.update)
+ assert(info1.value === info2.value)
+ assert(info1.internal === info2.internal)
+ assert(info1.countFailedValues === info2.countFailedValues)
+ }
+ }
+
+ /**
+ * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
+ * info as an accumulator update.
+ */
+ def makeInfo(a: Accumulable[_, _]): AccumulableInfo = {
+ new AccumulableInfo(a.id, a.name, Some(a.value), None, a.isInternal, a.countFailedValues)
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala b/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala
index 0e60cc8e77..2b5e4b80e9 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala
@@ -31,7 +31,6 @@ object MemoryTestingUtils {
taskAttemptId = 0,
attemptNumber = 0,
taskMemoryManager = taskMemoryManager,
- metricsSystem = env.metricsSystem,
- internalAccumulators = Seq.empty)
+ metricsSystem = env.metricsSystem)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 370a284d29..d9c71ec2ea 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -23,7 +23,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import scala.language.reflectiveCalls
import scala.util.control.NonFatal
-import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
@@ -96,8 +95,7 @@ class MyRDD(
class DAGSchedulerSuiteDummyException extends Exception
-class DAGSchedulerSuite
- extends SparkFunSuite with BeforeAndAfter with LocalSparkContext with Timeouts {
+class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts {
val conf = new SparkConf
/** Set of TaskSets the DAGScheduler has requested executed. */
@@ -111,8 +109,10 @@ class DAGSchedulerSuite
override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def start() = {}
override def stop() = {}
- override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
- blockManagerId: BlockManagerId): Boolean = true
+ override def executorHeartbeatReceived(
+ execId: String,
+ accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+ blockManagerId: BlockManagerId): Boolean = true
override def submitTasks(taskSet: TaskSet) = {
// normally done by TaskSetManager
taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
@@ -189,7 +189,8 @@ class DAGSchedulerSuite
override def jobFailed(exception: Exception): Unit = { failure = exception }
}
- before {
+ override def beforeEach(): Unit = {
+ super.beforeEach()
sc = new SparkContext("local", "DAGSchedulerSuite")
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
@@ -202,17 +203,21 @@ class DAGSchedulerSuite
results.clear()
mapOutputTracker = new MapOutputTrackerMaster(conf)
scheduler = new DAGScheduler(
- sc,
- taskScheduler,
- sc.listenerBus,
- mapOutputTracker,
- blockManagerMaster,
- sc.env)
+ sc,
+ taskScheduler,
+ sc.listenerBus,
+ mapOutputTracker,
+ blockManagerMaster,
+ sc.env)
dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler)
}
- after {
- scheduler.stop()
+ override def afterEach(): Unit = {
+ try {
+ scheduler.stop()
+ } finally {
+ super.afterEach()
+ }
}
override def afterAll() {
@@ -242,26 +247,31 @@ class DAGSchedulerSuite
* directly through CompletionEvents.
*/
private val jobComputeFunc = (context: TaskContext, it: Iterator[(_)]) =>
- it.next.asInstanceOf[Tuple2[_, _]]._1
+ it.next.asInstanceOf[Tuple2[_, _]]._1
/** Send the given CompletionEvent messages for the tasks in the TaskSet. */
private def complete(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) {
assert(taskSet.tasks.size >= results.size)
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
- runEvent(CompletionEvent(
- taskSet.tasks(i), result._1, result._2, null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(taskSet.tasks(i), result._1, result._2))
}
}
}
- private def completeWithAccumulator(accumId: Long, taskSet: TaskSet,
- results: Seq[(TaskEndReason, Any)]) {
+ private def completeWithAccumulator(
+ accumId: Long,
+ taskSet: TaskSet,
+ results: Seq[(TaskEndReason, Any)]) {
assert(taskSet.tasks.size >= results.size)
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
- runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2,
- Map[Long, Any]((accumId, 1)), createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(
+ taskSet.tasks(i),
+ result._1,
+ result._2,
+ Seq(new AccumulableInfo(
+ accumId, Some(""), Some(1), None, internal = false, countFailedValues = false))))
}
}
}
@@ -338,9 +348,12 @@ class DAGSchedulerSuite
}
test("equals and hashCode AccumulableInfo") {
- val accInfo1 = new AccumulableInfo(1, " Accumulable " + 1, Some("delta" + 1), "val" + 1, true)
- val accInfo2 = new AccumulableInfo(1, " Accumulable " + 1, Some("delta" + 1), "val" + 1, false)
- val accInfo3 = new AccumulableInfo(1, " Accumulable " + 1, Some("delta" + 1), "val" + 1, false)
+ val accInfo1 = new AccumulableInfo(
+ 1, Some("a1"), Some("delta1"), Some("val1"), internal = true, countFailedValues = false)
+ val accInfo2 = new AccumulableInfo(
+ 1, Some("a1"), Some("delta1"), Some("val1"), internal = false, countFailedValues = false)
+ val accInfo3 = new AccumulableInfo(
+ 1, Some("a1"), Some("delta1"), Some("val1"), internal = false, countFailedValues = false)
assert(accInfo1 !== accInfo2)
assert(accInfo2 === accInfo3)
assert(accInfo2.hashCode() === accInfo3.hashCode())
@@ -464,7 +477,7 @@ class DAGSchedulerSuite
override def defaultParallelism(): Int = 2
override def executorHeartbeatReceived(
execId: String,
- taskMetrics: Array[(Long, TaskMetrics)],
+ accumUpdates: Array[(Long, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def applicationAttemptId(): Option[String] = None
@@ -499,8 +512,8 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 1)),
- (Success, makeMapStatus("hostB", 1))))
+ (Success, makeMapStatus("hostA", 1)),
+ (Success, makeMapStatus("hostB", 1))))
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
complete(taskSets(1), Seq((Success, 42)))
@@ -515,12 +528,12 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
- (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
+ (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
+ (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
// the 2nd ResultTask failed
complete(taskSets(1), Seq(
- (Success, 42),
- (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
+ (Success, 42),
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
// this will get called
// blockManagerMaster.removeExecutor("exec-hostA")
// ask the scheduler to try it again
@@ -829,23 +842,17 @@ class DAGSchedulerSuite
HashSet("hostA", "hostB"))
// The first result task fails, with a fetch failure for the output from the first mapper.
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
- null,
- Map[Long, Any](),
- createFakeTaskInfo(),
null))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.contains(1))
// The second ResultTask fails, with a fetch failure for the output from the second mapper.
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
- null,
- Map[Long, Any](),
- createFakeTaskInfo(),
null))
// The SparkListener should not receive redundant failure events.
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
@@ -882,12 +889,9 @@ class DAGSchedulerSuite
HashSet("hostA", "hostB"))
// The first result task fails, with a fetch failure for the output from the first mapper.
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
- null,
- Map[Long, Any](),
- createFakeTaskInfo(),
null))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.contains(1))
@@ -900,12 +904,9 @@ class DAGSchedulerSuite
assert(countSubmittedMapStageAttempts() === 2)
// The second ResultTask fails, with a fetch failure for the output from the second mapper.
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(1),
FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"),
- null,
- Map[Long, Any](),
- createFakeTaskInfo(),
null))
// Another ResubmitFailedStages event should not result in another attempt for the map
@@ -920,11 +921,11 @@ class DAGSchedulerSuite
}
/**
- * This tests the case where a late FetchFailed comes in after the map stage has finished getting
- * retried and a new reduce stage starts running.
- */
+ * This tests the case where a late FetchFailed comes in after the map stage has finished getting
+ * retried and a new reduce stage starts running.
+ */
test("extremely late fetch failures don't cause multiple concurrent attempts for " +
- "the same stage") {
+ "the same stage") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId
@@ -952,12 +953,9 @@ class DAGSchedulerSuite
assert(countSubmittedReduceStageAttempts() === 1)
// The first result task fails, with a fetch failure for the output from the first mapper.
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
- null,
- Map[Long, Any](),
- createFakeTaskInfo(),
null))
// Trigger resubmission of the failed map stage and finish the re-started map task.
@@ -971,12 +969,9 @@ class DAGSchedulerSuite
assert(countSubmittedReduceStageAttempts() === 2)
// A late FetchFailed arrives from the second task in the original reduce stage.
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(1),
FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"),
- null,
- Map[Long, Any](),
- createFakeTaskInfo(),
null))
// Running ResubmitFailedStages shouldn't result in any more attempts for the map stage, because
@@ -1007,48 +1002,36 @@ class DAGSchedulerSuite
assert(shuffleStage.numAvailableOutputs === 0)
// should be ignored for being too old
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSet.tasks(0),
Success,
- makeMapStatus("hostA", reduceRdd.partitions.size),
- null,
- createFakeTaskInfo(),
- null))
+ makeMapStatus("hostA", reduceRdd.partitions.size)))
assert(shuffleStage.numAvailableOutputs === 0)
// should work because it's a non-failed host (so the available map outputs will increase)
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSet.tasks(0),
Success,
- makeMapStatus("hostB", reduceRdd.partitions.size),
- null,
- createFakeTaskInfo(),
- null))
+ makeMapStatus("hostB", reduceRdd.partitions.size)))
assert(shuffleStage.numAvailableOutputs === 1)
// should be ignored for being too old
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSet.tasks(0),
Success,
- makeMapStatus("hostA", reduceRdd.partitions.size),
- null,
- createFakeTaskInfo(),
- null))
+ makeMapStatus("hostA", reduceRdd.partitions.size)))
assert(shuffleStage.numAvailableOutputs === 1)
// should work because it's a new epoch, which will increase the number of available map
// outputs, and also finish the stage
taskSet.tasks(1).epoch = newEpoch
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSet.tasks(1),
Success,
- makeMapStatus("hostA", reduceRdd.partitions.size),
- null,
- createFakeTaskInfo(),
- null))
+ makeMapStatus("hostA", reduceRdd.partitions.size)))
assert(shuffleStage.numAvailableOutputs === 2)
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
- HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
+ HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
// finish the next stage normally, which completes the job
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -1140,12 +1123,9 @@ class DAGSchedulerSuite
// then one executor dies, and a task fails in stage 1
runEvent(ExecutorLost("exec-hostA"))
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
- null,
- null,
- createFakeTaskInfo(),
null))
// so we resubmit stage 0, which completes happily
@@ -1155,13 +1135,10 @@ class DAGSchedulerSuite
assert(stage0Resubmit.stageAttemptId === 1)
val task = stage0Resubmit.tasks(0)
assert(task.partitionId === 2)
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
task,
Success,
- makeMapStatus("hostC", shuffleMapRdd.partitions.length),
- null,
- createFakeTaskInfo(),
- null))
+ makeMapStatus("hostC", shuffleMapRdd.partitions.length)))
// now here is where things get tricky : we will now have a task set representing
// the second attempt for stage 1, but we *also* have some tasks for the first attempt for
@@ -1174,28 +1151,19 @@ class DAGSchedulerSuite
// we'll have some tasks finish from the first attempt, and some finish from the second attempt,
// so that we actually have all stage outputs, though no attempt has completed all its
// tasks
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(3).tasks(0),
Success,
- makeMapStatus("hostC", reduceRdd.partitions.length),
- null,
- createFakeTaskInfo(),
- null))
- runEvent(CompletionEvent(
+ makeMapStatus("hostC", reduceRdd.partitions.length)))
+ runEvent(makeCompletionEvent(
taskSets(3).tasks(1),
Success,
- makeMapStatus("hostC", reduceRdd.partitions.length),
- null,
- createFakeTaskInfo(),
- null))
+ makeMapStatus("hostC", reduceRdd.partitions.length)))
// late task finish from the first attempt
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(2),
Success,
- makeMapStatus("hostB", reduceRdd.partitions.length),
- null,
- createFakeTaskInfo(),
- null))
+ makeMapStatus("hostB", reduceRdd.partitions.length)))
// What should happen now is that we submit stage 2. However, we might not see an error
// b/c of DAGScheduler's error handling (it tends to swallow errors and just log them). But
@@ -1242,21 +1210,21 @@ class DAGSchedulerSuite
submit(reduceRdd, Array(0))
// complete some of the tasks from the first stage, on one host
- runEvent(CompletionEvent(
- taskSets(0).tasks(0), Success,
- makeMapStatus("hostA", reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
- runEvent(CompletionEvent(
- taskSets(0).tasks(1), Success,
- makeMapStatus("hostA", reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(
+ taskSets(0).tasks(0),
+ Success,
+ makeMapStatus("hostA", reduceRdd.partitions.length)))
+ runEvent(makeCompletionEvent(
+ taskSets(0).tasks(1),
+ Success,
+ makeMapStatus("hostA", reduceRdd.partitions.length)))
// now that host goes down
runEvent(ExecutorLost("exec-hostA"))
// so we resubmit those tasks
- runEvent(CompletionEvent(
- taskSets(0).tasks(0), Resubmitted, null, null, createFakeTaskInfo(), null))
- runEvent(CompletionEvent(
- taskSets(0).tasks(1), Resubmitted, null, null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
+ runEvent(makeCompletionEvent(taskSets(0).tasks(1), Resubmitted, null))
// now complete everything on a different host
complete(taskSets(0), Seq(
@@ -1449,12 +1417,12 @@ class DAGSchedulerSuite
// DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
// rather than marking it is as failed and waiting.
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 1)),
- (Success, makeMapStatus("hostB", 1))))
+ (Success, makeMapStatus("hostA", 1)),
+ (Success, makeMapStatus("hostB", 1))))
// have hostC complete the resubmitted task
complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
- HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
+ HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
complete(taskSets(2), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty()
@@ -1469,15 +1437,15 @@ class DAGSchedulerSuite
submit(finalRdd, Array(0))
// have the first stage complete normally
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 2)),
- (Success, makeMapStatus("hostB", 2))))
+ (Success, makeMapStatus("hostA", 2)),
+ (Success, makeMapStatus("hostB", 2))))
// have the second stage complete normally
complete(taskSets(1), Seq(
- (Success, makeMapStatus("hostA", 1)),
- (Success, makeMapStatus("hostC", 1))))
+ (Success, makeMapStatus("hostA", 1)),
+ (Success, makeMapStatus("hostC", 1))))
// fail the third stage because hostA went down
complete(taskSets(2), Seq(
- (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
// TODO assert this:
// blockManagerMaster.removeExecutor("exec-hostA")
// have DAGScheduler try again
@@ -1500,15 +1468,15 @@ class DAGSchedulerSuite
cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
// complete stage 0
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 2)),
- (Success, makeMapStatus("hostB", 2))))
+ (Success, makeMapStatus("hostA", 2)),
+ (Success, makeMapStatus("hostB", 2))))
// complete stage 1
complete(taskSets(1), Seq(
- (Success, makeMapStatus("hostA", 1)),
- (Success, makeMapStatus("hostB", 1))))
+ (Success, makeMapStatus("hostA", 1)),
+ (Success, makeMapStatus("hostB", 1))))
// pretend stage 2 failed because hostA went down
complete(taskSets(2), Seq(
- (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
// TODO assert this:
// blockManagerMaster.removeExecutor("exec-hostA")
// DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
@@ -1606,6 +1574,28 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}
+ test("accumulators are updated on exception failures") {
+ val acc1 = sc.accumulator(0L, "ingenieur")
+ val acc2 = sc.accumulator(0L, "boulanger")
+ val acc3 = sc.accumulator(0L, "agriculteur")
+ assert(Accumulators.get(acc1.id).isDefined)
+ assert(Accumulators.get(acc2.id).isDefined)
+ assert(Accumulators.get(acc3.id).isDefined)
+ val accInfo1 = new AccumulableInfo(
+ acc1.id, acc1.name, Some(15L), None, internal = false, countFailedValues = false)
+ val accInfo2 = new AccumulableInfo(
+ acc2.id, acc2.name, Some(13L), None, internal = false, countFailedValues = false)
+ val accInfo3 = new AccumulableInfo(
+ acc3.id, acc3.name, Some(18L), None, internal = false, countFailedValues = false)
+ val accumUpdates = Seq(accInfo1, accInfo2, accInfo3)
+ val exceptionFailure = new ExceptionFailure(new SparkException("fondue?"), accumUpdates)
+ submit(new MyRDD(sc, 1, Nil), Array(0))
+ runEvent(makeCompletionEvent(taskSets.head.tasks.head, exceptionFailure, "result"))
+ assert(Accumulators.get(acc1.id).get.value === 15L)
+ assert(Accumulators.get(acc2.id).get.value === 13L)
+ assert(Accumulators.get(acc3.id).get.value === 18L)
+ }
+
test("reduce tasks should be placed locally with map output") {
// Create an shuffleMapRdd with 1 partition
val shuffleMapRdd = new MyRDD(sc, 1, Nil)
@@ -1614,9 +1604,9 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 1))))
+ (Success, makeMapStatus("hostA", 1))))
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
- HashSet(makeBlockManagerId("hostA")))
+ HashSet(makeBlockManagerId("hostA")))
// Reducer should run on the same host that map task ran
val reduceTaskSet = taskSets(1)
@@ -1884,8 +1874,7 @@ class DAGSchedulerSuite
submitMapStage(shuffleDep)
val oldTaskSet = taskSets(0)
- runEvent(CompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2),
- null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2)))
assert(results.size === 0) // Map stage job should not be complete yet
// Pretend host A was lost
@@ -1895,23 +1884,19 @@ class DAGSchedulerSuite
assert(newEpoch > oldEpoch)
// Suppose we also get a completed event from task 1 on the same host; this should be ignored
- runEvent(CompletionEvent(oldTaskSet.tasks(1), Success, makeMapStatus("hostA", 2),
- null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(oldTaskSet.tasks(1), Success, makeMapStatus("hostA", 2)))
assert(results.size === 0) // Map stage job should not be complete yet
// A completion from another task should work because it's a non-failed host
- runEvent(CompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2),
- null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2)))
assert(results.size === 0) // Map stage job should not be complete yet
// Now complete tasks in the second task set
val newTaskSet = taskSets(1)
assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on on hostA
- runEvent(CompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2),
- null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
assert(results.size === 0) // Map stage job should not be complete yet
- runEvent(CompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2),
- null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
assert(results.size === 1) // Map stage job should now finally be complete
assertDataStructuresEmpty()
@@ -1962,5 +1947,21 @@ class DAGSchedulerSuite
info
}
-}
+ private def makeCompletionEvent(
+ task: Task[_],
+ reason: TaskEndReason,
+ result: Any,
+ extraAccumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo],
+ taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = {
+ val accumUpdates = reason match {
+ case Success =>
+ task.initialAccumulators.map { a =>
+ new AccumulableInfo(a.id, a.name, Some(a.zero), None, a.isInternal, a.countFailedValues)
+ }
+ case ef: ExceptionFailure => ef.accumUpdates
+ case _ => Seq.empty[AccumulableInfo]
+ }
+ CompletionEvent(task, reason, result, accumUpdates ++ extraAccumUpdates, taskInfo)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 761e82e6cf..35215c15ea 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{JsonProtocol, JsonProtocolSuite, Utils}
/**
* Test whether ReplayListenerBus replays events from logs correctly.
@@ -131,7 +131,11 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter {
assert(sc.eventLogger.isDefined)
val originalEvents = sc.eventLogger.get.loggedEvents
val replayedEvents = eventMonster.loggedEvents
- originalEvents.zip(replayedEvents).foreach { case (e1, e2) => assert(e1 === e2) }
+ originalEvents.zip(replayedEvents).foreach { case (e1, e2) =>
+ // Don't compare the JSON here because accumulators in StageInfo may be out of order
+ JsonProtocolSuite.assertEquals(
+ JsonProtocol.sparkEventFromJson(e1), JsonProtocol.sparkEventFromJson(e2))
+ }
}
/**
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index e5ec44a9f3..b3bb86db10 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -22,6 +22,8 @@ import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
import org.apache.spark._
+import org.apache.spark.executor.TaskMetricsSuite
+import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.source.JvmSource
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
@@ -57,8 +59,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
val func = (c: TaskContext, i: Iterator[String]) => i.next()
val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func))))
- val task = new ResultTask[String, String](
- 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, Seq.empty)
+ val task = new ResultTask[String, String](0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0)
intercept[RuntimeException] {
task.run(0, 0, null)
}
@@ -97,6 +98,57 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
}.collect()
assert(attemptIdsWithFailedTask.toSet === Set(0, 1))
}
+
+ test("accumulators are updated on exception failures") {
+ // This means use 1 core and 4 max task failures
+ sc = new SparkContext("local[1,4]", "test")
+ val param = AccumulatorParam.LongAccumulatorParam
+ // Create 2 accumulators, one that counts failed values and another that doesn't
+ val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
+ val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
+ // Fail first 3 attempts of every task. This means each task should be run 4 times.
+ sc.parallelize(1 to 10, 10).map { i =>
+ acc1 += 1
+ acc2 += 1
+ if (TaskContext.get.attemptNumber() <= 2) {
+ throw new Exception("you did something wrong")
+ } else {
+ 0
+ }
+ }.count()
+ // The one that counts failed values should be 4x the one that didn't,
+ // since we ran each task 4 times
+ assert(Accumulators.get(acc1.id).get.value === 40L)
+ assert(Accumulators.get(acc2.id).get.value === 10L)
+ }
+
+ test("failed tasks collect only accumulators whose values count during failures") {
+ sc = new SparkContext("local", "test")
+ val param = AccumulatorParam.LongAccumulatorParam
+ val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
+ val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
+ val initialAccums = InternalAccumulator.create()
+ // Create a dummy task. We won't end up running this; we just want to collect
+ // accumulator updates from it.
+ val task = new Task[Int](0, 0, 0, Seq.empty[Accumulator[_]]) {
+ context = new TaskContextImpl(0, 0, 0L, 0,
+ new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
+ SparkEnv.get.metricsSystem,
+ initialAccums)
+ context.taskMetrics.registerAccumulator(acc1)
+ context.taskMetrics.registerAccumulator(acc2)
+ override def runTask(tc: TaskContext): Int = 0
+ }
+ // First, simulate task success. This should give us all the accumulators.
+ val accumUpdates1 = task.collectAccumulatorUpdates(taskFailed = false)
+ val accumUpdates2 = (initialAccums ++ Seq(acc1, acc2)).map(TaskMetricsSuite.makeInfo)
+ TaskMetricsSuite.assertUpdatesEquals(accumUpdates1, accumUpdates2)
+ // Now, simulate task failures. This should give us only the accums that count failed values.
+ val accumUpdates3 = task.collectAccumulatorUpdates(taskFailed = true)
+ val accumUpdates4 = (initialAccums ++ Seq(acc1)).map(TaskMetricsSuite.makeInfo)
+ TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4)
+ }
+
}
private object TaskContextSuite {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index cc2557c2f1..b5385c11a9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -21,10 +21,15 @@ import java.io.File
import java.net.URL
import java.nio.ByteBuffer
+import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.control.NonFatal
+import com.google.common.util.concurrent.MoreExecutors
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers.{any, anyLong}
+import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually._
@@ -33,13 +38,14 @@ import org.apache.spark.storage.TaskResultBlockId
import org.apache.spark.TestUtils.JavaSourceFromString
import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, Utils}
+
/**
* Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
*
* Used to test the case where a BlockManager evicts the task result (or dies) before the
* TaskResult is retrieved.
*/
-class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
+private class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
extends TaskResultGetter(sparkEnv, scheduler) {
var removedResult = false
@@ -72,6 +78,31 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
}
}
+
+/**
+ * A [[TaskResultGetter]] that stores the [[DirectTaskResult]]s it receives from executors
+ * _before_ modifying the results in any way.
+ */
+private class MyTaskResultGetter(env: SparkEnv, scheduler: TaskSchedulerImpl)
+ extends TaskResultGetter(env, scheduler) {
+
+ // Use the current thread so we can access its results synchronously
+ protected override val getTaskResultExecutor = MoreExecutors.sameThreadExecutor()
+
+ // DirectTaskResults that we receive from the executors
+ private val _taskResults = new ArrayBuffer[DirectTaskResult[_]]
+
+ def taskResults: Seq[DirectTaskResult[_]] = _taskResults
+
+ override def enqueueSuccessfulTask(tsm: TaskSetManager, tid: Long, data: ByteBuffer): Unit = {
+ // work on a copy since the super class still needs to use the buffer
+ val newBuffer = data.duplicate()
+ _taskResults += env.closureSerializer.newInstance().deserialize[DirectTaskResult[_]](newBuffer)
+ super.enqueueSuccessfulTask(tsm, tid, data)
+ }
+}
+
+
/**
* Tests related to handling task results (both direct and indirect).
*/
@@ -182,5 +213,39 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
Thread.currentThread.setContextClassLoader(originalClassLoader)
}
}
+
+ test("task result size is set on the driver, not the executors") {
+ import InternalAccumulator._
+
+ // Set up custom TaskResultGetter and TaskSchedulerImpl spy
+ sc = new SparkContext("local", "test", conf)
+ val scheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
+ val spyScheduler = spy(scheduler)
+ val resultGetter = new MyTaskResultGetter(sc.env, spyScheduler)
+ val newDAGScheduler = new DAGScheduler(sc, spyScheduler)
+ scheduler.taskResultGetter = resultGetter
+ sc.dagScheduler = newDAGScheduler
+ sc.taskScheduler = spyScheduler
+ sc.taskScheduler.setDAGScheduler(newDAGScheduler)
+
+ // Just run 1 task and capture the corresponding DirectTaskResult
+ sc.parallelize(1 to 1, 1).count()
+ val captor = ArgumentCaptor.forClass(classOf[DirectTaskResult[_]])
+ verify(spyScheduler, times(1)).handleSuccessfulTask(any(), anyLong(), captor.capture())
+
+ // When a task finishes, the executor sends a serialized DirectTaskResult to the driver
+ // without setting the result size so as to avoid serializing the result again. Instead,
+ // the result size is set later in TaskResultGetter on the driver before passing the
+ // DirectTaskResult on to TaskSchedulerImpl. In this test, we capture the DirectTaskResult
+ // before and after the result size is set.
+ assert(resultGetter.taskResults.size === 1)
+ val resBefore = resultGetter.taskResults.head
+ val resAfter = captor.getValue
+ val resSizeBefore = resBefore.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
+ val resSizeAfter = resAfter.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
+ assert(resSizeBefore.exists(_ == 0L))
+ assert(resSizeAfter.exists(_.toString.toLong > 0L))
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ecc18fc6e1..a2e7436564 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -24,7 +24,6 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
-import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.ManualClock
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
@@ -38,9 +37,8 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
task: Task[_],
reason: TaskEndReason,
result: Any,
- accumUpdates: Map[Long, Any],
- taskInfo: TaskInfo,
- taskMetrics: TaskMetrics) {
+ accumUpdates: Seq[AccumulableInfo],
+ taskInfo: TaskInfo) {
taskScheduler.endedTasks(taskInfo.index) = reason
}
@@ -167,14 +165,17 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val accumUpdates = taskSet.tasks.head.initialAccumulators.map { a =>
+ new AccumulableInfo(a.id, a.name, Some(0L), None, a.isInternal, a.countFailedValues)
+ }
// Offer a host with NO_PREF as the constraint,
// we should get a nopref task immediately since that's what we only have
- var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
// Tell it the task has finished
- manager.handleSuccessfulTask(0, createTaskResult(0))
+ manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
assert(sched.endedTasks(0) === Success)
assert(sched.finishedManagers.contains(manager))
}
@@ -184,10 +185,15 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(3)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+ val accumUpdatesByTask: Array[Seq[AccumulableInfo]] = taskSet.tasks.map { task =>
+ task.initialAccumulators.map { a =>
+ new AccumulableInfo(a.id, a.name, Some(0L), None, a.isInternal, a.countFailedValues)
+ }
+ }
// First three offers should all find tasks
for (i <- 0 until 3) {
- var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
val task = taskOption.get
assert(task.executorId === "exec1")
@@ -198,14 +204,14 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(manager.resourceOffer("exec1", "host1", NO_PREF) === None)
// Finish the first two tasks
- manager.handleSuccessfulTask(0, createTaskResult(0))
- manager.handleSuccessfulTask(1, createTaskResult(1))
+ manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdatesByTask(0)))
+ manager.handleSuccessfulTask(1, createTaskResult(1, accumUpdatesByTask(1)))
assert(sched.endedTasks(0) === Success)
assert(sched.endedTasks(1) === Success)
assert(!sched.finishedManagers.contains(manager))
// Finish the last task
- manager.handleSuccessfulTask(2, createTaskResult(2))
+ manager.handleSuccessfulTask(2, createTaskResult(2, accumUpdatesByTask(2)))
assert(sched.endedTasks(2) === Success)
assert(sched.finishedManagers.contains(manager))
}
@@ -620,7 +626,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// multiple 1k result
val r = sc.makeRDD(0 until 10, 10).map(genBytes(1024)).collect()
- assert(10 === r.size )
+ assert(10 === r.size)
// single 10M result
val thrown = intercept[SparkException] {sc.makeRDD(genBytes(10 << 20)(0), 1).collect()}
@@ -761,7 +767,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// Regression test for SPARK-2931
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc,
- ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
+ ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
val taskSet = FakeTask.createTaskSet(3,
Seq(TaskLocation("host1")),
Seq(TaskLocation("host2")),
@@ -786,8 +792,10 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3"))
}
- def createTaskResult(id: Int): DirectTaskResult[Int] = {
+ private def createTaskResult(
+ id: Int,
+ accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo]): DirectTaskResult[Int] = {
val valueSer = SparkEnv.get.serializer.newInstance()
- new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)
+ new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
}
}
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 86699e7f56..b83ffa3282 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -31,6 +31,8 @@ import org.apache.spark.ui.scope.RDDOperationGraphListener
class StagePageSuite extends SparkFunSuite with LocalSparkContext {
+ private val peakExecutionMemory = 10
+
test("peak execution memory only displayed if unsafe is enabled") {
val unsafeConf = "spark.sql.unsafe.enabled"
val conf = new SparkConf(false).set(unsafeConf, "true")
@@ -52,7 +54,7 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
val conf = new SparkConf(false).set(unsafeConf, "true")
val html = renderStagePage(conf).toString().toLowerCase
// verify min/25/50/75/max show task value not cumulative values
- assert(html.contains("<td>10.0 b</td>" * 5))
+ assert(html.contains(s"<td>$peakExecutionMemory.0 b</td>" * 5))
}
/**
@@ -79,14 +81,13 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
(1 to 2).foreach {
taskId =>
val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
- val peakExecutionMemory = 10
- taskInfo.accumulables += new AccumulableInfo(0, InternalAccumulator.PEAK_EXECUTION_MEMORY,
- Some(peakExecutionMemory.toString), (peakExecutionMemory * taskId).toString, true)
jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
taskInfo.markSuccessful()
+ val taskMetrics = TaskMetrics.empty
+ taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
jobListener.onTaskEnd(
- SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, TaskMetrics.empty))
+ SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, taskMetrics))
}
jobListener.onStageCompleted(SparkListenerStageCompleted(stageInfo))
page.render(request)
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 607617cbe9..18a16a25bf 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -240,7 +240,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val taskFailedReasons = Seq(
Resubmitted,
new FetchFailed(null, 0, 0, 0, "ignored"),
- ExceptionFailure("Exception", "description", null, null, None, None),
+ ExceptionFailure("Exception", "description", null, null, None),
TaskResultLost,
TaskKilled,
ExecutorLostFailure("0", true, Some("Induced failure")),
@@ -269,20 +269,22 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val execId = "exe-1"
def makeTaskMetrics(base: Int): TaskMetrics = {
- val taskMetrics = new TaskMetrics()
- taskMetrics.setExecutorRunTime(base + 4)
- taskMetrics.incDiskBytesSpilled(base + 5)
- taskMetrics.incMemoryBytesSpilled(base + 6)
+ val accums = InternalAccumulator.create()
+ accums.foreach(Accumulators.register)
+ val taskMetrics = new TaskMetrics(accums)
val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
+ val shuffleWriteMetrics = taskMetrics.registerShuffleWriteMetrics()
+ val inputMetrics = taskMetrics.registerInputMetrics(DataReadMethod.Hadoop)
+ val outputMetrics = taskMetrics.registerOutputMetrics(DataWriteMethod.Hadoop)
shuffleReadMetrics.incRemoteBytesRead(base + 1)
shuffleReadMetrics.incLocalBytesRead(base + 9)
shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
taskMetrics.mergeShuffleReadMetrics()
- val shuffleWriteMetrics = taskMetrics.registerShuffleWriteMetrics()
shuffleWriteMetrics.incBytesWritten(base + 3)
- val inputMetrics = taskMetrics.registerInputMetrics(DataReadMethod.Hadoop)
- inputMetrics.incBytesRead(base + 7)
- val outputMetrics = taskMetrics.registerOutputMetrics(DataWriteMethod.Hadoop)
+ taskMetrics.setExecutorRunTime(base + 4)
+ taskMetrics.incDiskBytesSpilled(base + 5)
+ taskMetrics.incMemoryBytesSpilled(base + 6)
+ inputMetrics.setBytesRead(base + 7)
outputMetrics.setBytesWritten(base + 8)
taskMetrics
}
@@ -300,9 +302,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
- (1234L, 0, 0, makeTaskMetrics(0)),
- (1235L, 0, 0, makeTaskMetrics(100)),
- (1236L, 1, 0, makeTaskMetrics(200)))))
+ (1234L, 0, 0, makeTaskMetrics(0).accumulatorUpdates()),
+ (1235L, 0, 0, makeTaskMetrics(100).accumulatorUpdates()),
+ (1236L, 1, 0, makeTaskMetrics(200).accumulatorUpdates()))))
var stage0Data = listener.stageIdToData.get((0, 0)).get
var stage1Data = listener.stageIdToData.get((1, 0)).get
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index e5ca2de4ad..57021d1d3d 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -22,6 +22,10 @@ import java.util.Properties
import scala.collection.Map
import org.json4s.jackson.JsonMethods._
+import org.json4s.JsonAST.{JArray, JInt, JString, JValue}
+import org.json4s.JsonDSL._
+import org.scalatest.Assertions
+import org.scalatest.exceptions.TestFailedException
import org.apache.spark._
import org.apache.spark.executor._
@@ -32,12 +36,7 @@ import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage._
class JsonProtocolSuite extends SparkFunSuite {
-
- val jobSubmissionTime = 1421191042750L
- val jobCompletionTime = 1421191296660L
-
- val executorAddedTime = 1421458410000L
- val executorRemovedTime = 1421458922000L
+ import JsonProtocolSuite._
test("SparkListenerEvent") {
val stageSubmitted =
@@ -82,9 +81,13 @@ class JsonProtocolSuite extends SparkFunSuite {
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
- val executorMetricsUpdate = SparkListenerExecutorMetricsUpdate("exec3", Seq(
- (1L, 2, 3, makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800,
- hasHadoopInput = true, hasOutput = true))))
+ val executorMetricsUpdate = {
+ // Use custom accum ID for determinism
+ val accumUpdates =
+ makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true)
+ .accumulatorUpdates().zipWithIndex.map { case (a, i) => a.copy(id = i) }
+ SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates)))
+ }
testEvent(stageSubmitted, stageSubmittedJsonString)
testEvent(stageCompleted, stageCompletedJsonString)
@@ -142,7 +145,7 @@ class JsonProtocolSuite extends SparkFunSuite {
"Some exception")
val fetchMetadataFailed = new MetadataFetchFailedException(17,
19, "metadata Fetch failed exception").toTaskEndReason
- val exceptionFailure = new ExceptionFailure(exception, None)
+ val exceptionFailure = new ExceptionFailure(exception, Seq.empty[AccumulableInfo])
testTaskEndReason(Success)
testTaskEndReason(Resubmitted)
testTaskEndReason(fetchFailed)
@@ -166,9 +169,8 @@ class JsonProtocolSuite extends SparkFunSuite {
| Backward compatibility tests |
* ============================== */
- test("ExceptionFailure backward compatibility") {
- val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, null,
- None, None)
+ test("ExceptionFailure backward compatibility: full stack trace") {
+ val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, null, None)
val oldEvent = JsonProtocol.taskEndReasonToJson(exceptionFailure)
.removeField({ _._1 == "Full Stack Trace" })
assertEquals(exceptionFailure, JsonProtocol.taskEndReasonFromJson(oldEvent))
@@ -273,14 +275,13 @@ class JsonProtocolSuite extends SparkFunSuite {
assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent))
}
- test("ShuffleReadMetrics: Local bytes read and time taken backwards compatibility") {
- // Metrics about local shuffle bytes read and local read time were added in 1.3.1.
+ test("ShuffleReadMetrics: Local bytes read backwards compatibility") {
+ // Metrics about local shuffle bytes read were added in 1.3.1.
val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
hasHadoopInput = false, hasOutput = false, hasRecords = false)
assert(metrics.shuffleReadMetrics.nonEmpty)
val newJson = JsonProtocol.taskMetricsToJson(metrics)
val oldJson = newJson.removeField { case (field, _) => field == "Local Bytes Read" }
- .removeField { case (field, _) => field == "Local Read Time" }
val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
assert(newMetrics.shuffleReadMetrics.get.localBytesRead == 0)
}
@@ -371,22 +372,76 @@ class JsonProtocolSuite extends SparkFunSuite {
}
test("AccumulableInfo backward compatibility") {
- // "Internal" property of AccumulableInfo were added after 1.5.1.
- val accumulableInfo = makeAccumulableInfo(1)
+ // "Internal" property of AccumulableInfo was added in 1.5.1
+ val accumulableInfo = makeAccumulableInfo(1, internal = true, countFailedValues = true)
val oldJson = JsonProtocol.accumulableInfoToJson(accumulableInfo)
.removeField({ _._1 == "Internal" })
val oldInfo = JsonProtocol.accumulableInfoFromJson(oldJson)
- assert(false === oldInfo.internal)
+ assert(!oldInfo.internal)
+ // "Count Failed Values" property of AccumulableInfo was added in 2.0.0
+ val oldJson2 = JsonProtocol.accumulableInfoToJson(accumulableInfo)
+ .removeField({ _._1 == "Count Failed Values" })
+ val oldInfo2 = JsonProtocol.accumulableInfoFromJson(oldJson2)
+ assert(!oldInfo2.countFailedValues)
+ }
+
+ test("ExceptionFailure backward compatibility: accumulator updates") {
+ // "Task Metrics" was replaced with "Accumulator Updates" in 2.0.0. For older event logs,
+ // we should still be able to fallback to constructing the accumulator updates from the
+ // "Task Metrics" field, if it exists.
+ val tm = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = true)
+ val tmJson = JsonProtocol.taskMetricsToJson(tm)
+ val accumUpdates = tm.accumulatorUpdates()
+ val exception = new SparkException("sentimental")
+ val exceptionFailure = new ExceptionFailure(exception, accumUpdates)
+ val exceptionFailureJson = JsonProtocol.taskEndReasonToJson(exceptionFailure)
+ val tmFieldJson: JValue = "Task Metrics" -> tmJson
+ val oldExceptionFailureJson: JValue =
+ exceptionFailureJson.removeField { _._1 == "Accumulator Updates" }.merge(tmFieldJson)
+ val oldExceptionFailure =
+ JsonProtocol.taskEndReasonFromJson(oldExceptionFailureJson).asInstanceOf[ExceptionFailure]
+ assert(exceptionFailure.className === oldExceptionFailure.className)
+ assert(exceptionFailure.description === oldExceptionFailure.description)
+ assertSeqEquals[StackTraceElement](
+ exceptionFailure.stackTrace, oldExceptionFailure.stackTrace, assertStackTraceElementEquals)
+ assert(exceptionFailure.fullStackTrace === oldExceptionFailure.fullStackTrace)
+ assertSeqEquals[AccumulableInfo](
+ exceptionFailure.accumUpdates, oldExceptionFailure.accumUpdates, (x, y) => x == y)
}
- /** -------------------------- *
- | Helper test running methods |
- * --------------------------- */
+ test("AccumulableInfo value de/serialization") {
+ import InternalAccumulator._
+ val blocks = Seq[(BlockId, BlockStatus)](
+ (TestBlockId("meebo"), BlockStatus(StorageLevel.MEMORY_ONLY, 1L, 2L)),
+ (TestBlockId("feebo"), BlockStatus(StorageLevel.DISK_ONLY, 3L, 4L)))
+ val blocksJson = JArray(blocks.toList.map { case (id, status) =>
+ ("Block ID" -> id.toString) ~
+ ("Status" -> JsonProtocol.blockStatusToJson(status))
+ })
+ testAccumValue(Some(RESULT_SIZE), 3L, JInt(3))
+ testAccumValue(Some(shuffleRead.REMOTE_BLOCKS_FETCHED), 2, JInt(2))
+ testAccumValue(Some(input.READ_METHOD), "aka", JString("aka"))
+ testAccumValue(Some(UPDATED_BLOCK_STATUSES), blocks, blocksJson)
+ // For anything else, we just cast the value to a string
+ testAccumValue(Some("anything"), blocks, JString(blocks.toString))
+ testAccumValue(Some("anything"), 123, JString("123"))
+ }
+
+}
+
+
+private[spark] object JsonProtocolSuite extends Assertions {
+ import InternalAccumulator._
+
+ private val jobSubmissionTime = 1421191042750L
+ private val jobCompletionTime = 1421191296660L
+ private val executorAddedTime = 1421458410000L
+ private val executorRemovedTime = 1421458922000L
private def testEvent(event: SparkListenerEvent, jsonString: String) {
val actualJsonString = compact(render(JsonProtocol.sparkEventToJson(event)))
val newEvent = JsonProtocol.sparkEventFromJson(parse(actualJsonString))
- assertJsonStringEquals(jsonString, actualJsonString)
+ assertJsonStringEquals(jsonString, actualJsonString, event.getClass.getSimpleName)
assertEquals(event, newEvent)
}
@@ -440,11 +495,19 @@ class JsonProtocolSuite extends SparkFunSuite {
assertEquals(info, newInfo)
}
+ private def testAccumValue(name: Option[String], value: Any, expectedJson: JValue): Unit = {
+ val json = JsonProtocol.accumValueToJson(name, value)
+ assert(json === expectedJson)
+ val newValue = JsonProtocol.accumValueFromJson(name, json)
+ val expectedValue = if (name.exists(_.startsWith(METRICS_PREFIX))) value else value.toString
+ assert(newValue === expectedValue)
+ }
+
/** -------------------------------- *
| Util methods for comparing events |
- * --------------------------------- */
+ * --------------------------------- */
- private def assertEquals(event1: SparkListenerEvent, event2: SparkListenerEvent) {
+ private[spark] def assertEquals(event1: SparkListenerEvent, event2: SparkListenerEvent) {
(event1, event2) match {
case (e1: SparkListenerStageSubmitted, e2: SparkListenerStageSubmitted) =>
assert(e1.properties === e2.properties)
@@ -478,14 +541,17 @@ class JsonProtocolSuite extends SparkFunSuite {
assert(e1.executorId === e1.executorId)
case (e1: SparkListenerExecutorMetricsUpdate, e2: SparkListenerExecutorMetricsUpdate) =>
assert(e1.execId === e2.execId)
- assertSeqEquals[(Long, Int, Int, TaskMetrics)](e1.taskMetrics, e2.taskMetrics, (a, b) => {
- val (taskId1, stageId1, stageAttemptId1, metrics1) = a
- val (taskId2, stageId2, stageAttemptId2, metrics2) = b
- assert(taskId1 === taskId2)
- assert(stageId1 === stageId2)
- assert(stageAttemptId1 === stageAttemptId2)
- assertEquals(metrics1, metrics2)
- })
+ assertSeqEquals[(Long, Int, Int, Seq[AccumulableInfo])](
+ e1.accumUpdates,
+ e2.accumUpdates,
+ (a, b) => {
+ val (taskId1, stageId1, stageAttemptId1, updates1) = a
+ val (taskId2, stageId2, stageAttemptId2, updates2) = b
+ assert(taskId1 === taskId2)
+ assert(stageId1 === stageId2)
+ assert(stageAttemptId1 === stageAttemptId2)
+ assertSeqEquals[AccumulableInfo](updates1, updates2, (a, b) => a.equals(b))
+ })
case (e1, e2) =>
assert(e1 === e2)
case _ => fail("Events don't match in types!")
@@ -544,7 +610,6 @@ class JsonProtocolSuite extends SparkFunSuite {
}
private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
- assert(metrics1.hostname === metrics2.hostname)
assert(metrics1.executorDeserializeTime === metrics2.executorDeserializeTime)
assert(metrics1.resultSize === metrics2.resultSize)
assert(metrics1.jvmGCTime === metrics2.jvmGCTime)
@@ -601,7 +666,7 @@ class JsonProtocolSuite extends SparkFunSuite {
assert(r1.description === r2.description)
assertSeqEquals(r1.stackTrace, r2.stackTrace, assertStackTraceElementEquals)
assert(r1.fullStackTrace === r2.fullStackTrace)
- assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
+ assertSeqEquals[AccumulableInfo](r1.accumUpdates, r2.accumUpdates, (a, b) => a.equals(b))
case (TaskResultLost, TaskResultLost) =>
case (TaskKilled, TaskKilled) =>
case (TaskCommitDenied(jobId1, partitionId1, attemptNumber1),
@@ -637,10 +702,16 @@ class JsonProtocolSuite extends SparkFunSuite {
assertStackTraceElementEquals)
}
- private def assertJsonStringEquals(json1: String, json2: String) {
+ private def assertJsonStringEquals(expected: String, actual: String, metadata: String) {
val formatJsonString = (json: String) => json.replaceAll("[\\s|]", "")
- assert(formatJsonString(json1) === formatJsonString(json2),
- s"input ${formatJsonString(json1)} got ${formatJsonString(json2)}")
+ if (formatJsonString(expected) != formatJsonString(actual)) {
+ // scalastyle:off
+ // This prints something useful if the JSON strings don't match
+ println("=== EXPECTED ===\n" + pretty(parse(expected)) + "\n")
+ println("=== ACTUAL ===\n" + pretty(parse(actual)) + "\n")
+ // scalastyle:on
+ throw new TestFailedException(s"$metadata JSON did not equal", 1)
+ }
}
private def assertSeqEquals[T](seq1: Seq[T], seq2: Seq[T], assertEquals: (T, T) => Unit) {
@@ -699,7 +770,7 @@ class JsonProtocolSuite extends SparkFunSuite {
/** ----------------------------------- *
| Util methods for constructing events |
- * ------------------------------------ */
+ * ------------------------------------ */
private val properties = {
val p = new Properties
@@ -746,8 +817,12 @@ class JsonProtocolSuite extends SparkFunSuite {
taskInfo
}
- private def makeAccumulableInfo(id: Int, internal: Boolean = false): AccumulableInfo =
- AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id, internal)
+ private def makeAccumulableInfo(
+ id: Int,
+ internal: Boolean = false,
+ countFailedValues: Boolean = false): AccumulableInfo =
+ new AccumulableInfo(id, Some(s"Accumulable$id"), Some(s"delta$id"), Some(s"val$id"),
+ internal, countFailedValues)
/**
* Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
@@ -764,7 +839,6 @@ class JsonProtocolSuite extends SparkFunSuite {
hasOutput: Boolean,
hasRecords: Boolean = true) = {
val t = new TaskMetrics
- t.setHostname("localhost")
t.setExecutorDeserializeTime(a)
t.setExecutorRunTime(b)
t.setResultSize(c)
@@ -774,7 +848,7 @@ class JsonProtocolSuite extends SparkFunSuite {
if (hasHadoopInput) {
val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop)
- inputMetrics.incBytesRead(d + e + f)
+ inputMetrics.setBytesRead(d + e + f)
inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
} else {
val sr = t.registerTempShuffleReadMetrics()
@@ -794,7 +868,7 @@ class JsonProtocolSuite extends SparkFunSuite {
val sw = t.registerShuffleWriteMetrics()
sw.incBytesWritten(a + b + c)
sw.incWriteTime(b + c + d)
- sw.setRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
+ sw.incRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
}
// Make at most 6 blocks
t.setUpdatedBlockStatuses((1 to (e % 5 + 1)).map { i =>
@@ -826,14 +900,16 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| }
| ]
| },
@@ -881,14 +957,16 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| }
| ]
| }
@@ -919,21 +997,24 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
| "Value": "val3",
- | "Internal": true
+ | "Internal": true,
+ | "Count Failed Values": false
| }
| ]
| }
@@ -962,21 +1043,24 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
| "Value": "val3",
- | "Internal": true
+ | "Internal": true,
+ | "Count Failed Values": false
| }
| ]
| }
@@ -1011,26 +1095,28 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
| "Value": "val3",
- | "Internal": true
+ | "Internal": true,
+ | "Count Failed Values": false
| }
| ]
| },
| "Task Metrics": {
- | "Host Name": "localhost",
| "Executor Deserialize Time": 300,
| "Executor Run Time": 400,
| "Result Size": 500,
@@ -1044,7 +1130,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Fetch Wait Time": 900,
| "Remote Bytes Read": 1000,
| "Local Bytes Read": 1100,
- | "Total Records Read" : 10
+ | "Total Records Read": 10
| },
| "Shuffle Write Metrics": {
| "Shuffle Bytes Written": 1200,
@@ -1098,26 +1184,28 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
| "Value": "val3",
- | "Internal": true
+ | "Internal": true,
+ | "Count Failed Values": false
| }
| ]
| },
| "Task Metrics": {
- | "Host Name": "localhost",
| "Executor Deserialize Time": 300,
| "Executor Run Time": 400,
| "Result Size": 500,
@@ -1182,26 +1270,28 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
| "Value": "val3",
- | "Internal": true
+ | "Internal": true,
+ | "Count Failed Values": false
| }
| ]
| },
| "Task Metrics": {
- | "Host Name": "localhost",
| "Executor Deserialize Time": 300,
| "Executor Run Time": 400,
| "Result Size": 500,
@@ -1273,17 +1363,19 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Accumulables": [
| {
| "ID": 2,
- | "Name": " Accumulable 2",
+ | "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 1,
- | "Name": " Accumulable 1",
+ | "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| }
| ]
| },
@@ -1331,17 +1423,19 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Accumulables": [
| {
| "ID": 2,
- | "Name": " Accumulable 2",
+ | "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 1,
- | "Name": " Accumulable 1",
+ | "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| }
| ]
| },
@@ -1405,17 +1499,19 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Accumulables": [
| {
| "ID": 2,
- | "Name": " Accumulable 2",
+ | "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 1,
- | "Name": " Accumulable 1",
+ | "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| }
| ]
| },
@@ -1495,17 +1591,19 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Accumulables": [
| {
| "ID": 2,
- | "Name": " Accumulable 2",
+ | "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 1,
- | "Name": " Accumulable 1",
+ | "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| }
| ]
| }
@@ -1657,51 +1755,208 @@ class JsonProtocolSuite extends SparkFunSuite {
"""
private val executorMetricsUpdateJsonString =
- s"""
- |{
- | "Event": "SparkListenerExecutorMetricsUpdate",
- | "Executor ID": "exec3",
- | "Metrics Updated": [
- | {
- | "Task ID": 1,
- | "Stage ID": 2,
- | "Stage Attempt ID": 3,
- | "Task Metrics": {
- | "Host Name": "localhost",
- | "Executor Deserialize Time": 300,
- | "Executor Run Time": 400,
- | "Result Size": 500,
- | "JVM GC Time": 600,
- | "Result Serialization Time": 700,
- | "Memory Bytes Spilled": 800,
- | "Disk Bytes Spilled": 0,
- | "Input Metrics": {
- | "Data Read Method": "Hadoop",
- | "Bytes Read": 2100,
- | "Records Read": 21
- | },
- | "Output Metrics": {
- | "Data Write Method": "Hadoop",
- | "Bytes Written": 1200,
- | "Records Written": 12
- | },
- | "Updated Blocks": [
- | {
- | "Block ID": "rdd_0_0",
- | "Status": {
- | "Storage Level": {
- | "Use Disk": true,
- | "Use Memory": true,
- | "Deserialized": false,
- | "Replication": 2
- | },
- | "Memory Size": 0,
- | "Disk Size": 0
- | }
- | }
- | ]
- | }
- | }]
- |}
- """.stripMargin
+ s"""
+ |{
+ | "Event": "SparkListenerExecutorMetricsUpdate",
+ | "Executor ID": "exec3",
+ | "Metrics Updated": [
+ | {
+ | "Task ID": 1,
+ | "Stage ID": 2,
+ | "Stage Attempt ID": 3,
+ | "Accumulator Updates": [
+ | {
+ | "ID": 0,
+ | "Name": "$EXECUTOR_DESERIALIZE_TIME",
+ | "Update": 300,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 1,
+ | "Name": "$EXECUTOR_RUN_TIME",
+ | "Update": 400,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 2,
+ | "Name": "$RESULT_SIZE",
+ | "Update": 500,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 3,
+ | "Name": "$JVM_GC_TIME",
+ | "Update": 600,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 4,
+ | "Name": "$RESULT_SERIALIZATION_TIME",
+ | "Update": 700,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 5,
+ | "Name": "$MEMORY_BYTES_SPILLED",
+ | "Update": 800,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 6,
+ | "Name": "$DISK_BYTES_SPILLED",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 7,
+ | "Name": "$PEAK_EXECUTION_MEMORY",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 8,
+ | "Name": "$UPDATED_BLOCK_STATUSES",
+ | "Update": [
+ | {
+ | "BlockID": "rdd_0_0",
+ | "Status": {
+ | "StorageLevel": {
+ | "UseDisk": true,
+ | "UseMemory": true,
+ | "Deserialized": false,
+ | "Replication": 2
+ | },
+ | "MemorySize": 0,
+ | "DiskSize": 0
+ | }
+ | }
+ | ],
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 9,
+ | "Name": "${shuffleRead.REMOTE_BLOCKS_FETCHED}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 10,
+ | "Name": "${shuffleRead.LOCAL_BLOCKS_FETCHED}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 11,
+ | "Name": "${shuffleRead.REMOTE_BYTES_READ}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 12,
+ | "Name": "${shuffleRead.LOCAL_BYTES_READ}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 13,
+ | "Name": "${shuffleRead.FETCH_WAIT_TIME}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 14,
+ | "Name": "${shuffleRead.RECORDS_READ}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 15,
+ | "Name": "${shuffleWrite.BYTES_WRITTEN}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 16,
+ | "Name": "${shuffleWrite.RECORDS_WRITTEN}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 17,
+ | "Name": "${shuffleWrite.WRITE_TIME}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 18,
+ | "Name": "${input.READ_METHOD}",
+ | "Update": "Hadoop",
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 19,
+ | "Name": "${input.BYTES_READ}",
+ | "Update": 2100,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 20,
+ | "Name": "${input.RECORDS_READ}",
+ | "Update": 21,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 21,
+ | "Name": "${output.WRITE_METHOD}",
+ | "Update": "Hadoop",
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 22,
+ | "Name": "${output.BYTES_WRITTEN}",
+ | "Update": 1200,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 23,
+ | "Name": "${output.RECORDS_WRITTEN}",
+ | "Update": 12,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 24,
+ | "Name": "$TEST_ACCUM",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | }
+ | ]
+ | }
+ | ]
+ |}
+ """.stripMargin
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fc7dc2181d..968a2903f3 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -176,6 +176,15 @@ object MimaExcludes {
// SPARK-12510 Refactor ActorReceiver to support Java
ProblemFilters.exclude[AbstractClassProblem]("org.apache.spark.streaming.receiver.ActorReceiver")
) ++ Seq(
+ // SPARK-12895 Implement TaskMetrics using accumulators
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.TaskContext.internalMetricsToAccumulators"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.TaskContext.collectInternalAccumulators"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.TaskContext.collectAccumulators")
+ ) ++ Seq(
+ // SPARK-12896 Send only accumulator updates to driver, not TaskMetrics
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulable.this"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulator.this")
+ ) ++ Seq(
// SPARK-12692 Scala style: Fix the style violation (Space before "," or ":")
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkSink.org$apache$spark$streaming$flume$sink$Logging$$log_"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkSink.org$apache$spark$streaming$flume$sink$Logging$$log__="),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index 73dc8cb984..75cb6d1137 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -79,17 +79,17 @@ case class Sort(
sorter.setTestSpillFrequency(testSpillFrequency)
}
+ val metrics = TaskContext.get().taskMetrics()
// Remember spill data size of this task before execute this operator so that we can
// figure out how many bytes we spilled for this operator.
- val spillSizeBefore = TaskContext.get().taskMetrics().memoryBytesSpilled
+ val spillSizeBefore = metrics.memoryBytesSpilled
val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
dataSize += sorter.getPeakMemoryUsage
- spillSize += TaskContext.get().taskMetrics().memoryBytesSpilled - spillSizeBefore
+ spillSize += metrics.memoryBytesSpilled - spillSizeBefore
+ metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
- TaskContext.get().internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.getPeakMemoryUsage)
sortedIterator
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 41799c596b..001e9c306a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -418,10 +418,10 @@ class TungstenAggregationIterator(
val mapMemory = hashMap.getPeakMemoryUsedBytes
val sorterMemory = Option(externalSorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L)
val peakMemory = Math.max(mapMemory, sorterMemory)
+ val metrics = TaskContext.get().taskMetrics()
dataSize += peakMemory
- spillSize += TaskContext.get().taskMetrics().memoryBytesSpilled - spillSizeBefore
- TaskContext.get().internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemory)
+ spillSize += metrics.memoryBytesSpilled - spillSizeBefore
+ metrics.incPeakExecutionMemory(peakMemory)
}
numOutputRows += 1
res
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index 8222b84d33..edd87c2d8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -136,14 +136,17 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
- split.serializableHadoopSplit.value match {
- case _: FileSplit | _: CombineFileSplit =>
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
- case _ => None
+ val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match {
+ case _: FileSplit | _: CombineFileSplit =>
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+ case _ => None
+ }
+
+ def updateBytesRead(): Unit = {
+ getBytesReadCallback.foreach { getBytesRead =>
+ inputMetrics.setBytesRead(getBytesRead())
}
}
- inputMetrics.setBytesReadCallback(bytesReadCallback)
val format = inputFormatClass.newInstance
format match {
@@ -208,6 +211,9 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
if (!finished) {
inputMetrics.incRecordsRead(1)
}
+ if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+ updateBytesRead()
+ }
reader.getCurrentValue
}
@@ -228,8 +234,8 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
} finally {
reader = null
}
- if (bytesReadCallback.isDefined) {
- inputMetrics.updateBytesRead()
+ if (getBytesReadCallback.isDefined) {
+ updateBytesRead()
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index c9ea579b5e..04640711d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -111,8 +111,7 @@ case class BroadcastHashJoin(
val hashedRelation = broadcastRelation.value
hashedRelation match {
case unsafe: UnsafeHashedRelation =>
- TaskContext.get().internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
+ TaskContext.get().taskMetrics().incPeakExecutionMemory(unsafe.getUnsafeSize)
case _ =>
}
hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index 6c7fa2eee5..db8edd169d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -119,8 +119,7 @@ case class BroadcastHashOuterJoin(
hashTable match {
case unsafe: UnsafeHashedRelation =>
- TaskContext.get().internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
+ TaskContext.get().taskMetrics().incPeakExecutionMemory(unsafe.getUnsafeSize)
case _ =>
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index 004407b2e6..8929dc3af1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -66,8 +66,7 @@ case class BroadcastLeftSemiJoinHash(
val hashedRelation = broadcastedRelation.value
hashedRelation match {
case unsafe: UnsafeHashedRelation =>
- TaskContext.get().internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
+ TaskContext.get().taskMetrics().incPeakExecutionMemory(unsafe.getUnsafeSize)
case _ =>
}
hashSemiJoin(streamIter, numLeftRows, hashedRelation, numOutputRows)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 52735c9d7f..950dc78162 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.metric
-import org.apache.spark.{Accumulable, AccumulableParam, SparkContext}
+import org.apache.spark.{Accumulable, AccumulableParam, Accumulators, SparkContext}
import org.apache.spark.util.Utils
/**
@@ -28,7 +28,7 @@ import org.apache.spark.util.Utils
*/
private[sql] abstract class SQLMetric[R <: SQLMetricValue[T], T](
name: String, val param: SQLMetricParam[R, T])
- extends Accumulable[R, T](param.zero, param, Some(name), true) {
+ extends Accumulable[R, T](param.zero, param, Some(name), internal = true) {
def reset(): Unit = {
this.value = param.zero
@@ -131,6 +131,8 @@ private[sql] object SQLMetrics {
name: String,
param: LongSQLMetricParam): LongSQLMetric = {
val acc = new LongSQLMetric(name, param)
+ // This is an internal accumulator so we need to register it explicitly.
+ Accumulators.register(acc)
sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 83c64f755f..544606f116 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -139,9 +139,8 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
override def onExecutorMetricsUpdate(
executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
- for ((taskId, stageId, stageAttemptID, metrics) <- executorMetricsUpdate.taskMetrics) {
- updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics.accumulatorUpdates(),
- finishTask = false)
+ for ((taskId, stageId, stageAttemptID, accumUpdates) <- executorMetricsUpdate.accumUpdates) {
+ updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, accumUpdates, finishTask = false)
}
}
@@ -177,7 +176,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
taskId: Long,
stageId: Int,
stageAttemptID: Int,
- accumulatorUpdates: Map[Long, Any],
+ accumulatorUpdates: Seq[AccumulableInfo],
finishTask: Boolean): Unit = {
_stageIdToStageMetrics.get(stageId) match {
@@ -289,8 +288,10 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
for (stageId <- executionUIData.stages;
stageMetrics <- _stageIdToStageMetrics.get(stageId).toIterable;
taskMetrics <- stageMetrics.taskIdToMetricUpdates.values;
- accumulatorUpdate <- taskMetrics.accumulatorUpdates.toSeq) yield {
- accumulatorUpdate
+ accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield {
+ assert(accumulatorUpdate.update.isDefined, s"accumulator update from " +
+ s"task did not have a partial value: ${accumulatorUpdate.name}")
+ (accumulatorUpdate.id, accumulatorUpdate.update.get)
}
}.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId =>
@@ -328,9 +329,10 @@ private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
taskEnd.taskInfo.taskId,
taskEnd.stageId,
taskEnd.stageAttemptId,
- taskEnd.taskInfo.accumulables.map { acc =>
- (acc.id, new LongSQLMetricValue(acc.update.getOrElse("0").toLong))
- }.toMap,
+ taskEnd.taskInfo.accumulables.map { a =>
+ val newValue = new LongSQLMetricValue(a.update.map(_.asInstanceOf[Long]).getOrElse(0L))
+ a.copy(update = Some(newValue))
+ },
finishTask = true)
}
@@ -406,4 +408,4 @@ private[ui] class SQLStageMetrics(
private[ui] class SQLTaskMetrics(
val attemptId: Long, // TODO not used yet
var finished: Boolean,
- var accumulatorUpdates: Map[Long, Any])
+ var accumulatorUpdates: Seq[AccumulableInfo])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 47308966e9..10ccd4b8f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1648,7 +1648,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("external sorting updates peak execution memory") {
AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
- sortTest()
+ sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
index 9575d26fd1..273937fa8c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
@@ -49,8 +49,7 @@ case class ReferenceSort(
val context = TaskContext.get()
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
- context.internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
+ context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, sorter.stop())
}, preservesPartitioning = true)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index 9c258cb31f..c7df8b51e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -71,8 +71,7 @@ class UnsafeFixedWidthAggregationMapSuite
taskAttemptId = Random.nextInt(10000),
attemptNumber = 0,
taskMemoryManager = taskMemoryManager,
- metricsSystem = null,
- internalAccumulators = Seq.empty))
+ metricsSystem = null))
try {
f
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 8a95359d9d..e03bd6a3e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -117,8 +117,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
taskAttemptId = 98456,
attemptNumber = 0,
taskMemoryManager = taskMemMgr,
- metricsSystem = null,
- internalAccumulators = Seq.empty))
+ metricsSystem = null))
val sorter = new UnsafeKVExternalSorter(
keySchema, valueSchema, SparkEnv.get.blockManager, pageSize)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
index 647a7e9a4e..86c2c25c2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
@@ -17,12 +17,19 @@
package org.apache.spark.sql.execution.columnar
+import org.scalatest.BeforeAndAfterEach
+
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SQLTestData._
-class PartitionBatchPruningSuite extends SparkFunSuite with SharedSQLContext {
+
+class PartitionBatchPruningSuite
+ extends SparkFunSuite
+ with BeforeAndAfterEach
+ with SharedSQLContext {
+
import testImplicits._
private lazy val originalColumnBatchSize = sqlContext.conf.columnBatchSize
@@ -32,30 +39,41 @@ class PartitionBatchPruningSuite extends SparkFunSuite with SharedSQLContext {
super.beforeAll()
// Make a table with 5 partitions, 2 batches per partition, 10 elements per batch
sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, 10)
-
- val pruningData = sparkContext.makeRDD((1 to 100).map { key =>
- val string = if (((key - 1) / 10) % 2 == 0) null else key.toString
- TestData(key, string)
- }, 5).toDF()
- pruningData.registerTempTable("pruningData")
-
// Enable in-memory partition pruning
sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
// Enable in-memory table scan accumulators
sqlContext.setConf("spark.sql.inMemoryTableScanStatistics.enable", "true")
- sqlContext.cacheTable("pruningData")
}
override protected def afterAll(): Unit = {
try {
sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
- sqlContext.uncacheTable("pruningData")
} finally {
super.afterAll()
}
}
+ override protected def beforeEach(): Unit = {
+ super.beforeEach()
+ // This creates accumulators, which get cleaned up after every single test,
+ // so we need to do this before every test.
+ val pruningData = sparkContext.makeRDD((1 to 100).map { key =>
+ val string = if (((key - 1) / 10) % 2 == 0) null else key.toString
+ TestData(key, string)
+ }, 5).toDF()
+ pruningData.registerTempTable("pruningData")
+ sqlContext.cacheTable("pruningData")
+ }
+
+ override protected def afterEach(): Unit = {
+ try {
+ sqlContext.uncacheTable("pruningData")
+ } finally {
+ super.afterEach()
+ }
+ }
+
// Comparisons
checkBatchPruning("SELECT key FROM pruningData WHERE key = 1", 1, 1)(Seq(1))
checkBatchPruning("SELECT key FROM pruningData WHERE 1 = key", 1, 1)(Seq(1))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 81a159d542..2c408c8878 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.ui
import java.util.Properties
+import org.mockito.Mockito.{mock, when}
+
import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
@@ -67,9 +69,11 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
)
private def createTaskMetrics(accumulatorUpdates: Map[Long, Long]): TaskMetrics = {
- val metrics = new TaskMetrics
- metrics.setAccumulatorsUpdater(() => accumulatorUpdates.mapValues(new LongSQLMetricValue(_)))
- metrics.updateAccumulators()
+ val metrics = mock(classOf[TaskMetrics])
+ when(metrics.accumulatorUpdates()).thenReturn(accumulatorUpdates.map { case (id, update) =>
+ new AccumulableInfo(id, Some(""), Some(new LongSQLMetricValue(update)),
+ value = None, internal = true, countFailedValues = true)
+ }.toSeq)
metrics
}
@@ -114,17 +118,17 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
assert(listener.getExecutionMetrics(0).isEmpty)
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
- // (task id, stage id, stage attempt, metrics)
- (0L, 0, 0, createTaskMetrics(accumulatorUpdates)),
- (1L, 0, 0, createTaskMetrics(accumulatorUpdates))
+ // (task id, stage id, stage attempt, accum updates)
+ (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulatorUpdates()),
+ (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulatorUpdates())
)))
checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
- // (task id, stage id, stage attempt, metrics)
- (0L, 0, 0, createTaskMetrics(accumulatorUpdates)),
- (1L, 0, 0, createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)))
+ // (task id, stage id, stage attempt, accum updates)
+ (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulatorUpdates()),
+ (1L, 0, 0, createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulatorUpdates())
)))
checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
@@ -133,9 +137,9 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
- // (task id, stage id, stage attempt, metrics)
- (0L, 0, 1, createTaskMetrics(accumulatorUpdates)),
- (1L, 0, 1, createTaskMetrics(accumulatorUpdates))
+ // (task id, stage id, stage attempt, accum updates)
+ (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulatorUpdates()),
+ (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulatorUpdates())
)))
checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
@@ -173,9 +177,9 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
- // (task id, stage id, stage attempt, metrics)
- (0L, 1, 0, createTaskMetrics(accumulatorUpdates)),
- (1L, 1, 0, createTaskMetrics(accumulatorUpdates))
+ // (task id, stage id, stage attempt, accum updates)
+ (0L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulatorUpdates()),
+ (1L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulatorUpdates())
)))
checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index b46b0d2f60..9a24a2487a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -140,7 +140,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
.filter(_._2.name == InternalAccumulator.PEAK_EXECUTION_MEMORY)
assert(peakMemoryAccumulator.size == 1)
- peakMemoryAccumulator.head._2.value.toLong
+ peakMemoryAccumulator.head._2.value.get.asInstanceOf[Long]
}
assert(sparkListener.getCompletedStageInfos.length == 2)