Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulable.scala | 64
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulator.scala | 67
-rw-r--r--  core/src/main/scala/org/apache/spark/ContextCleaner.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/NewAccumulator.scala | 391
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 107
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContext.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContextImpl.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskEndReason.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/InputMetrics.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala | 64
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala | 19
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 228
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Stage.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/api.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 4
30 files changed, 748 insertions(+), 367 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
index e8f053c150..c76720c4bb 100644
--- a/core/src/main/scala/org/apache/spark/Accumulable.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -63,7 +63,7 @@ class Accumulable[R, T] private (
param: AccumulableParam[R, T],
name: Option[String],
countFailedValues: Boolean) = {
- this(Accumulators.newId(), initialValue, param, name, countFailedValues)
+ this(AccumulatorContext.newId(), initialValue, param, name, countFailedValues)
}
private[spark] def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = {
@@ -72,34 +72,23 @@ class Accumulable[R, T] private (
def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None)
- @volatile @transient private var value_ : R = initialValue // Current value on driver
- val zero = param.zero(initialValue) // Zero value to be passed to executors
- private var deserialized = false
-
- Accumulators.register(this)
-
- /**
- * Return a copy of this [[Accumulable]].
- *
- * The copy will have the same ID as the original and will not be registered with
- * [[Accumulators]] again. This method exists so that the caller can avoid passing the
- * same mutable instance around.
- */
- private[spark] def copy(): Accumulable[R, T] = {
- new Accumulable[R, T](id, initialValue, param, name, countFailedValues)
- }
+ val zero = param.zero(initialValue)
+ private[spark] val newAcc = new LegacyAccumulatorWrapper(initialValue, param)
+ newAcc.metadata = AccumulatorMetadata(id, name, countFailedValues)
+ // Register the new accumulator in ctor, to follow the previous behaviour.
+ AccumulatorContext.register(newAcc)
/**
* Add more data to this accumulator / accumulable
* @param term the data to add
*/
- def += (term: T) { value_ = param.addAccumulator(value_, term) }
+ def += (term: T) { newAcc.add(term) }
/**
* Add more data to this accumulator / accumulable
* @param term the data to add
*/
- def add(term: T) { value_ = param.addAccumulator(value_, term) }
+ def add(term: T) { newAcc.add(term) }
/**
* Merge two accumulable objects together
@@ -107,7 +96,7 @@ class Accumulable[R, T] private (
* Normally, a user will not want to use this version, but will instead call `+=`.
* @param term the other `R` that will get merged with this
*/
- def ++= (term: R) { value_ = param.addInPlace(value_, term)}
+ def ++= (term: R) { newAcc._value = param.addInPlace(newAcc._value, term) }
/**
* Merge two accumulable objects together
@@ -115,18 +104,12 @@ class Accumulable[R, T] private (
* Normally, a user will not want to use this version, but will instead call `add`.
* @param term the other `R` that will get merged with this
*/
- def merge(term: R) { value_ = param.addInPlace(value_, term)}
+ def merge(term: R) { newAcc._value = param.addInPlace(newAcc._value, term) }
/**
* Access the accumulator's current value; only allowed on driver.
*/
- def value: R = {
- if (!deserialized) {
- value_
- } else {
- throw new UnsupportedOperationException("Can't read accumulator value in task")
- }
- }
+ def value: R = newAcc.value
/**
* Get the current value of this accumulator from within a task.
@@ -137,14 +120,14 @@ class Accumulable[R, T] private (
* The typical use of this method is to directly mutate the local value, eg., to add
* an element to a Set.
*/
- def localValue: R = value_
+ def localValue: R = newAcc.localValue
/**
* Set the accumulator's value; only allowed on driver.
*/
def value_= (newValue: R) {
- if (!deserialized) {
- value_ = newValue
+ if (newAcc.isAtDriverSide) {
+ newAcc._value = newValue
} else {
throw new UnsupportedOperationException("Can't assign accumulator value in task")
}
@@ -153,7 +136,7 @@ class Accumulable[R, T] private (
/**
* Set the accumulator's value. For internal use only.
*/
- def setValue(newValue: R): Unit = { value_ = newValue }
+ def setValue(newValue: R): Unit = { newAcc._value = newValue }
/**
* Set the accumulator's value. For internal use only.
@@ -168,22 +151,7 @@ class Accumulable[R, T] private (
new AccumulableInfo(id, name, update, value, isInternal, countFailedValues)
}
- // Called by Java when deserializing an object
- private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
- in.defaultReadObject()
- value_ = zero
- deserialized = true
-
- // Automatically register the accumulator when it is deserialized with the task closure.
- // This is for external accumulators and internal ones that do not represent task level
- // metrics, e.g. internal SQL metrics, which are per-operator.
- val taskContext = TaskContext.get()
- if (taskContext != null) {
- taskContext.registerAccumulator(this)
- }
- }
-
- override def toString: String = if (value_ == null) "null" else value_.toString
+ override def toString: String = if (newAcc._value == null) "null" else newAcc._value.toString
}
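Illustration (a hedged sketch, not part of the patch): with the changes above, the legacy Accumulable/Accumulator API keeps its old surface but delegates to the LegacyAccumulatorWrapper created in the constructor. The variable names below are illustrative.

    // Legacy-style usage that now routes through `newAcc` (a LegacyAccumulatorWrapper).
    val sc: SparkContext = ???                           // assumed existing SparkContext
    val counter = sc.accumulator(0)                      // old API, still an Accumulator[Int]
    sc.parallelize(1 to 100).foreach(_ => counter += 1)  // += forwards to newAcc.add(...)
    assert(counter.value == 100)                         // value reads newAcc.value on the driver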
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 0c17f014c9..9b007b9776 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -68,73 +68,6 @@ class Accumulator[T] private[spark] (
extends Accumulable[T, T](initialValue, param, name, countFailedValues)
-// TODO: The multi-thread support in accumulators is kind of lame; check
-// if there's a more intuitive way of doing it right
-private[spark] object Accumulators extends Logging {
- /**
- * This global map holds the original accumulator objects that are created on the driver.
- * It keeps weak references to these objects so that accumulators can be garbage-collected
- * once the RDDs and user-code that reference them are cleaned up.
- * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051).
- */
- @GuardedBy("Accumulators")
- val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
-
- private val nextId = new AtomicLong(0L)
-
- /**
- * Return a globally unique ID for a new [[Accumulable]].
- * Note: Once you copy the [[Accumulable]] the ID is no longer unique.
- */
- def newId(): Long = nextId.getAndIncrement
-
- /**
- * Register an [[Accumulable]] created on the driver such that it can be used on the executors.
- *
- * All accumulators registered here can later be used as a container for accumulating partial
- * values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does.
- * Note: if an accumulator is registered here, it should also be registered with the active
- * context cleaner for cleanup so as to avoid memory leaks.
- *
- * If an [[Accumulable]] with the same ID was already registered, this does nothing instead
- * of overwriting it. This happens when we copy accumulators, e.g. when we reconstruct
- * [[org.apache.spark.executor.TaskMetrics]] from accumulator updates.
- */
- def register(a: Accumulable[_, _]): Unit = synchronized {
- if (!originals.contains(a.id)) {
- originals(a.id) = new WeakReference[Accumulable[_, _]](a)
- }
- }
-
- /**
- * Unregister the [[Accumulable]] with the given ID, if any.
- */
- def remove(accId: Long): Unit = synchronized {
- originals.remove(accId)
- }
-
- /**
- * Return the [[Accumulable]] registered with the given ID, if any.
- */
- def get(id: Long): Option[Accumulable[_, _]] = synchronized {
- originals.get(id).map { weakRef =>
- // Since we are storing weak references, we must check whether the underlying data is valid.
- weakRef.get.getOrElse {
- throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id")
- }
- }
- }
-
- /**
- * Clear all registered [[Accumulable]]s. For testing only.
- */
- def clear(): Unit = synchronized {
- originals.clear()
- }
-
-}
-
-
/**
* A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
* in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 76692ccec8..63a00a84af 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -144,7 +144,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
registerForCleanup(rdd, CleanRDD(rdd.id))
}
- def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = {
+ def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = {
registerForCleanup(a, CleanAccum(a.id))
}
@@ -241,7 +241,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
def doCleanupAccum(accId: Long, blocking: Boolean): Unit = {
try {
logDebug("Cleaning accumulator " + accId)
- Accumulators.remove(accId)
+ AccumulatorContext.remove(accId)
listeners.asScala.foreach(_.accumCleaned(accId))
logInfo("Cleaned accumulator " + accId)
} catch {
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 2bdbd3fae9..9eac05fdf9 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
*/
private[spark] case class Heartbeat(
executorId: String,
- accumUpdates: Array[(Long, Seq[AccumulableInfo])], // taskId -> accum updates
+ accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], // taskId -> accumulator updates
blockManagerId: BlockManagerId)
/**
diff --git a/core/src/main/scala/org/apache/spark/NewAccumulator.scala b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
new file mode 100644
index 0000000000..edb9b741a8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.{lang => jl}
+import java.io.ObjectInputStream
+import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
+
+import org.apache.spark.scheduler.AccumulableInfo
+import org.apache.spark.util.Utils
+
+
+private[spark] case class AccumulatorMetadata(
+ id: Long,
+ name: Option[String],
+ countFailedValues: Boolean) extends Serializable
+
+
+/**
+ * The base class for accumulators, that can accumulate inputs of type `IN`, and produce output of
+ * type `OUT`.
+ */
+abstract class NewAccumulator[IN, OUT] extends Serializable {
+ private[spark] var metadata: AccumulatorMetadata = _
+ private[this] var atDriverSide = true
+
+ private[spark] def register(
+ sc: SparkContext,
+ name: Option[String] = None,
+ countFailedValues: Boolean = false): Unit = {
+ if (this.metadata != null) {
+ throw new IllegalStateException("Cannot register an Accumulator twice.")
+ }
+ this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, countFailedValues)
+ AccumulatorContext.register(this)
+ sc.cleaner.foreach(_.registerAccumulatorForCleanup(this))
+ }
+
+ /**
+ * Returns true if this accumulator has been registered. Note that all accumulators must be
+ * registered before use, or an exception will be thrown.
+ */
+ final def isRegistered: Boolean =
+ metadata != null && AccumulatorContext.originals.containsKey(metadata.id)
+
+ private def assertMetadataNotNull(): Unit = {
+ if (metadata == null) {
+ throw new IllegalAccessError("The metadata of this accumulator has not been assigned yet.")
+ }
+ }
+
+ /**
+ * Returns the id of this accumulator, can only be called after registration.
+ */
+ final def id: Long = {
+ assertMetadataNotNull()
+ metadata.id
+ }
+
+ /**
+ * Returns the name of this accumulator, can only be called after registration.
+ */
+ final def name: Option[String] = {
+ assertMetadataNotNull()
+ metadata.name
+ }
+
+ /**
+ * Whether to accumulate values from failed tasks. This is set to true for system and time
+ * metrics like serialization time or bytes spilled, and false for things with absolute values
+ * like number of input rows. This should be used for internal metrics only.
+ */
+ private[spark] final def countFailedValues: Boolean = {
+ assertMetadataNotNull()
+ metadata.countFailedValues
+ }
+
+ /**
+ * Creates an [[AccumulableInfo]] representation of this [[NewAccumulator]] with the provided
+ * values.
+ */
+ private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
+ val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+ new AccumulableInfo(id, name, update, value, isInternal, countFailedValues)
+ }
+
+ final private[spark] def isAtDriverSide: Boolean = atDriverSide
+
+ /**
+ * Returns whether this accumulator is zero value, e.g. for a counter accumulator, 0 is zero
+ * value; for a list accumulator, Nil is zero value.
+ */
+ def isZero(): Boolean
+
+ /**
+ * Creates a new copy of this accumulator, which is zero value, i.e. calling `isZero` on the copy
+ * must return true.
+ */
+ def copyAndReset(): NewAccumulator[IN, OUT]
+
+ /**
+ * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+ */
+ def add(v: IN): Unit
+
+ /**
+ * Merges another same-type accumulator into this one and updates its state, i.e. this should be
+ * merge-in-place.
+ */
+ def merge(other: NewAccumulator[IN, OUT]): Unit
+
+ /**
+ * Access this accumulator's current value; only allowed on driver.
+ */
+ final def value: OUT = {
+ if (atDriverSide) {
+ localValue
+ } else {
+ throw new UnsupportedOperationException("Can't read accumulator value in task")
+ }
+ }
+
+ /**
+ * Defines the current value of this accumulator.
+ *
+ * This is NOT the global value of the accumulator. To get the global value after a
+ * completed operation on the dataset, call `value`.
+ */
+ def localValue: OUT
+
+ // Called by Java when serializing an object
+ final protected def writeReplace(): Any = {
+ if (atDriverSide) {
+ if (!isRegistered) {
+ throw new UnsupportedOperationException(
+ "Accumulator must be registered before send to executor")
+ }
+ val copy = copyAndReset()
+ assert(copy.isZero(), "copyAndReset must return a zero value copy")
+ copy.metadata = metadata
+ copy
+ } else {
+ this
+ }
+ }
+
+ // Called by Java when deserializing an object
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+ in.defaultReadObject()
+ if (atDriverSide) {
+ atDriverSide = false
+
+ // Automatically register the accumulator when it is deserialized with the task closure.
+ // This is for external accumulators and internal ones that do not represent task level
+ // metrics, e.g. internal SQL metrics, which are per-operator.
+ val taskContext = TaskContext.get()
+ if (taskContext != null) {
+ taskContext.registerAccumulator(this)
+ }
+ } else {
+ atDriverSide = true
+ }
+ }
+
+ override def toString: String = {
+ if (metadata == null) {
+ "Un-registered Accumulator: " + getClass.getSimpleName
+ } else {
+ getClass.getSimpleName + s"(id: $id, name: $name, value: $localValue)"
+ }
+ }
+}
+
+
+private[spark] object AccumulatorContext {
+
+ /**
+ * This global map holds the original accumulator objects that are created on the driver.
+ * It keeps weak references to these objects so that accumulators can be garbage-collected
+ * once the RDDs and user-code that reference them are cleaned up.
+ * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051).
+ */
+ @GuardedBy("AccumulatorContext")
+ val originals = new java.util.HashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]]
+
+ private[this] val nextId = new AtomicLong(0L)
+
+ /**
+ * Return a globally unique ID for a new [[Accumulator]].
+ * Note: Once you copy the [[Accumulator]] the ID is no longer unique.
+ */
+ def newId(): Long = nextId.getAndIncrement
+
+ /**
+ * Register an [[Accumulator]] created on the driver such that it can be used on the executors.
+ *
+ * All accumulators registered here can later be used as a container for accumulating partial
+ * values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does.
+ * Note: if an accumulator is registered here, it should also be registered with the active
+ * context cleaner for cleanup so as to avoid memory leaks.
+ *
+ * If an [[Accumulator]] with the same ID was already registered, this does nothing instead
+ * of overwriting it. We will never register the same accumulator twice; this is just a sanity check.
+ */
+ def register(a: NewAccumulator[_, _]): Unit = synchronized {
+ if (!originals.containsKey(a.id)) {
+ originals.put(a.id, new jl.ref.WeakReference[NewAccumulator[_, _]](a))
+ }
+ }
+
+ /**
+ * Unregister the [[Accumulator]] with the given ID, if any.
+ */
+ def remove(id: Long): Unit = synchronized {
+ originals.remove(id)
+ }
+
+ /**
+ * Return the [[Accumulator]] registered with the given ID, if any.
+ */
+ def get(id: Long): Option[NewAccumulator[_, _]] = synchronized {
+ Option(originals.get(id)).map { ref =>
+ // Since we are storing weak references, we must check whether the underlying data is valid.
+ val acc = ref.get
+ if (acc eq null) {
+ throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id")
+ }
+ acc
+ }
+ }
+
+ /**
+ * Clear all registered [[Accumulator]]s. For testing only.
+ */
+ def clear(): Unit = synchronized {
+ originals.clear()
+ }
+}
+
+
+class LongAccumulator extends NewAccumulator[jl.Long, jl.Long] {
+ private[this] var _sum = 0L
+
+ override def isZero(): Boolean = _sum == 0
+
+ override def copyAndReset(): LongAccumulator = new LongAccumulator
+
+ override def add(v: jl.Long): Unit = _sum += v
+
+ def add(v: Long): Unit = _sum += v
+
+ def sum: Long = _sum
+
+ override def merge(other: NewAccumulator[jl.Long, jl.Long]): Unit = other match {
+ case o: LongAccumulator => _sum += o.sum
+ case _ => throw new UnsupportedOperationException(
+ s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+ }
+
+ private[spark] def setValue(newValue: Long): Unit = _sum = newValue
+
+ override def localValue: jl.Long = _sum
+}
+
+
+class DoubleAccumulator extends NewAccumulator[jl.Double, jl.Double] {
+ private[this] var _sum = 0.0
+
+ override def isZero(): Boolean = _sum == 0.0
+
+ override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator
+
+ override def add(v: jl.Double): Unit = _sum += v
+
+ def add(v: Double): Unit = _sum += v
+
+ def sum: Double = _sum
+
+ override def merge(other: NewAccumulator[jl.Double, jl.Double]): Unit = other match {
+ case o: DoubleAccumulator => _sum += o.sum
+ case _ => throw new UnsupportedOperationException(
+ s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+ }
+
+ private[spark] def setValue(newValue: Double): Unit = _sum = newValue
+
+ override def localValue: jl.Double = _sum
+}
+
+
+class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] {
+ private[this] var _sum = 0.0
+ private[this] var _count = 0L
+
+ override def isZero(): Boolean = _sum == 0.0 && _count == 0
+
+ override def copyAndReset(): AverageAccumulator = new AverageAccumulator
+
+ override def add(v: jl.Double): Unit = {
+ _sum += v
+ _count += 1
+ }
+
+ def add(d: Double): Unit = {
+ _sum += d
+ _count += 1
+ }
+
+ override def merge(other: NewAccumulator[jl.Double, jl.Double]): Unit = other match {
+ case o: AverageAccumulator =>
+ _sum += o.sum
+ _count += o.count
+ case _ => throw new UnsupportedOperationException(
+ s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+ }
+
+ override def localValue: jl.Double = if (_count == 0) {
+ Double.NaN
+ } else {
+ _sum / _count
+ }
+
+ def sum: Double = _sum
+
+ def count: Long = _count
+}
+
+
+class ListAccumulator[T] extends NewAccumulator[T, java.util.List[T]] {
+ private[this] val _list: java.util.List[T] = new java.util.ArrayList[T]
+
+ override def isZero(): Boolean = _list.isEmpty
+
+ override def copyAndReset(): ListAccumulator[T] = new ListAccumulator
+
+ override def add(v: T): Unit = _list.add(v)
+
+ override def merge(other: NewAccumulator[T, java.util.List[T]]): Unit = other match {
+ case o: ListAccumulator[T] => _list.addAll(o.localValue)
+ case _ => throw new UnsupportedOperationException(
+ s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+ }
+
+ override def localValue: java.util.List[T] = java.util.Collections.unmodifiableList(_list)
+
+ private[spark] def setValue(newValue: java.util.List[T]): Unit = {
+ _list.clear()
+ _list.addAll(newValue)
+ }
+}
+
+
+class LegacyAccumulatorWrapper[R, T](
+ initialValue: R,
+ param: org.apache.spark.AccumulableParam[R, T]) extends NewAccumulator[T, R] {
+ private[spark] var _value = initialValue // Current value on driver
+
+ override def isZero(): Boolean = _value == param.zero(initialValue)
+
+ override def copyAndReset(): LegacyAccumulatorWrapper[R, T] = {
+ val acc = new LegacyAccumulatorWrapper(initialValue, param)
+ acc._value = param.zero(initialValue)
+ acc
+ }
+
+ override def add(v: T): Unit = _value = param.addAccumulator(_value, v)
+
+ override def merge(other: NewAccumulator[T, R]): Unit = other match {
+ case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.localValue)
+ case _ => throw new UnsupportedOperationException(
+ s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+ }
+
+ override def localValue: R = _value
+}
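To make the new contract concrete, here is a hedged sketch (not part of the patch) of a user-defined accumulator implementing the NewAccumulator methods introduced above; the class name and zero-value convention are illustrative.

    // Tracks the maximum Long seen across tasks; Long.MinValue serves as the zero value.
    class MaxAccumulator extends NewAccumulator[Long, Long] {
      private[this] var _max = Long.MinValue

      override def isZero(): Boolean = _max == Long.MinValue
      override def copyAndReset(): MaxAccumulator = new MaxAccumulator
      override def add(v: Long): Unit = _max = math.max(_max, v)
      override def merge(other: NewAccumulator[Long, Long]): Unit = other match {
        case o: MaxAccumulator => _max = math.max(_max, o.localValue)
        case _ => throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
      }
      override def localValue: Long = _max
    }

Like the built-in accumulators, such a class must be registered (for example via SparkContext.register, added later in this diff) before it is shipped to executors; otherwise writeReplace refuses to serialize it.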
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f322a770bf..865989aee0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1217,10 +1217,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `+=` method. Only the driver can access the accumulator's `value`.
*/
- def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] =
- {
+ def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] = {
val acc = new Accumulator(initialValue, param)
- cleaner.foreach(_.registerAccumulatorForCleanup(acc))
+ cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
acc
}
@@ -1232,7 +1231,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
: Accumulator[T] = {
val acc = new Accumulator(initialValue, param, Some(name))
- cleaner.foreach(_.registerAccumulatorForCleanup(acc))
+ cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
acc
}
@@ -1245,7 +1244,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T])
: Accumulable[R, T] = {
val acc = new Accumulable(initialValue, param)
- cleaner.foreach(_.registerAccumulatorForCleanup(acc))
+ cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
acc
}
@@ -1259,7 +1258,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T])
: Accumulable[R, T] = {
val acc = new Accumulable(initialValue, param, Some(name))
- cleaner.foreach(_.registerAccumulatorForCleanup(acc))
+ cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
acc
}
@@ -1273,7 +1272,101 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
(initialValue: R): Accumulable[R, T] = {
val param = new GrowableAccumulableParam[R, T]
val acc = new Accumulable(initialValue, param)
- cleaner.foreach(_.registerAccumulatorForCleanup(acc))
+ cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
+ acc
+ }
+
+ /**
+ * Register the given accumulator. Note that accumulators must be registered before use, or an
+ * exception will be thrown.
+ */
+ def register(acc: NewAccumulator[_, _]): Unit = {
+ acc.register(this)
+ }
+
+ /**
+ * Register the given accumulator with the given name. Note that accumulators must be registered
+ * before use, or an exception will be thrown.
+ */
+ def register(acc: NewAccumulator[_, _], name: String): Unit = {
+ acc.register(this, name = Some(name))
+ }
+
+ /**
+ * Create and register a long accumulator, which starts with 0 and accumulates inputs by `+=`.
+ */
+ def longAccumulator: LongAccumulator = {
+ val acc = new LongAccumulator
+ register(acc)
+ acc
+ }
+
+ /**
+ * Create and register a long accumulator, which starts with 0 and accumulates inputs by `+=`.
+ */
+ def longAccumulator(name: String): LongAccumulator = {
+ val acc = new LongAccumulator
+ register(acc, name)
+ acc
+ }
+
+ /**
+ * Create and register a double accumulator, which starts with 0 and accumulates inputs by `+=`.
+ */
+ def doubleAccumulator: DoubleAccumulator = {
+ val acc = new DoubleAccumulator
+ register(acc)
+ acc
+ }
+
+ /**
+ * Create and register a double accumulator, which starts with 0 and accumulates inputs by `+=`.
+ */
+ def doubleAccumulator(name: String): DoubleAccumulator = {
+ val acc = new DoubleAccumulator
+ register(acc, name)
+ acc
+ }
+
+ /**
+ * Create and register an average accumulator, which accumulates double inputs by recording the
+ * total sum and total count, and produces the output as sum / count. Note that Double.NaN will be
+ * returned if no input is added.
+ */
+ def averageAccumulator: AverageAccumulator = {
+ val acc = new AverageAccumulator
+ register(acc)
+ acc
+ }
+
+ /**
+ * Create and register an average accumulator, which accumulates double inputs by recording the
+ * total sum and total count, and produces the output as sum / count. Note that Double.NaN will be
+ * returned if no input is added.
+ */
+ def averageAccumulator(name: String): AverageAccumulator = {
+ val acc = new AverageAccumulator
+ register(acc, name)
+ acc
+ }
+
+ /**
+ * Create and register a list accumulator, which starts with an empty list and accumulates inputs
+ * by adding them into the inner list.
+ */
+ def listAccumulator[T]: ListAccumulator[T] = {
+ val acc = new ListAccumulator[T]
+ register(acc)
+ acc
+ }
+
+ /**
+ * Create and register a list accumulator, which starts with an empty list and accumulates inputs
+ * by adding them into the inner list.
+ */
+ def listAccumulator[T](name: String): ListAccumulator[T] = {
+ val acc = new ListAccumulator[T]
+ register(acc, name)
acc
}
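A hedged usage sketch of the convenience constructors added above (the accumulator names are illustrative):

    val sc: SparkContext = ???                     // assumed existing SparkContext
    val rows = sc.longAccumulator("rows")          // created and registered under a name
    val avgLatency = sc.averageAccumulator("avgLatency")
    sc.parallelize(Seq(1.0, 2.0, 3.0)).foreach { v =>
      rows.add(1L)
      avgLatency.add(v)
    }
    // Only the driver may read the result; calling `value` inside a task throws.
    println(s"rows=${rows.value}, avgLatency=${avgLatency.value}")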
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index e7940bd9ed..9e53257462 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -188,6 +188,6 @@ abstract class TaskContext extends Serializable {
* Register an accumulator that belongs to this task. Accumulators must call this method when
* deserializing in executors.
*/
- private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit
+ private[spark] def registerAccumulator(a: NewAccumulator[_, _]): Unit
}
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 43e555670d..bc3807f5db 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -122,7 +122,7 @@ private[spark] class TaskContextImpl(
override def getMetricsSources(sourceName: String): Seq[Source] =
metricsSystem.getSourcesByName(sourceName)
- private[spark] override def registerAccumulator(a: Accumulable[_, _]): Unit = {
+ private[spark] override def registerAccumulator(a: NewAccumulator[_, _]): Unit = {
taskMetrics.registerAccumulator(a)
}
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 7487cfe9c5..82ba2d0c27 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -19,10 +19,7 @@ package org.apache.spark
import java.io.{ObjectInputStream, ObjectOutputStream}
-import scala.util.Try
-
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.BlockManagerId
@@ -120,18 +117,10 @@ case class ExceptionFailure(
stackTrace: Array[StackTraceElement],
fullStackTrace: String,
private val exceptionWrapper: Option[ThrowableSerializationWrapper],
- accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo])
+ accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+ private[spark] var accums: Seq[NewAccumulator[_, _]] = Nil)
extends TaskFailedReason {
- @deprecated("use accumUpdates instead", "2.0.0")
- val metrics: Option[TaskMetrics] = {
- if (accumUpdates.nonEmpty) {
- Try(TaskMetrics.fromAccumulatorUpdates(accumUpdates)).toOption
- } else {
- None
- }
- }
-
/**
* `preserveCause` is used to keep the exception itself so it is available to the
* driver. This may be set to `false` in the event that the exception is not in fact
@@ -149,6 +138,11 @@ case class ExceptionFailure(
this(e, accumUpdates, preserveCause = true)
}
+ private[spark] def withAccums(accums: Seq[NewAccumulator[_, _]]): ExceptionFailure = {
+ this.accums = accums
+ this
+ }
+
def exception: Option[Throwable] = exceptionWrapper.flatMap(w => Option(w.exception))
override def toErrorString: String =
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 650f05c309..4d61f7e232 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -353,22 +353,24 @@ private[spark] class Executor(
logError(s"Exception in $taskName (TID $taskId)", t)
// Collect latest accumulator values to report back to the driver
- val accumulatorUpdates: Seq[AccumulableInfo] =
+ val accums: Seq[NewAccumulator[_, _]] =
if (task != null) {
task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
task.collectAccumulatorUpdates(taskFailed = true)
} else {
- Seq.empty[AccumulableInfo]
+ Seq.empty
}
+ val accUpdates = accums.map(acc => acc.toInfo(Some(acc.localValue), None))
+
val serializedTaskEndReason = {
try {
- ser.serialize(new ExceptionFailure(t, accumulatorUpdates))
+ ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
} catch {
case _: NotSerializableException =>
// t is not serializable so just send the stacktrace
- ser.serialize(new ExceptionFailure(t, accumulatorUpdates, preserveCause = false))
+ ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
}
}
execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
@@ -476,14 +478,14 @@ private[spark] class Executor(
/** Reports heartbeat and metrics for active tasks to the driver. */
private def reportHeartBeat(): Unit = {
// list of (task id, accumUpdates) to send back to the driver
- val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulableInfo])]()
+ val accumUpdates = new ArrayBuffer[(Long, Seq[NewAccumulator[_, _]])]()
val curGCTime = computeTotalGcTime()
for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
taskRunner.task.metrics.mergeShuffleReadMetrics()
taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
- accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulatorUpdates()))
+ accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators()))
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 535352e7dd..6f7160ac0d 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.executor
-import org.apache.spark.InternalAccumulator
+import org.apache.spark.LongAccumulator
import org.apache.spark.annotation.DeveloperApi
@@ -40,20 +40,18 @@ object DataReadMethod extends Enumeration with Serializable {
*/
@DeveloperApi
class InputMetrics private[spark] () extends Serializable {
- import InternalAccumulator._
-
- private[executor] val _bytesRead = TaskMetrics.createLongAccum(input.BYTES_READ)
- private[executor] val _recordsRead = TaskMetrics.createLongAccum(input.RECORDS_READ)
+ private[executor] val _bytesRead = new LongAccumulator
+ private[executor] val _recordsRead = new LongAccumulator
/**
* Total number of bytes read.
*/
- def bytesRead: Long = _bytesRead.localValue
+ def bytesRead: Long = _bytesRead.sum
/**
* Total number of records read.
*/
- def recordsRead: Long = _recordsRead.localValue
+ def recordsRead: Long = _recordsRead.sum
private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
index 586c98b156..db3924cb69 100644
--- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.executor
-import org.apache.spark.InternalAccumulator
+import org.apache.spark.LongAccumulator
import org.apache.spark.annotation.DeveloperApi
@@ -39,20 +39,18 @@ object DataWriteMethod extends Enumeration with Serializable {
*/
@DeveloperApi
class OutputMetrics private[spark] () extends Serializable {
- import InternalAccumulator._
-
- private[executor] val _bytesWritten = TaskMetrics.createLongAccum(output.BYTES_WRITTEN)
- private[executor] val _recordsWritten = TaskMetrics.createLongAccum(output.RECORDS_WRITTEN)
+ private[executor] val _bytesWritten = new LongAccumulator
+ private[executor] val _recordsWritten = new LongAccumulator
/**
* Total number of bytes written.
*/
- def bytesWritten: Long = _bytesWritten.localValue
+ def bytesWritten: Long = _bytesWritten.sum
/**
* Total number of records written.
*/
- def recordsWritten: Long = _recordsWritten.localValue
+ def recordsWritten: Long = _recordsWritten.sum
private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v)
private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v)
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index f012a74db6..fa962108c3 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.executor
-import org.apache.spark.InternalAccumulator
+import org.apache.spark.LongAccumulator
import org.apache.spark.annotation.DeveloperApi
@@ -28,52 +28,44 @@ import org.apache.spark.annotation.DeveloperApi
*/
@DeveloperApi
class ShuffleReadMetrics private[spark] () extends Serializable {
- import InternalAccumulator._
-
- private[executor] val _remoteBlocksFetched =
- TaskMetrics.createIntAccum(shuffleRead.REMOTE_BLOCKS_FETCHED)
- private[executor] val _localBlocksFetched =
- TaskMetrics.createIntAccum(shuffleRead.LOCAL_BLOCKS_FETCHED)
- private[executor] val _remoteBytesRead =
- TaskMetrics.createLongAccum(shuffleRead.REMOTE_BYTES_READ)
- private[executor] val _localBytesRead =
- TaskMetrics.createLongAccum(shuffleRead.LOCAL_BYTES_READ)
- private[executor] val _fetchWaitTime =
- TaskMetrics.createLongAccum(shuffleRead.FETCH_WAIT_TIME)
- private[executor] val _recordsRead =
- TaskMetrics.createLongAccum(shuffleRead.RECORDS_READ)
+ private[executor] val _remoteBlocksFetched = new LongAccumulator
+ private[executor] val _localBlocksFetched = new LongAccumulator
+ private[executor] val _remoteBytesRead = new LongAccumulator
+ private[executor] val _localBytesRead = new LongAccumulator
+ private[executor] val _fetchWaitTime = new LongAccumulator
+ private[executor] val _recordsRead = new LongAccumulator
/**
* Number of remote blocks fetched in this shuffle by this task.
*/
- def remoteBlocksFetched: Int = _remoteBlocksFetched.localValue
+ def remoteBlocksFetched: Long = _remoteBlocksFetched.sum
/**
* Number of local blocks fetched in this shuffle by this task.
*/
- def localBlocksFetched: Int = _localBlocksFetched.localValue
+ def localBlocksFetched: Long = _localBlocksFetched.sum
/**
* Total number of remote bytes read from the shuffle by this task.
*/
- def remoteBytesRead: Long = _remoteBytesRead.localValue
+ def remoteBytesRead: Long = _remoteBytesRead.sum
/**
* Shuffle data that was read from the local disk (as opposed to from a remote executor).
*/
- def localBytesRead: Long = _localBytesRead.localValue
+ def localBytesRead: Long = _localBytesRead.sum
/**
* Time the task spent waiting for remote shuffle blocks. This only includes the time
* blocking on shuffle input data. For instance if block B is being fetched while the task is
* still not finished processing block A, it is not considered to be blocking on block B.
*/
- def fetchWaitTime: Long = _fetchWaitTime.localValue
+ def fetchWaitTime: Long = _fetchWaitTime.sum
/**
* Total number of records read from the shuffle by this task.
*/
- def recordsRead: Long = _recordsRead.localValue
+ def recordsRead: Long = _recordsRead.sum
/**
* Total bytes fetched in the shuffle by this task (both remote and local).
@@ -83,10 +75,10 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
/**
* Number of blocks fetched in this shuffle by this task (remote or local).
*/
- def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
+ def totalBlocksFetched: Long = remoteBlocksFetched + localBlocksFetched
- private[spark] def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.add(v)
- private[spark] def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.add(v)
+ private[spark] def incRemoteBlocksFetched(v: Long): Unit = _remoteBlocksFetched.add(v)
+ private[spark] def incLocalBlocksFetched(v: Long): Unit = _localBlocksFetched.add(v)
private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v)
private[spark] def incLocalBytesRead(v: Long): Unit = _localBytesRead.add(v)
private[spark] def incFetchWaitTime(v: Long): Unit = _fetchWaitTime.add(v)
@@ -104,12 +96,12 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
* [[TempShuffleReadMetrics]] into `this`.
*/
private[spark] def setMergeValues(metrics: Seq[TempShuffleReadMetrics]): Unit = {
- _remoteBlocksFetched.setValue(_remoteBlocksFetched.zero)
- _localBlocksFetched.setValue(_localBlocksFetched.zero)
- _remoteBytesRead.setValue(_remoteBytesRead.zero)
- _localBytesRead.setValue(_localBytesRead.zero)
- _fetchWaitTime.setValue(_fetchWaitTime.zero)
- _recordsRead.setValue(_recordsRead.zero)
+ _remoteBlocksFetched.setValue(0)
+ _localBlocksFetched.setValue(0)
+ _remoteBytesRead.setValue(0)
+ _localBytesRead.setValue(0)
+ _fetchWaitTime.setValue(0)
+ _recordsRead.setValue(0)
metrics.foreach { metric =>
_remoteBlocksFetched.add(metric.remoteBlocksFetched)
_localBlocksFetched.add(metric.localBlocksFetched)
@@ -127,22 +119,22 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
* last.
*/
private[spark] class TempShuffleReadMetrics {
- private[this] var _remoteBlocksFetched = 0
- private[this] var _localBlocksFetched = 0
+ private[this] var _remoteBlocksFetched = 0L
+ private[this] var _localBlocksFetched = 0L
private[this] var _remoteBytesRead = 0L
private[this] var _localBytesRead = 0L
private[this] var _fetchWaitTime = 0L
private[this] var _recordsRead = 0L
- def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched += v
- def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched += v
+ def incRemoteBlocksFetched(v: Long): Unit = _remoteBlocksFetched += v
+ def incLocalBlocksFetched(v: Long): Unit = _localBlocksFetched += v
def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead += v
def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v
def incRecordsRead(v: Long): Unit = _recordsRead += v
- def remoteBlocksFetched: Int = _remoteBlocksFetched
- def localBlocksFetched: Int = _localBlocksFetched
+ def remoteBlocksFetched: Long = _remoteBlocksFetched
+ def localBlocksFetched: Long = _localBlocksFetched
def remoteBytesRead: Long = _remoteBytesRead
def localBytesRead: Long = _localBytesRead
def fetchWaitTime: Long = _fetchWaitTime
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
index 7326fba841..0e70a4f522 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.executor
-import org.apache.spark.InternalAccumulator
+import org.apache.spark.LongAccumulator
import org.apache.spark.annotation.DeveloperApi
@@ -28,29 +28,24 @@ import org.apache.spark.annotation.DeveloperApi
*/
@DeveloperApi
class ShuffleWriteMetrics private[spark] () extends Serializable {
- import InternalAccumulator._
-
- private[executor] val _bytesWritten =
- TaskMetrics.createLongAccum(shuffleWrite.BYTES_WRITTEN)
- private[executor] val _recordsWritten =
- TaskMetrics.createLongAccum(shuffleWrite.RECORDS_WRITTEN)
- private[executor] val _writeTime =
- TaskMetrics.createLongAccum(shuffleWrite.WRITE_TIME)
+ private[executor] val _bytesWritten = new LongAccumulator
+ private[executor] val _recordsWritten = new LongAccumulator
+ private[executor] val _writeTime = new LongAccumulator
/**
* Number of bytes written for the shuffle by this task.
*/
- def bytesWritten: Long = _bytesWritten.localValue
+ def bytesWritten: Long = _bytesWritten.sum
/**
* Total number of records written to the shuffle by this task.
*/
- def recordsWritten: Long = _recordsWritten.localValue
+ def recordsWritten: Long = _recordsWritten.sum
/**
* Time the task spent blocking on writes to disk or buffer cache, in nanoseconds.
*/
- def writeTime: Long = _writeTime.localValue
+ def writeTime: Long = _writeTime.sum
private[spark] def incBytesWritten(v: Long): Unit = _bytesWritten.add(v)
private[spark] def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v)
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 8513d053f2..0b64917219 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,10 +17,9 @@
package org.apache.spark.executor
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
import org.apache.spark._
-import org.apache.spark.AccumulatorParam.{IntAccumulatorParam, LongAccumulatorParam, UpdatedBlockStatusesAccumulatorParam}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.AccumulableInfo
@@ -42,53 +41,51 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
*/
@DeveloperApi
class TaskMetrics private[spark] () extends Serializable {
- import InternalAccumulator._
-
// Each metric is internally represented as an accumulator
- private val _executorDeserializeTime = TaskMetrics.createLongAccum(EXECUTOR_DESERIALIZE_TIME)
- private val _executorRunTime = TaskMetrics.createLongAccum(EXECUTOR_RUN_TIME)
- private val _resultSize = TaskMetrics.createLongAccum(RESULT_SIZE)
- private val _jvmGCTime = TaskMetrics.createLongAccum(JVM_GC_TIME)
- private val _resultSerializationTime = TaskMetrics.createLongAccum(RESULT_SERIALIZATION_TIME)
- private val _memoryBytesSpilled = TaskMetrics.createLongAccum(MEMORY_BYTES_SPILLED)
- private val _diskBytesSpilled = TaskMetrics.createLongAccum(DISK_BYTES_SPILLED)
- private val _peakExecutionMemory = TaskMetrics.createLongAccum(PEAK_EXECUTION_MEMORY)
- private val _updatedBlockStatuses = TaskMetrics.createBlocksAccum(UPDATED_BLOCK_STATUSES)
+ private val _executorDeserializeTime = new LongAccumulator
+ private val _executorRunTime = new LongAccumulator
+ private val _resultSize = new LongAccumulator
+ private val _jvmGCTime = new LongAccumulator
+ private val _resultSerializationTime = new LongAccumulator
+ private val _memoryBytesSpilled = new LongAccumulator
+ private val _diskBytesSpilled = new LongAccumulator
+ private val _peakExecutionMemory = new LongAccumulator
+ private val _updatedBlockStatuses = new BlockStatusesAccumulator
/**
* Time taken on the executor to deserialize this task.
*/
- def executorDeserializeTime: Long = _executorDeserializeTime.localValue
+ def executorDeserializeTime: Long = _executorDeserializeTime.sum
/**
* Time the executor spends actually running the task (including fetching shuffle data).
*/
- def executorRunTime: Long = _executorRunTime.localValue
+ def executorRunTime: Long = _executorRunTime.sum
/**
* The number of bytes this task transmitted back to the driver as the TaskResult.
*/
- def resultSize: Long = _resultSize.localValue
+ def resultSize: Long = _resultSize.sum
/**
* Amount of time the JVM spent in garbage collection while executing this task.
*/
- def jvmGCTime: Long = _jvmGCTime.localValue
+ def jvmGCTime: Long = _jvmGCTime.sum
/**
* Amount of time spent serializing the task result.
*/
- def resultSerializationTime: Long = _resultSerializationTime.localValue
+ def resultSerializationTime: Long = _resultSerializationTime.sum
/**
* The number of in-memory bytes spilled by this task.
*/
- def memoryBytesSpilled: Long = _memoryBytesSpilled.localValue
+ def memoryBytesSpilled: Long = _memoryBytesSpilled.sum
/**
* The number of on-disk bytes spilled by this task.
*/
- def diskBytesSpilled: Long = _diskBytesSpilled.localValue
+ def diskBytesSpilled: Long = _diskBytesSpilled.sum
/**
* Peak memory used by internal data structures created during shuffles, aggregations and
@@ -96,7 +93,7 @@ class TaskMetrics private[spark] () extends Serializable {
* across all such data structures created in this task. For SQL jobs, this only tracks all
* unsafe operators and ExternalSort.
*/
- def peakExecutionMemory: Long = _peakExecutionMemory.localValue
+ def peakExecutionMemory: Long = _peakExecutionMemory.sum
/**
* Storage statuses of any blocks that have been updated as a result of this task.
@@ -114,7 +111,7 @@ class TaskMetrics private[spark] () extends Serializable {
private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v)
private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v)
private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v)
- private[spark] def incUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit =
+ private[spark] def incUpdatedBlockStatuses(v: (BlockId, BlockStatus)): Unit =
_updatedBlockStatuses.add(v)
private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit =
_updatedBlockStatuses.setValue(v)
@@ -175,124 +172,143 @@ class TaskMetrics private[spark] () extends Serializable {
}
// Only used for test
- private[spark] val testAccum =
- sys.props.get("spark.testing").map(_ => TaskMetrics.createLongAccum(TEST_ACCUM))
-
- @transient private[spark] lazy val internalAccums: Seq[Accumulable[_, _]] = {
- val in = inputMetrics
- val out = outputMetrics
- val sr = shuffleReadMetrics
- val sw = shuffleWriteMetrics
- Seq(_executorDeserializeTime, _executorRunTime, _resultSize, _jvmGCTime,
- _resultSerializationTime, _memoryBytesSpilled, _diskBytesSpilled, _peakExecutionMemory,
- _updatedBlockStatuses, sr._remoteBlocksFetched, sr._localBlocksFetched, sr._remoteBytesRead,
- sr._localBytesRead, sr._fetchWaitTime, sr._recordsRead, sw._bytesWritten, sw._recordsWritten,
- sw._writeTime, in._bytesRead, in._recordsRead, out._bytesWritten, out._recordsWritten) ++
- testAccum
- }
+ private[spark] val testAccum = sys.props.get("spark.testing").map(_ => new LongAccumulator)
+
+
+ import InternalAccumulator._
+ @transient private[spark] lazy val nameToAccums = LinkedHashMap(
+ EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
+ EXECUTOR_RUN_TIME -> _executorRunTime,
+ RESULT_SIZE -> _resultSize,
+ JVM_GC_TIME -> _jvmGCTime,
+ RESULT_SERIALIZATION_TIME -> _resultSerializationTime,
+ MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
+ DISK_BYTES_SPILLED -> _diskBytesSpilled,
+ PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
+ UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
+ shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
+ shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
+ shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead,
+ shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
+ shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
+ shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
+ shuffleWrite.BYTES_WRITTEN -> shuffleWriteMetrics._bytesWritten,
+ shuffleWrite.RECORDS_WRITTEN -> shuffleWriteMetrics._recordsWritten,
+ shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime,
+ input.BYTES_READ -> inputMetrics._bytesRead,
+ input.RECORDS_READ -> inputMetrics._recordsRead,
+ output.BYTES_WRITTEN -> outputMetrics._bytesWritten,
+ output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
+ ) ++ testAccum.map(TEST_ACCUM -> _)
+
+ @transient private[spark] lazy val internalAccums: Seq[NewAccumulator[_, _]] =
+ nameToAccums.values.toIndexedSeq
/* ========================== *
| OTHER THINGS |
* ========================== */
- private[spark] def registerForCleanup(sc: SparkContext): Unit = {
- internalAccums.foreach { accum =>
- sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
+ private[spark] def register(sc: SparkContext): Unit = {
+ nameToAccums.foreach {
+ case (name, acc) => acc.register(sc, name = Some(name), countFailedValues = true)
}
}
/**
* External accumulators registered with this task.
*/
- @transient private lazy val externalAccums = new ArrayBuffer[Accumulable[_, _]]
+ @transient private lazy val externalAccums = new ArrayBuffer[NewAccumulator[_, _]]
- private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
+ private[spark] def registerAccumulator(a: NewAccumulator[_, _]): Unit = {
externalAccums += a
}
- /**
- * Return the latest updates of accumulators in this task.
- *
- * The [[AccumulableInfo.update]] field is always defined and the [[AccumulableInfo.value]]
- * field is always empty, since this represents the partial updates recorded in this task,
- * not the aggregated value across multiple tasks.
- */
- def accumulatorUpdates(): Seq[AccumulableInfo] = {
- (internalAccums ++ externalAccums).map { a => a.toInfo(Some(a.localValue), None) }
- }
+ private[spark] def accumulators(): Seq[NewAccumulator[_, _]] = internalAccums ++ externalAccums
}
-/**
- * Internal subclass of [[TaskMetrics]] which is used only for posting events to listeners.
- * Its purpose is to obviate the need for the driver to reconstruct the original accumulators,
- * which might have been garbage-collected. See SPARK-13407 for more details.
- *
- * Instances of this class should be considered read-only and users should not call `inc*()` or
- * `set*()` methods. While we could override the setter methods to throw
- * UnsupportedOperationException, we choose not to do so because the overrides would quickly become
- * out-of-date when new metrics are added.
- */
-private[spark] class ListenerTaskMetrics(accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics {
-
- override def accumulatorUpdates(): Seq[AccumulableInfo] = accumUpdates
-
- override private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
- throw new UnsupportedOperationException("This TaskMetrics is read-only")
- }
-}
private[spark] object TaskMetrics extends Logging {
+ import InternalAccumulator._
/**
* Create an empty task metrics that doesn't register its accumulators.
*/
def empty: TaskMetrics = {
- val metrics = new TaskMetrics
- metrics.internalAccums.foreach(acc => Accumulators.remove(acc.id))
- metrics
+ val tm = new TaskMetrics
+ tm.nameToAccums.foreach { case (name, acc) =>
+ acc.metadata = AccumulatorMetadata(AccumulatorContext.newId(), Some(name), true)
+ }
+ tm
+ }
+
+ def registered: TaskMetrics = {
+ val tm = empty
+ tm.internalAccums.foreach(AccumulatorContext.register)
+ tm
}
/**
- * Create a new accumulator representing an internal task metric.
+ * Construct a [[TaskMetrics]] object from a list of [[AccumulableInfo]], called on driver only.
+ * The returned [[TaskMetrics]] is only used to get some internal metrics; we don't need to take
+ * care of the external accumulator info passed in.
*/
- private def newMetric[T](
- initialValue: T,
- name: String,
- param: AccumulatorParam[T]): Accumulator[T] = {
- new Accumulator[T](initialValue, param, Some(name), countFailedValues = true)
+ def fromAccumulatorInfos(infos: Seq[AccumulableInfo]): TaskMetrics = {
+ val tm = new TaskMetrics
+ infos.filter(info => info.name.isDefined && info.update.isDefined).foreach { info =>
+ val name = info.name.get
+ val value = info.update.get
+ if (name == UPDATED_BLOCK_STATUSES) {
+ tm.setUpdatedBlockStatuses(value.asInstanceOf[Seq[(BlockId, BlockStatus)]])
+ } else {
+ tm.nameToAccums.get(name).foreach(
+ _.asInstanceOf[LongAccumulator].setValue(value.asInstanceOf[Long])
+ )
+ }
+ }
+ tm
}
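
A sketch of the name-keyed reconstruction `fromAccumulatorInfos` performs: each (name, update) pair is matched against a fixed set of internal counters, and unknown names are simply dropped. `MetricInfo`, `rebuild`, and the metric names used below are illustrative, not Spark API:

object RebuildDemo {
  // Illustrative stand-in for AccumulableInfo: an optional name plus an optional partial update.
  final case class MetricInfo(name: Option[String], update: Option[Any])

  // Rebuilds a fixed set of Long-valued internal metrics by name; unknown names are dropped.
  def rebuild(infos: Seq[MetricInfo]): Map[String, Long] = {
    val counters = scala.collection.mutable.Map("executorRunTime" -> 0L, "resultSize" -> 0L)
    infos.filter(i => i.name.isDefined && i.update.isDefined).foreach { i =>
      val name = i.name.get
      if (counters.contains(name)) counters(name) = i.update.get.asInstanceOf[Long]
    }
    counters.toMap
  }

  def main(args: Array[String]): Unit = {
    val infos = Seq(MetricInfo(Some("resultSize"), Some(1024L)), MetricInfo(Some("unknown"), Some(1L)))
    assert(rebuild(infos)("resultSize") == 1024L)
  }
}
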
- def createLongAccum(name: String): Accumulator[Long] = {
- newMetric(0L, name, LongAccumulatorParam)
- }
+ /**
+   * Construct a [[TaskMetrics]] object from a list of accumulator updates, called on the driver only.
+ */
+ def fromAccumulators(accums: Seq[NewAccumulator[_, _]]): TaskMetrics = {
+ val tm = new TaskMetrics
+ val (internalAccums, externalAccums) =
+ accums.partition(a => a.name.isDefined && tm.nameToAccums.contains(a.name.get))
+
+ internalAccums.foreach { acc =>
+ val tmAcc = tm.nameToAccums(acc.name.get).asInstanceOf[NewAccumulator[Any, Any]]
+ tmAcc.metadata = acc.metadata
+ tmAcc.merge(acc.asInstanceOf[NewAccumulator[Any, Any]])
+ }
- def createIntAccum(name: String): Accumulator[Int] = {
- newMetric(0, name, IntAccumulatorParam)
+ tm.externalAccums ++= externalAccums
+ tm
}
+}
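
A sketch of the partition-and-merge step in `fromAccumulators`: updates whose name matches a known internal metric are merged into the corresponding counter, and everything else is kept as an external accumulator. `Update` and `combine` are illustrative names:

object CombineDemo {
  // Illustrative model of one accumulator update shipped back from a task.
  final case class Update(name: Option[String], delta: Long)

  // Updates matching a known internal metric name are merged (summed); the rest stay external.
  def combine(updates: Seq[Update], internalNames: Set[String]): (Map[String, Long], Seq[Update]) = {
    val (internal, external) = updates.partition(u => u.name.exists(internalNames.contains))
    val merged = internal.groupBy(_.name.get).map { case (n, us) => n -> us.map(_.delta).sum }
    (merged, external)
  }

  def main(args: Array[String]): Unit = {
    val (internal, external) = combine(
      Seq(Update(Some("resultSize"), 10L), Update(Some("myCounter"), 3L)),
      internalNames = Set("resultSize"))
    assert(internal("resultSize") == 10L && external.map(_.name) == Seq(Some("myCounter")))
  }
}
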
+
+
+private[spark] class BlockStatusesAccumulator
+ extends NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] {
+ private[this] var _seq = ArrayBuffer.empty[(BlockId, BlockStatus)]
- def createBlocksAccum(name: String): Accumulator[Seq[(BlockId, BlockStatus)]] = {
- newMetric(Nil, name, UpdatedBlockStatusesAccumulatorParam)
+ override def isZero(): Boolean = _seq.isEmpty
+
+ override def copyAndReset(): BlockStatusesAccumulator = new BlockStatusesAccumulator
+
+ override def add(v: (BlockId, BlockStatus)): Unit = _seq += v
+
+ override def merge(other: NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]])
+ : Unit = other match {
+ case o: BlockStatusesAccumulator => _seq ++= o.localValue
+ case _ => throw new UnsupportedOperationException(
+ s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
- /**
- * Construct a [[TaskMetrics]] object from a list of accumulator updates, called on driver only.
- *
- * Executors only send accumulator updates back to the driver, not [[TaskMetrics]]. However, we
- * need the latter to post task end events to listeners, so we need to reconstruct the metrics
- * on the driver.
- *
- * This assumes the provided updates contain the initial set of accumulators representing
- * internal task level metrics.
- */
- def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = {
- val definedAccumUpdates = accumUpdates.filter(_.update.isDefined)
- val metrics = new ListenerTaskMetrics(definedAccumUpdates)
- // We don't register this [[ListenerTaskMetrics]] for cleanup, and this is only used to post
- // event, we should un-register all accumulators immediately.
- metrics.internalAccums.foreach(acc => Accumulators.remove(acc.id))
- definedAccumUpdates.filter(_.internal).foreach { accum =>
- metrics.internalAccums.find(_.name == accum.name).foreach(_.setValueAny(accum.update.get))
- }
- metrics
+ override def localValue: Seq[(BlockId, BlockStatus)] = _seq
+
+ def setValue(newValue: Seq[(BlockId, BlockStatus)]): Unit = {
+ _seq.clear()
+ _seq ++= newValue
}
}
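
`BlockStatusesAccumulator` follows the general contract of the new accumulator API: `add` accepts one element, `merge` folds in another accumulator of the same type, `isZero` reports whether anything was recorded, and `copyAndReset` hands a blank copy to each task. A self-contained sketch of the same contract over a simpler element type; `SeqAccum` and `StringsAccum` are stand-ins, not the Spark classes:

import scala.collection.mutable.ArrayBuffer

// Minimal stand-in for the per-task accumulator contract used above; not the Spark trait.
trait SeqAccum[IN, OUT] extends Serializable {
  def isZero: Boolean
  def copyAndReset(): SeqAccum[IN, OUT]
  def add(v: IN): Unit
  def merge(other: SeqAccum[IN, OUT]): Unit
  def value: OUT
}

final class StringsAccum extends SeqAccum[String, Seq[String]] {
  private val buf = ArrayBuffer.empty[String]
  override def isZero: Boolean = buf.isEmpty
  override def copyAndReset(): StringsAccum = new StringsAccum
  override def add(v: String): Unit = buf += v
  override def merge(other: SeqAccum[String, Seq[String]]): Unit = other match {
    case o: StringsAccum => buf ++= o.value
    case _ => throw new UnsupportedOperationException(s"cannot merge with ${other.getClass.getName}")
  }
  override def value: Seq[String] = buf.toList
}

object StringsAccumDemo {
  def main(args: Array[String]): Unit = {
    val onDriver = new StringsAccum
    val onTask = onDriver.copyAndReset()   // a blank copy travels with the task
    onTask.add("rdd_0_0"); onTask.add("rdd_0_1")
    onDriver.merge(onTask)                 // the driver folds in the partial result
    assert(!onDriver.isZero && onDriver.value == Seq("rdd_0_0", "rdd_0_1"))
  }
}
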
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b7fb608ea5..a96d5f6fbf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -209,7 +209,7 @@ class DAGScheduler(
task: Task[_],
reason: TaskEndReason,
result: Any,
- accumUpdates: Seq[AccumulableInfo],
+ accumUpdates: Seq[NewAccumulator[_, _]],
taskInfo: TaskInfo): Unit = {
eventProcessLoop.post(
CompletionEvent(task, reason, result, accumUpdates, taskInfo))
@@ -1088,21 +1088,19 @@ class DAGScheduler(
val task = event.task
val stage = stageIdToStage(task.stageId)
try {
- event.accumUpdates.foreach { ainfo =>
- assert(ainfo.update.isDefined, "accumulator from task should have a partial value")
- val id = ainfo.id
- val partialValue = ainfo.update.get
+ event.accumUpdates.foreach { updates =>
+ val id = updates.id
// Find the corresponding accumulator on the driver and update it
- val acc: Accumulable[Any, Any] = Accumulators.get(id) match {
- case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]]
+ val acc: NewAccumulator[Any, Any] = AccumulatorContext.get(id) match {
+ case Some(accum) => accum.asInstanceOf[NewAccumulator[Any, Any]]
case None =>
throw new SparkException(s"attempted to access non-existent accumulator $id")
}
- acc ++= partialValue
+ acc.merge(updates.asInstanceOf[NewAccumulator[Any, Any]])
// To avoid UI cruft, ignore cases where value wasn't updated
- if (acc.name.isDefined && partialValue != acc.zero) {
+ if (acc.name.isDefined && !updates.isZero()) {
stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
- event.taskInfo.accumulables += acc.toInfo(Some(partialValue), Some(acc.value))
+ event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value))
}
}
} catch {
@@ -1131,7 +1129,7 @@ class DAGScheduler(
val taskMetrics: TaskMetrics =
if (event.accumUpdates.nonEmpty) {
try {
- TaskMetrics.fromAccumulatorUpdates(event.accumUpdates)
+ TaskMetrics.fromAccumulators(event.accumUpdates)
} catch {
case NonFatal(e) =>
logError(s"Error when attempting to reconstruct metrics for task $taskId", e)
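
The `handleTaskCompletion` change above is the core of the new flow: tasks ship whole accumulator objects back, and the driver looks up its own copy by id and calls `merge` instead of applying an `AccumulableInfo` delta. A condensed, self-contained sketch of that loop; `driverAccums` and `LongAcc` are illustrative names:

object DriverMergeDemo {
  // Illustrative long accumulator with just enough of the merge-by-id flow.
  final class LongAcc(val id: Long, val name: Option[String]) {
    var sum = 0L
    def isZero: Boolean = sum == 0L
    def merge(other: LongAcc): Unit = sum += other.sum
  }

  def main(args: Array[String]): Unit = {
    // Driver-side copies, keyed by accumulator id.
    val driverAccums = Map(1L -> new LongAcc(1L, Some("records")))

    // A whole accumulator object shipped back from one finished task.
    val taskUpdate = new LongAcc(1L, Some("records"))
    taskUpdate.sum = 42L

    Seq(taskUpdate).foreach { update =>
      val acc = driverAccums.getOrElse(update.id,
        throw new IllegalStateException(s"attempted to access non-existent accumulator ${update.id}"))
      acc.merge(update)
      // Like the scheduler above, skip per-task bookkeeping when the task contributed nothing.
      if (acc.name.isDefined && !update.isZero) {
        // record (id -> merged value) against the stage / task info here
      }
    }
    assert(driverAccums(1L).sum == 42L)
  }
}
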
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index a3845c6acd..e57a2246d8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -71,7 +71,7 @@ private[scheduler] case class CompletionEvent(
task: Task[_],
reason: TaskEndReason,
result: Any,
- accumUpdates: Seq[AccumulableInfo],
+ accumUpdates: Seq[NewAccumulator[_, _]],
taskInfo: TaskInfo)
extends DAGSchedulerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 080ea6c33a..7618dfeeed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -21,18 +21,15 @@ import java.util.Properties
import javax.annotation.Nullable
import scala.collection.Map
-import scala.collection.mutable
import com.fasterxml.jackson.annotation.JsonTypeInfo
import org.apache.spark.{SparkConf, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{Distribution, Utils}
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 02185bf631..2f972b064b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -112,7 +112,7 @@ private[scheduler] abstract class Stage(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
val metrics = new TaskMetrics
- metrics.registerForCleanup(rdd.sparkContext)
+ metrics.register(rdd.sparkContext)
_latestInfo = StageInfo.fromStage(
this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
nextAttemptId += 1
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index eb10f3e69b..e7ca6efd84 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -23,7 +23,7 @@ import java.util.Properties
import scala.collection.mutable.HashMap
-import org.apache.spark.{SparkEnv, TaskContext, TaskContextImpl}
+import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
import org.apache.spark.metrics.MetricsSystem
@@ -52,7 +52,7 @@ private[spark] abstract class Task[T](
val stageAttemptId: Int,
val partitionId: Int,
// The default value is only used in tests.
- val metrics: TaskMetrics = TaskMetrics.empty,
+ val metrics: TaskMetrics = TaskMetrics.registered,
@transient var localProperties: Properties = new Properties) extends Serializable {
/**
@@ -153,11 +153,11 @@ private[spark] abstract class Task[T](
* Collect the latest values of accumulators used in this task. If the task failed,
* filter out the accumulators whose values should not be included on failures.
*/
- def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulableInfo] = {
+ def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[NewAccumulator[_, _]] = {
if (context != null) {
- context.taskMetrics.accumulatorUpdates().filter { a => !taskFailed || a.countFailedValues }
+ context.taskMetrics.accumulators().filter { a => !taskFailed || a.countFailedValues }
} else {
- Seq.empty[AccumulableInfo]
+ Seq.empty
}
}
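
The filter in `collectAccumulatorUpdates` is what `countFailedValues` controls: on a failed task, only accumulators that opted in are reported back. A small sketch of that rule; the record type and names below are illustrative:

object FailureFilterDemo {
  // Illustrative accumulator record: only the countFailedValues flag matters here.
  final case class Acc(name: String, countFailedValues: Boolean)

  def collect(accums: Seq[Acc], taskFailed: Boolean): Seq[Acc] =
    accums.filter(a => !taskFailed || a.countFailedValues)

  def main(args: Array[String]): Unit = {
    val accums = Seq(
      Acc("internal.metrics.executorRunTime", countFailedValues = true),
      Acc("userCounter", countFailedValues = false))
    assert(collect(accums, taskFailed = false).size == 2)                 // success: report all
    assert(collect(accums, taskFailed = true).map(_.name) ==
      Seq("internal.metrics.executorRunTime"))                            // failure: opted-in only
  }
}
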
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 03135e63d7..b472c5511b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.SparkEnv
+import org.apache.spark.{NewAccumulator, SparkEnv}
import org.apache.spark.storage.BlockId
import org.apache.spark.util.Utils
@@ -36,7 +36,7 @@ private[spark] case class IndirectTaskResult[T](blockId: BlockId, size: Int)
/** A TaskResult that contains the task's return value and accumulator updates. */
private[spark] class DirectTaskResult[T](
var valueBytes: ByteBuffer,
- var accumUpdates: Seq[AccumulableInfo])
+ var accumUpdates: Seq[NewAccumulator[_, _]])
extends TaskResult[T] with Externalizable {
private var valueObjectDeserialized = false
@@ -61,9 +61,9 @@ private[spark] class DirectTaskResult[T](
if (numUpdates == 0) {
accumUpdates = null
} else {
- val _accumUpdates = new ArrayBuffer[AccumulableInfo]
+ val _accumUpdates = new ArrayBuffer[NewAccumulator[_, _]]
for (i <- 0 until numUpdates) {
- _accumUpdates += in.readObject.asInstanceOf[AccumulableInfo]
+ _accumUpdates += in.readObject.asInstanceOf[NewAccumulator[_, _]]
}
accumUpdates = _accumUpdates
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index ae7ef46abb..b438c285fd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -93,9 +93,10 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
// we would have to serialize the result again after updating the size.
result.accumUpdates = result.accumUpdates.map { a =>
if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
- assert(a.update == Some(0L),
- "task result size should not have been set on the executors")
- a.copy(update = Some(size.toLong))
+ val acc = a.asInstanceOf[LongAccumulator]
+ assert(acc.sum == 0L, "task result size should not have been set on the executors")
+ acc.setValue(size.toLong)
+ acc
} else {
a
}
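
The rewrite above relies on RESULT_SIZE being left at zero on the executor so the driver can fill it in after measuring the serialized result, without having to serialize it again. A sketch of that idea with a plain long accumulator; names are illustrative:

object ResultSizeDemo {
  // Illustrative long accumulator: a mutable sum plus the setValue used by the driver.
  final class LongAcc { var sum = 0L; def setValue(v: Long): Unit = sum = v }

  def main(args: Array[String]): Unit = {
    // Executor side: the result-size slot is shipped back still at its zero value.
    val resultSize = new LongAcc

    // Driver side: only here is the serialized result's size known.
    val serializedResultBytes = 8 * 1024
    assert(resultSize.sum == 0L, "task result size should not have been set on the executors")
    resultSize.setValue(serializedResultBytes.toLong)
    assert(resultSize.sum == 8192L)
  }
}
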
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 647d44a0f0..75a0c56311 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -17,6 +17,7 @@
package org.apache.spark.scheduler
+import org.apache.spark.NewAccumulator
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
@@ -66,7 +67,7 @@ private[spark] trait TaskScheduler {
*/
def executorHeartbeatReceived(
execId: String,
- accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+ accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
blockManagerId: BlockManagerId): Boolean
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index f31ec2af4e..776a3226cc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -384,13 +384,14 @@ private[spark] class TaskSchedulerImpl(
*/
override def executorHeartbeatReceived(
execId: String,
- accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+ accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
blockManagerId: BlockManagerId): Boolean = {
// (taskId, stageId, stageAttemptId, accumUpdates)
val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
accumUpdates.flatMap { case (id, updates) =>
taskIdToTaskSetManager.get(id).map { taskSetMgr =>
- (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, updates)
+ (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId,
+ updates.map(acc => acc.toInfo(Some(acc.value), None)))
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 6e08cdd87a..b79f643a74 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -647,7 +647,7 @@ private[spark] class TaskSetManager(
info.markFailed()
val index = info.index
copiesRunning(index) -= 1
- var accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo]
+ var accumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty
val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " +
reason.asInstanceOf[TaskFailedReason].toErrorString
val failureException: Option[Throwable] = reason match {
@@ -663,7 +663,7 @@ private[spark] class TaskSetManager(
case ef: ExceptionFailure =>
// ExceptionFailure's might have accumulator updates
- accumUpdates = ef.accumUpdates
+ accumUpdates = ef.accums
if (ef.className == classOf[NotSerializableException].getName) {
// If the task result wasn't serializable, there's no point in trying to re-execute it.
logError("Task %s in stage %s (TID %d) had a not serializable result: %s; not retrying"
@@ -788,7 +788,7 @@ private[spark] class TaskSetManager(
// Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
// stage finishes when a total of tasks.size tasks finish.
sched.dagScheduler.taskEnded(
- tasks(index), Resubmitted, null, Seq.empty[AccumulableInfo], info)
+ tasks(index), Resubmitted, null, Seq.empty, info)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
index 8daca6c390..c04b483831 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
@@ -266,7 +266,13 @@ private[spark] object SerializationDebugger extends Logging {
(o, desc)
} else {
// write place
- findObjectAndDescriptor(desc.invokeWriteReplace(o))
+ val replaced = desc.invokeWriteReplace(o)
+        // `writeReplace` may return the same object, in which case recursing on it would never terminate.
+ if (replaced eq o) {
+ (o, desc)
+ } else {
+ findObjectAndDescriptor(replaced)
+ }
}
}
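
The `eq` check above guards against `writeReplace` implementations that return the receiver itself; without it the debugger would keep recursing on the same object. A self-contained sketch of the guard; `resolveReplacement` is an illustrative helper, not the debugger's API:

import scala.annotation.tailrec

object WriteReplaceDemo {
  // Follows writeReplace-style substitution until it reaches a fixed point.
  @tailrec
  def resolveReplacement(o: AnyRef, replace: AnyRef => AnyRef): AnyRef = {
    val replaced = replace(o)
    if (replaced eq o) o                      // same object: stop instead of recursing forever
    else resolveReplacement(replaced, replace)
  }

  def main(args: Array[String]): Unit = {
    val selfReplacing = new Object
    // A replacement function that, like some writeReplace methods, returns its argument unchanged.
    assert(resolveReplacement(selfReplacing, (x: AnyRef) => x) eq selfReplacing)
  }
}
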
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index ff28796a60..32e332a9ad 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -186,8 +186,8 @@ class OutputMetrics private[spark](
val recordsWritten: Long)
class ShuffleReadMetrics private[spark](
- val remoteBlocksFetched: Int,
- val localBlocksFetched: Int,
+ val remoteBlocksFetched: Long,
+ val localBlocksFetched: Long,
val fetchWaitTime: Long,
val remoteBytesRead: Long,
val localBytesRead: Long,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1c4921666f..f2d06c7ea8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -801,7 +801,7 @@ private[spark] class BlockManager(
reportBlockStatus(blockId, info, putBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
+ c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
}
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
@@ -958,7 +958,7 @@ private[spark] class BlockManager(
reportBlockStatus(blockId, info, putBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
+ c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
@@ -1257,7 +1257,7 @@ private[spark] class BlockManager(
}
if (blockIsUpdated) {
Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, status)))
+ c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
}
}
status.storageLevel
@@ -1311,7 +1311,7 @@ private[spark] class BlockManager(
reportBlockStatus(blockId, info, removeBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, removeBlockStatus)))
+ c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 9ab7d96e29..945830c8bf 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -375,26 +375,21 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
execSummary.taskTime += info.duration
stageData.numActiveTasks -= 1
- val (errorMessage, accums): (Option[String], Seq[AccumulableInfo]) =
+ val errorMessage: Option[String] =
taskEnd.reason match {
case org.apache.spark.Success =>
stageData.completedIndices.add(info.index)
stageData.numCompleteTasks += 1
- (None, taskEnd.taskMetrics.accumulatorUpdates())
+ None
case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates
stageData.numFailedTasks += 1
- (Some(e.toErrorString), e.accumUpdates)
+ Some(e.toErrorString)
case e: TaskFailedReason => // All other failure cases
stageData.numFailedTasks += 1
- (Some(e.toErrorString), Seq.empty[AccumulableInfo])
+ Some(e.toErrorString)
}
- val taskMetrics =
- if (accums.nonEmpty) {
- Some(TaskMetrics.fromAccumulatorUpdates(accums))
- } else {
- None
- }
+ val taskMetrics = Option(taskEnd.taskMetrics)
taskMetrics.foreach { m =>
val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.metrics)
updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
@@ -503,7 +498,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
new StageUIData
})
val taskData = stageData.taskData.get(taskId)
- val metrics = TaskMetrics.fromAccumulatorUpdates(accumUpdates)
+ val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates)
taskData.foreach { t =>
if (!t.taskInfo.finished) {
updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.metrics)
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a613fbc5cc..aeab71d9df 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -840,7 +840,9 @@ private[spark] object JsonProtocol {
// Fallback on getting accumulator updates from TaskMetrics, which was logged in Spark 1.x
val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
.map(_.extract[List[JValue]].map(accumulableInfoFromJson))
- .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulatorUpdates())
+ .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulators().map(acc => {
+ acc.toInfo(Some(acc.localValue), None)
+ }))
ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled