Diffstat (limited to 'core/src/main/scala')
30 files changed, 748 insertions, 367 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala index e8f053c150..c76720c4bb 100644 --- a/core/src/main/scala/org/apache/spark/Accumulable.scala +++ b/core/src/main/scala/org/apache/spark/Accumulable.scala @@ -63,7 +63,7 @@ class Accumulable[R, T] private ( param: AccumulableParam[R, T], name: Option[String], countFailedValues: Boolean) = { - this(Accumulators.newId(), initialValue, param, name, countFailedValues) + this(AccumulatorContext.newId(), initialValue, param, name, countFailedValues) } private[spark] def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = { @@ -72,34 +72,23 @@ class Accumulable[R, T] private ( def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None) - @volatile @transient private var value_ : R = initialValue // Current value on driver - val zero = param.zero(initialValue) // Zero value to be passed to executors - private var deserialized = false - - Accumulators.register(this) - - /** - * Return a copy of this [[Accumulable]]. - * - * The copy will have the same ID as the original and will not be registered with - * [[Accumulators]] again. This method exists so that the caller can avoid passing the - * same mutable instance around. - */ - private[spark] def copy(): Accumulable[R, T] = { - new Accumulable[R, T](id, initialValue, param, name, countFailedValues) - } + val zero = param.zero(initialValue) + private[spark] val newAcc = new LegacyAccumulatorWrapper(initialValue, param) + newAcc.metadata = AccumulatorMetadata(id, name, countFailedValues) + // Register the new accumulator in ctor, to follow the previous behaviour. + AccumulatorContext.register(newAcc) /** * Add more data to this accumulator / accumulable * @param term the data to add */ - def += (term: T) { value_ = param.addAccumulator(value_, term) } + def += (term: T) { newAcc.add(term) } /** * Add more data to this accumulator / accumulable * @param term the data to add */ - def add(term: T) { value_ = param.addAccumulator(value_, term) } + def add(term: T) { newAcc.add(term) } /** * Merge two accumulable objects together @@ -107,7 +96,7 @@ class Accumulable[R, T] private ( * Normally, a user will not want to use this version, but will instead call `+=`. * @param term the other `R` that will get merged with this */ - def ++= (term: R) { value_ = param.addInPlace(value_, term)} + def ++= (term: R) { newAcc._value = param.addInPlace(newAcc._value, term) } /** * Merge two accumulable objects together @@ -115,18 +104,12 @@ class Accumulable[R, T] private ( * Normally, a user will not want to use this version, but will instead call `add`. * @param term the other `R` that will get merged with this */ - def merge(term: R) { value_ = param.addInPlace(value_, term)} + def merge(term: R) { newAcc._value = param.addInPlace(newAcc._value, term) } /** * Access the accumulator's current value; only allowed on driver. */ - def value: R = { - if (!deserialized) { - value_ - } else { - throw new UnsupportedOperationException("Can't read accumulator value in task") - } - } + def value: R = newAcc.value /** * Get the current value of this accumulator from within a task. @@ -137,14 +120,14 @@ class Accumulable[R, T] private ( * The typical use of this method is to directly mutate the local value, eg., to add * an element to a Set. */ - def localValue: R = value_ + def localValue: R = newAcc.localValue /** * Set the accumulator's value; only allowed on driver. 
*/ def value_= (newValue: R) { - if (!deserialized) { - value_ = newValue + if (newAcc.isAtDriverSide) { + newAcc._value = newValue } else { throw new UnsupportedOperationException("Can't assign accumulator value in task") } @@ -153,7 +136,7 @@ class Accumulable[R, T] private ( /** * Set the accumulator's value. For internal use only. */ - def setValue(newValue: R): Unit = { value_ = newValue } + def setValue(newValue: R): Unit = { newAcc._value = newValue } /** * Set the accumulator's value. For internal use only. @@ -168,22 +151,7 @@ class Accumulable[R, T] private ( new AccumulableInfo(id, name, update, value, isInternal, countFailedValues) } - // Called by Java when deserializing an object - private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { - in.defaultReadObject() - value_ = zero - deserialized = true - - // Automatically register the accumulator when it is deserialized with the task closure. - // This is for external accumulators and internal ones that do not represent task level - // metrics, e.g. internal SQL metrics, which are per-operator. - val taskContext = TaskContext.get() - if (taskContext != null) { - taskContext.registerAccumulator(this) - } - } - - override def toString: String = if (value_ == null) "null" else value_.toString + override def toString: String = if (newAcc._value == null) "null" else newAcc._value.toString } diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala index 0c17f014c9..9b007b9776 100644 --- a/core/src/main/scala/org/apache/spark/Accumulator.scala +++ b/core/src/main/scala/org/apache/spark/Accumulator.scala @@ -68,73 +68,6 @@ class Accumulator[T] private[spark] ( extends Accumulable[T, T](initialValue, param, name, countFailedValues) -// TODO: The multi-thread support in accumulators is kind of lame; check -// if there's a more intuitive way of doing it right -private[spark] object Accumulators extends Logging { - /** - * This global map holds the original accumulator objects that are created on the driver. - * It keeps weak references to these objects so that accumulators can be garbage-collected - * once the RDDs and user-code that reference them are cleaned up. - * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051). - */ - @GuardedBy("Accumulators") - val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]() - - private val nextId = new AtomicLong(0L) - - /** - * Return a globally unique ID for a new [[Accumulable]]. - * Note: Once you copy the [[Accumulable]] the ID is no longer unique. - */ - def newId(): Long = nextId.getAndIncrement - - /** - * Register an [[Accumulable]] created on the driver such that it can be used on the executors. - * - * All accumulators registered here can later be used as a container for accumulating partial - * values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does. - * Note: if an accumulator is registered here, it should also be registered with the active - * context cleaner for cleanup so as to avoid memory leaks. - * - * If an [[Accumulable]] with the same ID was already registered, this does nothing instead - * of overwriting it. This happens when we copy accumulators, e.g. when we reconstruct - * [[org.apache.spark.executor.TaskMetrics]] from accumulator updates. 
- */ - def register(a: Accumulable[_, _]): Unit = synchronized { - if (!originals.contains(a.id)) { - originals(a.id) = new WeakReference[Accumulable[_, _]](a) - } - } - - /** - * Unregister the [[Accumulable]] with the given ID, if any. - */ - def remove(accId: Long): Unit = synchronized { - originals.remove(accId) - } - - /** - * Return the [[Accumulable]] registered with the given ID, if any. - */ - def get(id: Long): Option[Accumulable[_, _]] = synchronized { - originals.get(id).map { weakRef => - // Since we are storing weak references, we must check whether the underlying data is valid. - weakRef.get.getOrElse { - throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id") - } - } - } - - /** - * Clear all registered [[Accumulable]]s. For testing only. - */ - def clear(): Unit = synchronized { - originals.clear() - } - -} - - /** * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 76692ccec8..63a00a84af 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -144,7 +144,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { registerForCleanup(rdd, CleanRDD(rdd.id)) } - def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = { + def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = { registerForCleanup(a, CleanAccum(a.id)) } @@ -241,7 +241,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { def doCleanupAccum(accId: Long, blocking: Boolean): Unit = { try { logDebug("Cleaning accumulator " + accId) - Accumulators.remove(accId) + AccumulatorContext.remove(accId) listeners.asScala.foreach(_.accumCleaned(accId)) logInfo("Cleaned accumulator " + accId) } catch { diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 2bdbd3fae9..9eac05fdf9 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} */ private[spark] case class Heartbeat( executorId: String, - accumUpdates: Array[(Long, Seq[AccumulableInfo])], // taskId -> accum updates + accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], // taskId -> accumulator updates blockManagerId: BlockManagerId) /** diff --git a/core/src/main/scala/org/apache/spark/NewAccumulator.scala b/core/src/main/scala/org/apache/spark/NewAccumulator.scala new file mode 100644 index 0000000000..edb9b741a8 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/NewAccumulator.scala @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.{lang => jl} +import java.io.ObjectInputStream +import java.util.concurrent.atomic.AtomicLong +import javax.annotation.concurrent.GuardedBy + +import org.apache.spark.scheduler.AccumulableInfo +import org.apache.spark.util.Utils + + +private[spark] case class AccumulatorMetadata( + id: Long, + name: Option[String], + countFailedValues: Boolean) extends Serializable + + +/** + * The base class for accumulators, which can accumulate inputs of type `IN` and produce output of + * type `OUT`. + */ +abstract class NewAccumulator[IN, OUT] extends Serializable { + private[spark] var metadata: AccumulatorMetadata = _ + private[this] var atDriverSide = true + + private[spark] def register( + sc: SparkContext, + name: Option[String] = None, + countFailedValues: Boolean = false): Unit = { + if (this.metadata != null) { + throw new IllegalStateException("Cannot register an Accumulator twice.") + } + this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, countFailedValues) + AccumulatorContext.register(this) + sc.cleaner.foreach(_.registerAccumulatorForCleanup(this)) + } + + /** + * Returns true if this accumulator has been registered. Note that all accumulators must be + * registered before use, or an exception will be thrown. + */ + final def isRegistered: Boolean = + metadata != null && AccumulatorContext.originals.containsKey(metadata.id) + + private def assertMetadataNotNull(): Unit = { + if (metadata == null) { + throw new IllegalAccessError("The metadata of this accumulator has not been assigned yet.") + } + } + + /** + * Returns the id of this accumulator; it can only be called after registration. + */ + final def id: Long = { + assertMetadataNotNull() + metadata.id + } + + /** + * Returns the name of this accumulator; it can only be called after registration. + */ + final def name: Option[String] = { + assertMetadataNotNull() + metadata.name + } + + /** + * Whether to accumulate values from failed tasks. This is set to true for system and time + * metrics like serialization time or bytes spilled, and false for things with absolute values + * like number of input rows. This should be used for internal metrics only. + */ + private[spark] final def countFailedValues: Boolean = { + assertMetadataNotNull() + metadata.countFailedValues + } + + /** + * Creates an [[AccumulableInfo]] representation of this [[NewAccumulator]] with the provided + * values. + */ + private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { + val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) + new AccumulableInfo(id, name, update, value, isInternal, countFailedValues) + } + + final private[spark] def isAtDriverSide: Boolean = atDriverSide + + /** + * Tells whether this accumulator is at its zero value, e.g. for a counter accumulator, 0 is the + * zero value; for a list accumulator, Nil is the zero value. + */ + def isZero(): Boolean + + /** + * Creates a new copy of this accumulator at its zero value, i.e. calling `isZero` on the copy + * must return true. 
+ */ + def copyAndReset(): NewAccumulator[IN, OUT] + + /** + * Takes the inputs and accumulates them, e.g. it can be a simple `+=` for a counter accumulator. + */ + def add(v: IN): Unit + + /** + * Merges another same-type accumulator into this one and updates its state, i.e. this should be + * a merge-in-place. + */ + def merge(other: NewAccumulator[IN, OUT]): Unit + + /** + * Access this accumulator's current value; only allowed on driver. + */ + final def value: OUT = { + if (atDriverSide) { + localValue + } else { + throw new UnsupportedOperationException("Can't read accumulator value in task") + } + } + + /** + * Defines the current value of this accumulator. + * + * This is NOT the global value of the accumulator. To get the global value after a + * completed operation on the dataset, call `value`. + */ + def localValue: OUT + + // Called by Java when serializing an object + final protected def writeReplace(): Any = { + if (atDriverSide) { + if (!isRegistered) { + throw new UnsupportedOperationException( + "Accumulator must be registered before being sent to an executor") + } + val copy = copyAndReset() + assert(copy.isZero(), "copyAndReset must return a zero value copy") + copy.metadata = metadata + copy + } else { + this + } + } + + // Called by Java when deserializing an object + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { + in.defaultReadObject() + if (atDriverSide) { + atDriverSide = false + + // Automatically register the accumulator when it is deserialized with the task closure. + // This is for external accumulators and internal ones that do not represent task level + // metrics, e.g. internal SQL metrics, which are per-operator. + val taskContext = TaskContext.get() + if (taskContext != null) { + taskContext.registerAccumulator(this) + } + } else { + atDriverSide = true + } + } + + override def toString: String = { + if (metadata == null) { + "Un-registered Accumulator: " + getClass.getSimpleName + } else { + getClass.getSimpleName + s"(id: $id, name: $name, value: $localValue)" + } + } +} + + +private[spark] object AccumulatorContext { + + /** + * This global map holds the original accumulator objects that are created on the driver. + * It keeps weak references to these objects so that accumulators can be garbage-collected + * once the RDDs and user-code that reference them are cleaned up. + * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051). + */ + @GuardedBy("AccumulatorContext") + val originals = new java.util.HashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]] + + private[this] val nextId = new AtomicLong(0L) + + /** + * Return a globally unique ID for a new [[Accumulator]]. + * Note: Once you copy the [[Accumulator]] the ID is no longer unique. + */ + def newId(): Long = nextId.getAndIncrement + + /** + * Register an [[Accumulator]] created on the driver such that it can be used on the executors. + * + * All accumulators registered here can later be used as a container for accumulating partial + * values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does. + * Note: if an accumulator is registered here, it should also be registered with the active + * context cleaner for cleanup so as to avoid memory leaks. + * + * If an [[Accumulator]] with the same ID was already registered, this does nothing instead + * of overwriting it. We will never register the same accumulator twice; this is just a sanity check. 
+ */ + def register(a: NewAccumulator[_, _]): Unit = synchronized { + if (!originals.containsKey(a.id)) { + originals.put(a.id, new jl.ref.WeakReference[NewAccumulator[_, _]](a)) + } + } + + /** + * Unregister the [[Accumulator]] with the given ID, if any. + */ + def remove(id: Long): Unit = synchronized { + originals.remove(id) + } + + /** + * Return the [[Accumulator]] registered with the given ID, if any. + */ + def get(id: Long): Option[NewAccumulator[_, _]] = synchronized { + Option(originals.get(id)).map { ref => + // Since we are storing weak references, we must check whether the underlying data is valid. + val acc = ref.get + if (acc eq null) { + throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id") + } + acc + } + } + + /** + * Clear all registered [[Accumulator]]s. For testing only. + */ + def clear(): Unit = synchronized { + originals.clear() + } +} + + +class LongAccumulator extends NewAccumulator[jl.Long, jl.Long] { + private[this] var _sum = 0L + + override def isZero(): Boolean = _sum == 0 + + override def copyAndReset(): LongAccumulator = new LongAccumulator + + override def add(v: jl.Long): Unit = _sum += v + + def add(v: Long): Unit = _sum += v + + def sum: Long = _sum + + override def merge(other: NewAccumulator[jl.Long, jl.Long]): Unit = other match { + case o: LongAccumulator => _sum += o.sum + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + private[spark] def setValue(newValue: Long): Unit = _sum = newValue + + override def localValue: jl.Long = _sum +} + + +class DoubleAccumulator extends NewAccumulator[jl.Double, jl.Double] { + private[this] var _sum = 0.0 + + override def isZero(): Boolean = _sum == 0.0 + + override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator + + override def add(v: jl.Double): Unit = _sum += v + + def add(v: Double): Unit = _sum += v + + def sum: Double = _sum + + override def merge(other: NewAccumulator[jl.Double, jl.Double]): Unit = other match { + case o: DoubleAccumulator => _sum += o.sum + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + private[spark] def setValue(newValue: Double): Unit = _sum = newValue + + override def localValue: jl.Double = _sum +} + + +class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] { + private[this] var _sum = 0.0 + private[this] var _count = 0L + + override def isZero(): Boolean = _sum == 0.0 && _count == 0 + + override def copyAndReset(): AverageAccumulator = new AverageAccumulator + + override def add(v: jl.Double): Unit = { + _sum += v + _count += 1 + } + + def add(d: Double): Unit = { + _sum += d + _count += 1 + } + + override def merge(other: NewAccumulator[jl.Double, jl.Double]): Unit = other match { + case o: AverageAccumulator => + _sum += o.sum + _count += o.count + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + override def localValue: jl.Double = if (_count == 0) { + Double.NaN + } else { + _sum / _count + } + + def sum: Double = _sum + + def count: Long = _count +} + + +class ListAccumulator[T] extends NewAccumulator[T, java.util.List[T]] { + private[this] val _list: java.util.List[T] = new java.util.ArrayList[T] + + override def isZero(): Boolean = _list.isEmpty + + override def copyAndReset(): ListAccumulator[T] = new ListAccumulator + + override def add(v: T): Unit = 
_list.add(v) + + override def merge(other: NewAccumulator[T, java.util.List[T]]): Unit = other match { + case o: ListAccumulator[T] => _list.addAll(o.localValue) + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + override def localValue: java.util.List[T] = java.util.Collections.unmodifiableList(_list) + + private[spark] def setValue(newValue: java.util.List[T]): Unit = { + _list.clear() + _list.addAll(newValue) + } +} + + +class LegacyAccumulatorWrapper[R, T]( + initialValue: R, + param: org.apache.spark.AccumulableParam[R, T]) extends NewAccumulator[T, R] { + private[spark] var _value = initialValue // Current value on driver + + override def isZero(): Boolean = _value == param.zero(initialValue) + + override def copyAndReset(): LegacyAccumulatorWrapper[R, T] = { + val acc = new LegacyAccumulatorWrapper(initialValue, param) + acc._value = param.zero(initialValue) + acc + } + + override def add(v: T): Unit = _value = param.addAccumulator(_value, v) + + override def merge(other: NewAccumulator[T, R]): Unit = other match { + case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.localValue) + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + override def localValue: R = _value +} diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f322a770bf..865989aee0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1217,10 +1217,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" * values to using the `+=` method. Only the driver can access the accumulator's `value`. 
*/ - def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] = - { + def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] = { val acc = new Accumulator(initialValue, param) - cleaner.foreach(_.registerAccumulatorForCleanup(acc)) + cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc)) acc } @@ -1232,7 +1231,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) : Accumulator[T] = { val acc = new Accumulator(initialValue, param, Some(name)) - cleaner.foreach(_.registerAccumulatorForCleanup(acc)) + cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc)) acc } @@ -1245,7 +1244,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) : Accumulable[R, T] = { val acc = new Accumulable(initialValue, param) - cleaner.foreach(_.registerAccumulatorForCleanup(acc)) + cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc)) acc } @@ -1259,7 +1258,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) : Accumulable[R, T] = { val acc = new Accumulable(initialValue, param, Some(name)) - cleaner.foreach(_.registerAccumulatorForCleanup(acc)) + cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc)) acc } @@ -1273,7 +1272,101 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli (initialValue: R): Accumulable[R, T] = { val param = new GrowableAccumulableParam[R, T] val acc = new Accumulable(initialValue, param) - cleaner.foreach(_.registerAccumulatorForCleanup(acc)) + cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc)) + acc + } + + /** + * Register the given accumulator. Note that accumulators must be registered before use, or an + * exception will be thrown. + */ + def register(acc: NewAccumulator[_, _]): Unit = { + acc.register(this) + } + + /** + * Register the given accumulator with the given name. Note that accumulators must be registered + * before use, or an exception will be thrown. + */ + def register(acc: NewAccumulator[_, _], name: String): Unit = { + acc.register(this, name = Some(name)) + } + + /** + * Create and register a long accumulator, which starts with 0 and accumulates inputs by `add`. + */ + def longAccumulator: LongAccumulator = { + val acc = new LongAccumulator + register(acc) + acc + } + + /** + * Create and register a long accumulator, which starts with 0 and accumulates inputs by `add`. + */ + def longAccumulator(name: String): LongAccumulator = { + val acc = new LongAccumulator + register(acc, name) + acc + } + + /** + * Create and register a double accumulator, which starts with 0 and accumulates inputs by `add`. + */ + def doubleAccumulator: DoubleAccumulator = { + val acc = new DoubleAccumulator + register(acc) + acc + } + + /** + * Create and register a double accumulator, which starts with 0 and accumulates inputs by `add`. + */ + def doubleAccumulator(name: String): DoubleAccumulator = { + val acc = new DoubleAccumulator + register(acc, name) + acc + } + + /** + * Create and register an average accumulator, which accumulates double inputs by recording the + * total sum and total count, and produces the output as sum / count. Note that Double.NaN will be + * returned if no input is added. 
*/ + def averageAccumulator: AverageAccumulator = { + val acc = new AverageAccumulator + register(acc) + acc + } + + /** + * Create and register an average accumulator, which accumulates double inputs by recording the + * total sum and total count, and produces the output as sum / count. Note that Double.NaN will be + * returned if no input is added. + */ + def averageAccumulator(name: String): AverageAccumulator = { + val acc = new AverageAccumulator + register(acc, name) + acc + } + + /** + * Create and register a list accumulator, which starts with an empty list and accumulates inputs + * by adding them into the inner list. + */ + def listAccumulator[T]: ListAccumulator[T] = { + val acc = new ListAccumulator[T] + register(acc) + acc + } + + /** + * Create and register a list accumulator, which starts with an empty list and accumulates inputs + * by adding them into the inner list. + */ + def listAccumulator[T](name: String): ListAccumulator[T] = { + val acc = new ListAccumulator[T] + register(acc, name) acc } diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index e7940bd9ed..9e53257462 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -188,6 +188,6 @@ abstract class TaskContext extends Serializable { * Register an accumulator that belongs to this task. Accumulators must call this method when * deserializing in executors. */ - private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit + private[spark] def registerAccumulator(a: NewAccumulator[_, _]): Unit } diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index 43e555670d..bc3807f5db 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -122,7 +122,7 @@ private[spark] class TaskContextImpl( override def getMetricsSources(sourceName: String): Seq[Source] = metricsSystem.getSourcesByName(sourceName) - private[spark] override def registerAccumulator(a: Accumulable[_, _]): Unit = { + private[spark] override def registerAccumulator(a: NewAccumulator[_, _]): Unit = { taskMetrics.registerAccumulator(a) } diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 7487cfe9c5..82ba2d0c27 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -19,10 +19,7 @@ package org.apache.spark import java.io.{ObjectInputStream, ObjectOutputStream} -import scala.util.Try - import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.Logging import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.storage.BlockManagerId @@ -120,18 +117,10 @@ case class ExceptionFailure( stackTrace: Array[StackTraceElement], fullStackTrace: String, private val exceptionWrapper: Option[ThrowableSerializationWrapper], - accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo]) + accumUpdates: Seq[AccumulableInfo] = Seq.empty, + private[spark] var accums: Seq[NewAccumulator[_, _]] = Nil) extends TaskFailedReason { - @deprecated("use accumUpdates instead", "2.0.0") - val metrics: Option[TaskMetrics] = { - if (accumUpdates.nonEmpty) { - Try(TaskMetrics.fromAccumulatorUpdates(accumUpdates)).toOption - } else { - None 
- } - } - /** * `preserveCause` is used to keep the exception itself so it is available to the * driver. This may be set to `false` in the event that the exception is not in fact @@ -149,6 +138,11 @@ case class ExceptionFailure( this(e, accumUpdates, preserveCause = true) } + private[spark] def withAccums(accums: Seq[NewAccumulator[_, _]]): ExceptionFailure = { + this.accums = accums + this + } + def exception: Option[Throwable] = exceptionWrapper.flatMap(w => Option(w.exception)) override def toErrorString: String = diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 650f05c309..4d61f7e232 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -353,22 +353,24 @@ private[spark] class Executor( logError(s"Exception in $taskName (TID $taskId)", t) // Collect latest accumulator values to report back to the driver - val accumulatorUpdates: Seq[AccumulableInfo] = + val accums: Seq[NewAccumulator[_, _]] = if (task != null) { task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart) task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) task.collectAccumulatorUpdates(taskFailed = true) } else { - Seq.empty[AccumulableInfo] + Seq.empty } + val accUpdates = accums.map(acc => acc.toInfo(Some(acc.localValue), None)) + val serializedTaskEndReason = { try { - ser.serialize(new ExceptionFailure(t, accumulatorUpdates)) + ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums)) } catch { case _: NotSerializableException => // t is not serializable so just send the stacktrace - ser.serialize(new ExceptionFailure(t, accumulatorUpdates, preserveCause = false)) + ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums)) } } execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason) @@ -476,14 +478,14 @@ private[spark] class Executor( /** Reports heartbeat and metrics for active tasks to the driver. 
*/ private def reportHeartBeat(): Unit = { // list of (task id, accumUpdates) to send back to the driver - val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulableInfo])]() + val accumUpdates = new ArrayBuffer[(Long, Seq[NewAccumulator[_, _]])]() val curGCTime = computeTotalGcTime() for (taskRunner <- runningTasks.values().asScala) { if (taskRunner.task != null) { taskRunner.task.metrics.mergeShuffleReadMetrics() taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) - accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulatorUpdates())) + accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators())) } } diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala index 535352e7dd..6f7160ac0d 100644 --- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import org.apache.spark.InternalAccumulator +import org.apache.spark.LongAccumulator import org.apache.spark.annotation.DeveloperApi @@ -40,20 +40,18 @@ object DataReadMethod extends Enumeration with Serializable { */ @DeveloperApi class InputMetrics private[spark] () extends Serializable { - import InternalAccumulator._ - - private[executor] val _bytesRead = TaskMetrics.createLongAccum(input.BYTES_READ) - private[executor] val _recordsRead = TaskMetrics.createLongAccum(input.RECORDS_READ) + private[executor] val _bytesRead = new LongAccumulator + private[executor] val _recordsRead = new LongAccumulator /** * Total number of bytes read. */ - def bytesRead: Long = _bytesRead.localValue + def bytesRead: Long = _bytesRead.sum /** * Total number of records read. */ - def recordsRead: Long = _recordsRead.localValue + def recordsRead: Long = _recordsRead.sum private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v) private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v) diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala index 586c98b156..db3924cb69 100644 --- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import org.apache.spark.InternalAccumulator +import org.apache.spark.LongAccumulator import org.apache.spark.annotation.DeveloperApi @@ -39,20 +39,18 @@ object DataWriteMethod extends Enumeration with Serializable { */ @DeveloperApi class OutputMetrics private[spark] () extends Serializable { - import InternalAccumulator._ - - private[executor] val _bytesWritten = TaskMetrics.createLongAccum(output.BYTES_WRITTEN) - private[executor] val _recordsWritten = TaskMetrics.createLongAccum(output.RECORDS_WRITTEN) + private[executor] val _bytesWritten = new LongAccumulator + private[executor] val _recordsWritten = new LongAccumulator /** * Total number of bytes written. */ - def bytesWritten: Long = _bytesWritten.localValue + def bytesWritten: Long = _bytesWritten.sum /** * Total number of records written. 
*/ - def recordsWritten: Long = _recordsWritten.localValue + def recordsWritten: Long = _recordsWritten.sum private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v) private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v) diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala index f012a74db6..fa962108c3 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import org.apache.spark.InternalAccumulator +import org.apache.spark.LongAccumulator import org.apache.spark.annotation.DeveloperApi @@ -28,52 +28,44 @@ import org.apache.spark.annotation.DeveloperApi */ @DeveloperApi class ShuffleReadMetrics private[spark] () extends Serializable { - import InternalAccumulator._ - - private[executor] val _remoteBlocksFetched = - TaskMetrics.createIntAccum(shuffleRead.REMOTE_BLOCKS_FETCHED) - private[executor] val _localBlocksFetched = - TaskMetrics.createIntAccum(shuffleRead.LOCAL_BLOCKS_FETCHED) - private[executor] val _remoteBytesRead = - TaskMetrics.createLongAccum(shuffleRead.REMOTE_BYTES_READ) - private[executor] val _localBytesRead = - TaskMetrics.createLongAccum(shuffleRead.LOCAL_BYTES_READ) - private[executor] val _fetchWaitTime = - TaskMetrics.createLongAccum(shuffleRead.FETCH_WAIT_TIME) - private[executor] val _recordsRead = - TaskMetrics.createLongAccum(shuffleRead.RECORDS_READ) + private[executor] val _remoteBlocksFetched = new LongAccumulator + private[executor] val _localBlocksFetched = new LongAccumulator + private[executor] val _remoteBytesRead = new LongAccumulator + private[executor] val _localBytesRead = new LongAccumulator + private[executor] val _fetchWaitTime = new LongAccumulator + private[executor] val _recordsRead = new LongAccumulator /** * Number of remote blocks fetched in this shuffle by this task. */ - def remoteBlocksFetched: Int = _remoteBlocksFetched.localValue + def remoteBlocksFetched: Long = _remoteBlocksFetched.sum /** * Number of local blocks fetched in this shuffle by this task. */ - def localBlocksFetched: Int = _localBlocksFetched.localValue + def localBlocksFetched: Long = _localBlocksFetched.sum /** * Total number of remote bytes read from the shuffle by this task. */ - def remoteBytesRead: Long = _remoteBytesRead.localValue + def remoteBytesRead: Long = _remoteBytesRead.sum /** * Shuffle data that was read from the local disk (as opposed to from a remote executor). */ - def localBytesRead: Long = _localBytesRead.localValue + def localBytesRead: Long = _localBytesRead.sum /** * Time the task spent waiting for remote shuffle blocks. This only includes the time * blocking on shuffle input data. For instance if block B is being fetched while the task is * still not finished processing block A, it is not considered to be blocking on block B. */ - def fetchWaitTime: Long = _fetchWaitTime.localValue + def fetchWaitTime: Long = _fetchWaitTime.sum /** * Total number of records read from the shuffle by this task. */ - def recordsRead: Long = _recordsRead.localValue + def recordsRead: Long = _recordsRead.sum /** * Total bytes fetched in the shuffle by this task (both remote and local). @@ -83,10 +75,10 @@ class ShuffleReadMetrics private[spark] () extends Serializable { /** * Number of blocks fetched in this shuffle by this task (remote or local). 
*/ - def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched + def totalBlocksFetched: Long = remoteBlocksFetched + localBlocksFetched - private[spark] def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.add(v) - private[spark] def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.add(v) + private[spark] def incRemoteBlocksFetched(v: Long): Unit = _remoteBlocksFetched.add(v) + private[spark] def incLocalBlocksFetched(v: Long): Unit = _localBlocksFetched.add(v) private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v) private[spark] def incLocalBytesRead(v: Long): Unit = _localBytesRead.add(v) private[spark] def incFetchWaitTime(v: Long): Unit = _fetchWaitTime.add(v) @@ -104,12 +96,12 @@ class ShuffleReadMetrics private[spark] () extends Serializable { * [[TempShuffleReadMetrics]] into `this`. */ private[spark] def setMergeValues(metrics: Seq[TempShuffleReadMetrics]): Unit = { - _remoteBlocksFetched.setValue(_remoteBlocksFetched.zero) - _localBlocksFetched.setValue(_localBlocksFetched.zero) - _remoteBytesRead.setValue(_remoteBytesRead.zero) - _localBytesRead.setValue(_localBytesRead.zero) - _fetchWaitTime.setValue(_fetchWaitTime.zero) - _recordsRead.setValue(_recordsRead.zero) + _remoteBlocksFetched.setValue(0) + _localBlocksFetched.setValue(0) + _remoteBytesRead.setValue(0) + _localBytesRead.setValue(0) + _fetchWaitTime.setValue(0) + _recordsRead.setValue(0) metrics.foreach { metric => _remoteBlocksFetched.add(metric.remoteBlocksFetched) _localBlocksFetched.add(metric.localBlocksFetched) @@ -127,22 +119,22 @@ class ShuffleReadMetrics private[spark] () extends Serializable { * last. */ private[spark] class TempShuffleReadMetrics { - private[this] var _remoteBlocksFetched = 0 - private[this] var _localBlocksFetched = 0 + private[this] var _remoteBlocksFetched = 0L + private[this] var _localBlocksFetched = 0L private[this] var _remoteBytesRead = 0L private[this] var _localBytesRead = 0L private[this] var _fetchWaitTime = 0L private[this] var _recordsRead = 0L - def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched += v - def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched += v + def incRemoteBlocksFetched(v: Long): Unit = _remoteBlocksFetched += v + def incLocalBlocksFetched(v: Long): Unit = _localBlocksFetched += v def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead += v def incLocalBytesRead(v: Long): Unit = _localBytesRead += v def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v def incRecordsRead(v: Long): Unit = _recordsRead += v - def remoteBlocksFetched: Int = _remoteBlocksFetched - def localBlocksFetched: Int = _localBlocksFetched + def remoteBlocksFetched: Long = _remoteBlocksFetched + def localBlocksFetched: Long = _localBlocksFetched def remoteBytesRead: Long = _remoteBytesRead def localBytesRead: Long = _localBytesRead def fetchWaitTime: Long = _fetchWaitTime diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala index 7326fba841..0e70a4f522 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import org.apache.spark.InternalAccumulator +import org.apache.spark.LongAccumulator import org.apache.spark.annotation.DeveloperApi @@ -28,29 +28,24 @@ import org.apache.spark.annotation.DeveloperApi */ @DeveloperApi class ShuffleWriteMetrics 
private[spark] () extends Serializable { - import InternalAccumulator._ - - private[executor] val _bytesWritten = - TaskMetrics.createLongAccum(shuffleWrite.BYTES_WRITTEN) - private[executor] val _recordsWritten = - TaskMetrics.createLongAccum(shuffleWrite.RECORDS_WRITTEN) - private[executor] val _writeTime = - TaskMetrics.createLongAccum(shuffleWrite.WRITE_TIME) + private[executor] val _bytesWritten = new LongAccumulator + private[executor] val _recordsWritten = new LongAccumulator + private[executor] val _writeTime = new LongAccumulator /** * Number of bytes written for the shuffle by this task. */ - def bytesWritten: Long = _bytesWritten.localValue + def bytesWritten: Long = _bytesWritten.sum /** * Total number of records written to the shuffle by this task. */ - def recordsWritten: Long = _recordsWritten.localValue + def recordsWritten: Long = _recordsWritten.sum /** * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds. */ - def writeTime: Long = _writeTime.localValue + def writeTime: Long = _writeTime.sum private[spark] def incBytesWritten(v: Long): Unit = _bytesWritten.add(v) private[spark] def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 8513d053f2..0b64917219 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -17,10 +17,9 @@ package org.apache.spark.executor -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, LinkedHashMap} import org.apache.spark._ -import org.apache.spark.AccumulatorParam.{IntAccumulatorParam, LongAccumulatorParam, UpdatedBlockStatusesAccumulatorParam} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.scheduler.AccumulableInfo @@ -42,53 +41,51 @@ import org.apache.spark.storage.{BlockId, BlockStatus} */ @DeveloperApi class TaskMetrics private[spark] () extends Serializable { - import InternalAccumulator._ - // Each metric is internally represented as an accumulator - private val _executorDeserializeTime = TaskMetrics.createLongAccum(EXECUTOR_DESERIALIZE_TIME) - private val _executorRunTime = TaskMetrics.createLongAccum(EXECUTOR_RUN_TIME) - private val _resultSize = TaskMetrics.createLongAccum(RESULT_SIZE) - private val _jvmGCTime = TaskMetrics.createLongAccum(JVM_GC_TIME) - private val _resultSerializationTime = TaskMetrics.createLongAccum(RESULT_SERIALIZATION_TIME) - private val _memoryBytesSpilled = TaskMetrics.createLongAccum(MEMORY_BYTES_SPILLED) - private val _diskBytesSpilled = TaskMetrics.createLongAccum(DISK_BYTES_SPILLED) - private val _peakExecutionMemory = TaskMetrics.createLongAccum(PEAK_EXECUTION_MEMORY) - private val _updatedBlockStatuses = TaskMetrics.createBlocksAccum(UPDATED_BLOCK_STATUSES) + private val _executorDeserializeTime = new LongAccumulator + private val _executorRunTime = new LongAccumulator + private val _resultSize = new LongAccumulator + private val _jvmGCTime = new LongAccumulator + private val _resultSerializationTime = new LongAccumulator + private val _memoryBytesSpilled = new LongAccumulator + private val _diskBytesSpilled = new LongAccumulator + private val _peakExecutionMemory = new LongAccumulator + private val _updatedBlockStatuses = new BlockStatusesAccumulator /** * Time taken on the executor to deserialize this task. 
*/ - def executorDeserializeTime: Long = _executorDeserializeTime.localValue + def executorDeserializeTime: Long = _executorDeserializeTime.sum /** * Time the executor spends actually running the task (including fetching shuffle data). */ - def executorRunTime: Long = _executorRunTime.localValue + def executorRunTime: Long = _executorRunTime.sum /** * The number of bytes this task transmitted back to the driver as the TaskResult. */ - def resultSize: Long = _resultSize.localValue + def resultSize: Long = _resultSize.sum /** * Amount of time the JVM spent in garbage collection while executing this task. */ - def jvmGCTime: Long = _jvmGCTime.localValue + def jvmGCTime: Long = _jvmGCTime.sum /** * Amount of time spent serializing the task result. */ - def resultSerializationTime: Long = _resultSerializationTime.localValue + def resultSerializationTime: Long = _resultSerializationTime.sum /** * The number of in-memory bytes spilled by this task. */ - def memoryBytesSpilled: Long = _memoryBytesSpilled.localValue + def memoryBytesSpilled: Long = _memoryBytesSpilled.sum /** * The number of on-disk bytes spilled by this task. */ - def diskBytesSpilled: Long = _diskBytesSpilled.localValue + def diskBytesSpilled: Long = _diskBytesSpilled.sum /** * Peak memory used by internal data structures created during shuffles, aggregations and @@ -96,7 +93,7 @@ class TaskMetrics private[spark] () extends Serializable { * across all such data structures created in this task. For SQL jobs, this only tracks all * unsafe operators and ExternalSort. */ - def peakExecutionMemory: Long = _peakExecutionMemory.localValue + def peakExecutionMemory: Long = _peakExecutionMemory.sum /** * Storage statuses of any blocks that have been updated as a result of this task. @@ -114,7 +111,7 @@ class TaskMetrics private[spark] () extends Serializable { private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v) private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v) private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v) - private[spark] def incUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = + private[spark] def incUpdatedBlockStatuses(v: (BlockId, BlockStatus)): Unit = _updatedBlockStatuses.add(v) private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = _updatedBlockStatuses.setValue(v) @@ -175,124 +172,143 @@ class TaskMetrics private[spark] () extends Serializable { } // Only used for test - private[spark] val testAccum = - sys.props.get("spark.testing").map(_ => TaskMetrics.createLongAccum(TEST_ACCUM)) - - @transient private[spark] lazy val internalAccums: Seq[Accumulable[_, _]] = { - val in = inputMetrics - val out = outputMetrics - val sr = shuffleReadMetrics - val sw = shuffleWriteMetrics - Seq(_executorDeserializeTime, _executorRunTime, _resultSize, _jvmGCTime, - _resultSerializationTime, _memoryBytesSpilled, _diskBytesSpilled, _peakExecutionMemory, - _updatedBlockStatuses, sr._remoteBlocksFetched, sr._localBlocksFetched, sr._remoteBytesRead, - sr._localBytesRead, sr._fetchWaitTime, sr._recordsRead, sw._bytesWritten, sw._recordsWritten, - sw._writeTime, in._bytesRead, in._recordsRead, out._bytesWritten, out._recordsWritten) ++ - testAccum - } + private[spark] val testAccum = sys.props.get("spark.testing").map(_ => new LongAccumulator) + + + import InternalAccumulator._ + @transient private[spark] lazy val nameToAccums = LinkedHashMap( + EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime, + 
EXECUTOR_RUN_TIME -> _executorRunTime, + RESULT_SIZE -> _resultSize, + JVM_GC_TIME -> _jvmGCTime, + RESULT_SERIALIZATION_TIME -> _resultSerializationTime, + MEMORY_BYTES_SPILLED -> _memoryBytesSpilled, + DISK_BYTES_SPILLED -> _diskBytesSpilled, + PEAK_EXECUTION_MEMORY -> _peakExecutionMemory, + UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses, + shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched, + shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched, + shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead, + shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead, + shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime, + shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead, + shuffleWrite.BYTES_WRITTEN -> shuffleWriteMetrics._bytesWritten, + shuffleWrite.RECORDS_WRITTEN -> shuffleWriteMetrics._recordsWritten, + shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime, + input.BYTES_READ -> inputMetrics._bytesRead, + input.RECORDS_READ -> inputMetrics._recordsRead, + output.BYTES_WRITTEN -> outputMetrics._bytesWritten, + output.RECORDS_WRITTEN -> outputMetrics._recordsWritten + ) ++ testAccum.map(TEST_ACCUM -> _) + + @transient private[spark] lazy val internalAccums: Seq[NewAccumulator[_, _]] = + nameToAccums.values.toIndexedSeq /* ========================== * | OTHER THINGS | * ========================== */ - private[spark] def registerForCleanup(sc: SparkContext): Unit = { - internalAccums.foreach { accum => - sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum)) + private[spark] def register(sc: SparkContext): Unit = { + nameToAccums.foreach { + case (name, acc) => acc.register(sc, name = Some(name), countFailedValues = true) } } /** * External accumulators registered with this task. */ - @transient private lazy val externalAccums = new ArrayBuffer[Accumulable[_, _]] + @transient private lazy val externalAccums = new ArrayBuffer[NewAccumulator[_, _]] - private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = { + private[spark] def registerAccumulator(a: NewAccumulator[_, _]): Unit = { externalAccums += a } - /** - * Return the latest updates of accumulators in this task. - * - * The [[AccumulableInfo.update]] field is always defined and the [[AccumulableInfo.value]] - * field is always empty, since this represents the partial updates recorded in this task, - * not the aggregated value across multiple tasks. - */ - def accumulatorUpdates(): Seq[AccumulableInfo] = { - (internalAccums ++ externalAccums).map { a => a.toInfo(Some(a.localValue), None) } - } + private[spark] def accumulators(): Seq[NewAccumulator[_, _]] = internalAccums ++ externalAccums } -/** - * Internal subclass of [[TaskMetrics]] which is used only for posting events to listeners. - * Its purpose is to obviate the need for the driver to reconstruct the original accumulators, - * which might have been garbage-collected. See SPARK-13407 for more details. - * - * Instances of this class should be considered read-only and users should not call `inc*()` or - * `set*()` methods. While we could override the setter methods to throw - * UnsupportedOperationException, we choose not to do so because the overrides would quickly become - * out-of-date when new metrics are added. 
- */ -private[spark] class ListenerTaskMetrics(accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics { - - override def accumulatorUpdates(): Seq[AccumulableInfo] = accumUpdates - - override private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = { - throw new UnsupportedOperationException("This TaskMetrics is read-only") - } -} private[spark] object TaskMetrics extends Logging { + import InternalAccumulator._ /** * Create an empty TaskMetrics that doesn't register its accumulators. */ def empty: TaskMetrics = { - val metrics = new TaskMetrics - metrics.internalAccums.foreach(acc => Accumulators.remove(acc.id)) - metrics + val tm = new TaskMetrics + tm.nameToAccums.foreach { case (name, acc) => + acc.metadata = AccumulatorMetadata(AccumulatorContext.newId(), Some(name), true) + } + tm + } + + def registered: TaskMetrics = { + val tm = empty + tm.internalAccums.foreach(AccumulatorContext.register) + tm } /** - * Create a new accumulator representing an internal task metric. + * Construct a [[TaskMetrics]] object from a list of [[AccumulableInfo]], called on driver only. + * The returned [[TaskMetrics]] is only used to get some internal metrics; we don't need to take + * care of the external accumulator info passed in. */ - private def newMetric[T]( - initialValue: T, - name: String, - param: AccumulatorParam[T]): Accumulator[T] = { - new Accumulator[T](initialValue, param, Some(name), countFailedValues = true) + def fromAccumulatorInfos(infos: Seq[AccumulableInfo]): TaskMetrics = { + val tm = new TaskMetrics + infos.filter(info => info.name.isDefined && info.update.isDefined).foreach { info => + val name = info.name.get + val value = info.update.get + if (name == UPDATED_BLOCK_STATUSES) { + tm.setUpdatedBlockStatuses(value.asInstanceOf[Seq[(BlockId, BlockStatus)]]) + } else { + tm.nameToAccums.get(name).foreach( + _.asInstanceOf[LongAccumulator].setValue(value.asInstanceOf[Long]) + ) + } + } + tm } - def createLongAccum(name: String): Accumulator[Long] = { - newMetric(0L, name, LongAccumulatorParam) - } + /** + * Construct a [[TaskMetrics]] object from a list of accumulator updates, called on driver only. 
+ */ + def fromAccumulators(accums: Seq[NewAccumulator[_, _]]): TaskMetrics = { + val tm = new TaskMetrics + val (internalAccums, externalAccums) = + accums.partition(a => a.name.isDefined && tm.nameToAccums.contains(a.name.get)) + + internalAccums.foreach { acc => + val tmAcc = tm.nameToAccums(acc.name.get).asInstanceOf[NewAccumulator[Any, Any]] + tmAcc.metadata = acc.metadata + tmAcc.merge(acc.asInstanceOf[NewAccumulator[Any, Any]]) + } - def createIntAccum(name: String): Accumulator[Int] = { - newMetric(0, name, IntAccumulatorParam) + tm.externalAccums ++= externalAccums + tm } +} + + +private[spark] class BlockStatusesAccumulator + extends NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] { + private[this] var _seq = ArrayBuffer.empty[(BlockId, BlockStatus)] - def createBlocksAccum(name: String): Accumulator[Seq[(BlockId, BlockStatus)]] = { - newMetric(Nil, name, UpdatedBlockStatusesAccumulatorParam) + override def isZero(): Boolean = _seq.isEmpty + + override def copyAndReset(): BlockStatusesAccumulator = new BlockStatusesAccumulator + + override def add(v: (BlockId, BlockStatus)): Unit = _seq += v + + override def merge(other: NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]]) + : Unit = other match { + case o: BlockStatusesAccumulator => _seq ++= o.localValue + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") } - /** - * Construct a [[TaskMetrics]] object from a list of accumulator updates, called on driver only. - * - * Executors only send accumulator updates back to the driver, not [[TaskMetrics]]. However, we - * need the latter to post task end events to listeners, so we need to reconstruct the metrics - * on the driver. - * - * This assumes the provided updates contain the initial set of accumulators representing - * internal task level metrics. - */ - def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = { - val definedAccumUpdates = accumUpdates.filter(_.update.isDefined) - val metrics = new ListenerTaskMetrics(definedAccumUpdates) - // We don't register this [[ListenerTaskMetrics]] for cleanup, and this is only used to post - // event, we should un-register all accumulators immediately. 
+
+
+private[spark] class BlockStatusesAccumulator
+  extends NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] {
+  private[this] var _seq = ArrayBuffer.empty[(BlockId, BlockStatus)]
 
-  def createBlocksAccum(name: String): Accumulator[Seq[(BlockId, BlockStatus)]] = {
-    newMetric(Nil, name, UpdatedBlockStatusesAccumulatorParam)
+  override def isZero(): Boolean = _seq.isEmpty
+
+  override def copyAndReset(): BlockStatusesAccumulator = new BlockStatusesAccumulator
+
+  override def add(v: (BlockId, BlockStatus)): Unit = _seq += v
+
+  override def merge(other: NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]])
+  : Unit = other match {
+    case o: BlockStatusesAccumulator => _seq ++= o.localValue
+    case _ => throw new UnsupportedOperationException(
+      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
 
-  /**
-   * Construct a [[TaskMetrics]] object from a list of accumulator updates, called on driver only.
-   *
-   * Executors only send accumulator updates back to the driver, not [[TaskMetrics]]. However, we
-   * need the latter to post task end events to listeners, so we need to reconstruct the metrics
-   * on the driver.
-   *
-   * This assumes the provided updates contain the initial set of accumulators representing
-   * internal task level metrics.
-   */
-  def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = {
-    val definedAccumUpdates = accumUpdates.filter(_.update.isDefined)
-    val metrics = new ListenerTaskMetrics(definedAccumUpdates)
-    // We don't register this [[ListenerTaskMetrics]] for cleanup, and this is only used to post
-    // event, we should un-register all accumulators immediately.
-    metrics.internalAccums.foreach(acc => Accumulators.remove(acc.id))
-    definedAccumUpdates.filter(_.internal).foreach { accum =>
-      metrics.internalAccums.find(_.name == accum.name).foreach(_.setValueAny(accum.update.get))
-    }
-    metrics
+  override def localValue: Seq[(BlockId, BlockStatus)] = _seq
+
+  def setValue(newValue: Seq[(BlockId, BlockStatus)]): Unit = {
+    _seq.clear()
+    _seq ++= newValue
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b7fb608ea5..a96d5f6fbf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -209,7 +209,7 @@ class DAGScheduler(
       task: Task[_],
       reason: TaskEndReason,
       result: Any,
-      accumUpdates: Seq[AccumulableInfo],
+      accumUpdates: Seq[NewAccumulator[_, _]],
       taskInfo: TaskInfo): Unit = {
     eventProcessLoop.post(
       CompletionEvent(task, reason, result, accumUpdates, taskInfo))
@@ -1088,21 +1088,19 @@ class DAGScheduler(
     val task = event.task
     val stage = stageIdToStage(task.stageId)
     try {
-      event.accumUpdates.foreach { ainfo =>
-        assert(ainfo.update.isDefined, "accumulator from task should have a partial value")
-        val id = ainfo.id
-        val partialValue = ainfo.update.get
+      event.accumUpdates.foreach { updates =>
+        val id = updates.id
         // Find the corresponding accumulator on the driver and update it
-        val acc: Accumulable[Any, Any] = Accumulators.get(id) match {
-          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]]
+        val acc: NewAccumulator[Any, Any] = AccumulatorContext.get(id) match {
+          case Some(accum) => accum.asInstanceOf[NewAccumulator[Any, Any]]
           case None =>
             throw new SparkException(s"attempted to access non-existent accumulator $id")
         }
-        acc ++= partialValue
+        acc.merge(updates.asInstanceOf[NewAccumulator[Any, Any]])
         // To avoid UI cruft, ignore cases where value wasn't updated
-        if (acc.name.isDefined && partialValue != acc.zero) {
+        if (acc.name.isDefined && !updates.isZero()) {
           stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
-          event.taskInfo.accumulables += acc.toInfo(Some(partialValue), Some(acc.value))
+          event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value))
         }
       }
     } catch {
@@ -1131,7 +1129,7 @@ class DAGScheduler(
     val taskMetrics: TaskMetrics =
       if (event.accumUpdates.nonEmpty) {
         try {
-          TaskMetrics.fromAccumulatorUpdates(event.accumUpdates)
+          TaskMetrics.fromAccumulators(event.accumUpdates)
         } catch {
           case NonFatal(e) =>
             logError(s"Error when attempting to reconstruct metrics for task $taskId", e)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index a3845c6acd..e57a2246d8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -71,7 +71,7 @@ private[scheduler] case class CompletionEvent(
     task: Task[_],
     reason: TaskEndReason,
     result: Any,
-    accumUpdates: Seq[AccumulableInfo],
+    accumUpdates: Seq[NewAccumulator[_, _]],
     taskInfo: TaskInfo)
   extends DAGSchedulerEvent
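BlockStatusesAccumulator above is a concrete example of the new accumulator contract: isZero reports whether the accumulator holds its neutral value, copyAndReset produces a fresh zeroed instance to ship to executors, add folds in one element, and merge combines a same-typed accumulator back in on the driver. Below is a minimal, self-contained sketch of that contract; MiniAccumulator is a hypothetical stand-in for NewAccumulator, declared here only so the example compiles on its own.

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the NewAccumulator base class.
abstract class MiniAccumulator[IN, OUT] extends Serializable {
  def isZero: Boolean
  def copyAndReset(): MiniAccumulator[IN, OUT]
  def add(v: IN): Unit
  def merge(other: MiniAccumulator[IN, OUT]): Unit
  def localValue: OUT
}

// Collects strings the same way BlockStatusesAccumulator collects
// (BlockId, BlockStatus) pairs.
class StringListAccumulator extends MiniAccumulator[String, Seq[String]] {
  private val buf = ArrayBuffer.empty[String]

  override def isZero: Boolean = buf.isEmpty
  override def copyAndReset(): StringListAccumulator = new StringListAccumulator
  override def add(v: String): Unit = buf += v
  override def merge(other: MiniAccumulator[String, Seq[String]]): Unit = other match {
    case o: StringListAccumulator => buf ++= o.localValue
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }
  override def localValue: Seq[String] = buf.toList
}

The match in merge mirrors the hunk above: merging accumulators of different runtime types is an error, not a silent no-op.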
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 080ea6c33a..7618dfeeed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -21,18 +21,15 @@ import java.util.Properties
 import javax.annotation.Nullable
 
 import scala.collection.Map
-import scala.collection.mutable
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo
 
 import org.apache.spark.{SparkConf, TaskEndReason}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{Distribution, Utils}
 
 @DeveloperApi
 @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 02185bf631..2f972b064b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -112,7 +112,7 @@ private[scheduler] abstract class Stage(
       numPartitionsToCompute: Int,
       taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
     val metrics = new TaskMetrics
-    metrics.registerForCleanup(rdd.sparkContext)
+    metrics.register(rdd.sparkContext)
     _latestInfo = StageInfo.fromStage(
       this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
     nextAttemptId += 1
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index eb10f3e69b..e7ca6efd84 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -23,7 +23,7 @@ import java.util.Properties
 
 import scala.collection.mutable.HashMap
 
-import org.apache.spark.{SparkEnv, TaskContext, TaskContextImpl}
+import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
@@ -52,7 +52,7 @@ private[spark] abstract class Task[T](
     val stageAttemptId: Int,
     val partitionId: Int,
     // The default value is only used in tests.
-    val metrics: TaskMetrics = TaskMetrics.empty,
+    val metrics: TaskMetrics = TaskMetrics.registered,
     @transient var localProperties: Properties = new Properties) extends Serializable {
 
   /**
@@ -153,11 +153,11 @@ private[spark] abstract class Task[T](
    * Collect the latest values of accumulators used in this task. If the task failed,
    * filter out the accumulators whose values should not be included on failures.
    */
-  def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulableInfo] = {
+  def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[NewAccumulator[_, _]] = {
     if (context != null) {
-      context.taskMetrics.accumulatorUpdates().filter { a => !taskFailed || a.countFailedValues }
+      context.taskMetrics.accumulators().filter { a => !taskFailed || a.countFailedValues }
     } else {
-      Seq.empty[AccumulableInfo]
+      Seq.empty
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 03135e63d7..b472c5511b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.SparkEnv
+import org.apache.spark.{NewAccumulator, SparkEnv}
 import org.apache.spark.storage.BlockId
 import org.apache.spark.util.Utils
 
@@ -36,7 +36,7 @@ private[spark] case class IndirectTaskResult[T](blockId: BlockId, size: Int)
 /** A TaskResult that contains the task's return value and accumulator updates. */
 private[spark] class DirectTaskResult[T](
     var valueBytes: ByteBuffer,
-    var accumUpdates: Seq[AccumulableInfo])
+    var accumUpdates: Seq[NewAccumulator[_, _]])
   extends TaskResult[T] with Externalizable {
 
   private var valueObjectDeserialized = false
@@ -61,9 +61,9 @@ private[spark] class DirectTaskResult[T](
     if (numUpdates == 0) {
       accumUpdates = null
     } else {
-      val _accumUpdates = new ArrayBuffer[AccumulableInfo]
+      val _accumUpdates = new ArrayBuffer[NewAccumulator[_, _]]
       for (i <- 0 until numUpdates) {
-        _accumUpdates += in.readObject.asInstanceOf[AccumulableInfo]
+        _accumUpdates += in.readObject.asInstanceOf[NewAccumulator[_, _]]
       }
       accumUpdates = _accumUpdates
     }
   }
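collectAccumulatorUpdates above filters on countFailedValues: when a task fails, only accumulators that explicitly opted into counting failed values (as the internal metrics do) are sent back to the driver. A small sketch of just that predicate, with Acc as a hypothetical stand-in for an accumulator:

object CountFailedValuesSketch {
  final case class Acc(name: String, countFailedValues: Boolean)

  // Same predicate as in Task.collectAccumulatorUpdates.
  def collect(accs: Seq[Acc], taskFailed: Boolean): Seq[Acc] =
    accs.filter(a => !taskFailed || a.countFailedValues)

  def main(args: Array[String]): Unit = {
    val accs = Seq(
      Acc("internal.metrics.resultSize", countFailedValues = true),
      Acc("user.counter", countFailedValues = false))
    // On success everything is reported; on failure, only opted-in metrics.
    assert(collect(accs, taskFailed = false).size == 2)
    assert(collect(accs, taskFailed = true).map(_.name) == Seq("internal.metrics.resultSize"))
  }
}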
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index ae7ef46abb..b438c285fd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -93,9 +93,10 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
           // we would have to serialize the result again after updating the size.
           result.accumUpdates = result.accumUpdates.map { a =>
             if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
-              assert(a.update == Some(0L),
-                "task result size should not have been set on the executors")
-              a.copy(update = Some(size.toLong))
+              val acc = a.asInstanceOf[LongAccumulator]
+              assert(acc.sum == 0L, "task result size should not have been set on the executors")
+              acc.setValue(size.toLong)
+              acc
             } else {
               a
             }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 647d44a0f0..75a0c56311 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler
 
+import org.apache.spark.NewAccumulator
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
 
@@ -66,7 +67,7 @@ private[spark] trait TaskScheduler {
    */
   def executorHeartbeatReceived(
       execId: String,
-      accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+      accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
       blockManagerId: BlockManagerId): Boolean
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index f31ec2af4e..776a3226cc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -384,13 +384,14 @@ private[spark] class TaskSchedulerImpl(
    */
   override def executorHeartbeatReceived(
       execId: String,
-      accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+      accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
       blockManagerId: BlockManagerId): Boolean = {
     // (taskId, stageId, stageAttemptId, accumUpdates)
     val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
       accumUpdates.flatMap { case (id, updates) =>
         taskIdToTaskSetManager.get(id).map { taskSetMgr =>
-          (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, updates)
+          (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId,
+            updates.map(acc => acc.toInfo(Some(acc.value), None)))
         }
       }
     }
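In the heartbeat path above, live accumulator objects are flattened into immutable info records before leaving the scheduler: the current value is reported as the update, and the final value is left empty because the task is still running. A self-contained sketch of that conversion follows, where Info, CountAcc, and toInfo are hypothetical stand-ins for AccumulableInfo, a long accumulator, and NewAccumulator.toInfo.

object HeartbeatSketch {
  final case class Info(id: Long, name: Option[String], update: Option[Any], value: Option[Any])

  final class CountAcc(val id: Long, val name: Option[String]) {
    private var sum = 0L
    def add(v: Long): Unit = sum += v
    def value: Long = sum
    def toInfo(update: Option[Any], value: Option[Any]): Info = Info(id, name, update, value)
  }

  def main(args: Array[String]): Unit = {
    val acc = new CountAcc(1L, Some("records"))
    acc.add(10L)
    // What executorHeartbeatReceived does per accumulator: partial value as
    // the update, no final value yet.
    println(acc.toInfo(Some(acc.value), None)) // Info(1,Some(records),Some(10),None)
  }
}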
logError("Task %s in stage %s (TID %d) had a not serializable result: %s; not retrying" @@ -788,7 +788,7 @@ private[spark] class TaskSetManager( // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our // stage finishes when a total of tasks.size tasks finish. sched.dagScheduler.taskEnded( - tasks(index), Resubmitted, null, Seq.empty[AccumulableInfo], info) + tasks(index), Resubmitted, null, Seq.empty, info) } } } diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala index 8daca6c390..c04b483831 100644 --- a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala +++ b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala @@ -266,7 +266,13 @@ private[spark] object SerializationDebugger extends Logging { (o, desc) } else { // write place - findObjectAndDescriptor(desc.invokeWriteReplace(o)) + val replaced = desc.invokeWriteReplace(o) + // `writeReplace` may return the same object. + if (replaced eq o) { + (o, desc) + } else { + findObjectAndDescriptor(replaced) + } } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index ff28796a60..32e332a9ad 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -186,8 +186,8 @@ class OutputMetrics private[spark]( val recordsWritten: Long) class ShuffleReadMetrics private[spark]( - val remoteBlocksFetched: Int, - val localBlocksFetched: Int, + val remoteBlocksFetched: Long, + val localBlocksFetched: Long, val fetchWaitTime: Long, val remoteBytesRead: Long, val localBytesRead: Long, diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1c4921666f..f2d06c7ea8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -801,7 +801,7 @@ private[spark] class BlockManager( reportBlockStatus(blockId, info, putBlockStatus) } Option(TaskContext.get()).foreach { c => - c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus))) + c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus) } } logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs))) @@ -958,7 +958,7 @@ private[spark] class BlockManager( reportBlockStatus(blockId, info, putBlockStatus) } Option(TaskContext.get()).foreach { c => - c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus))) + c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus) } logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs))) if (level.replication > 1) { @@ -1257,7 +1257,7 @@ private[spark] class BlockManager( } if (blockIsUpdated) { Option(TaskContext.get()).foreach { c => - c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, status))) + c.taskMetrics().incUpdatedBlockStatuses(blockId -> status) } } status.storageLevel @@ -1311,7 +1311,7 @@ private[spark] class BlockManager( reportBlockStatus(blockId, info, removeBlockStatus) } Option(TaskContext.get()).foreach { c => - c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, removeBlockStatus))) + c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus) } } } diff --git 
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 9ab7d96e29..945830c8bf 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -375,26 +375,21 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     execSummary.taskTime += info.duration
     stageData.numActiveTasks -= 1
 
-    val (errorMessage, accums): (Option[String], Seq[AccumulableInfo]) =
+    val errorMessage: Option[String] =
       taskEnd.reason match {
         case org.apache.spark.Success =>
           stageData.completedIndices.add(info.index)
           stageData.numCompleteTasks += 1
-          (None, taskEnd.taskMetrics.accumulatorUpdates())
+          None
         case e: ExceptionFailure =>
           // Handle ExceptionFailure because we might have accumUpdates
           stageData.numFailedTasks += 1
-          (Some(e.toErrorString), e.accumUpdates)
+          Some(e.toErrorString)
        case e: TaskFailedReason =>
           // All other failure cases
           stageData.numFailedTasks += 1
-          (Some(e.toErrorString), Seq.empty[AccumulableInfo])
+          Some(e.toErrorString)
       }
 
-    val taskMetrics =
-      if (accums.nonEmpty) {
-        Some(TaskMetrics.fromAccumulatorUpdates(accums))
-      } else {
-        None
-      }
+    val taskMetrics = Option(taskEnd.taskMetrics)
     taskMetrics.foreach { m =>
       val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.metrics)
       updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
@@ -503,7 +498,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         new StageUIData
       })
       val taskData = stageData.taskData.get(taskId)
-      val metrics = TaskMetrics.fromAccumulatorUpdates(accumUpdates)
+      val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates)
       taskData.foreach { t =>
         if (!t.taskInfo.finished) {
           updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.metrics)
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a613fbc5cc..aeab71d9df 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -840,7 +840,9 @@ private[spark] object JsonProtocol {
         // Fallback on getting accumulator updates from TaskMetrics, which was logged in Spark 1.x
         val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
           .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
-          .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulatorUpdates())
+          .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulators().map(acc => {
+            acc.toInfo(Some(acc.localValue), None)
+          }))
         ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
       case `taskResultLost` => TaskResultLost
       case `taskKilled` => TaskKilled
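The JsonProtocol change above keeps old event logs readable: when a Spark 1.x log has no "Accumulator Updates" field, the updates are rebuilt from the logged TaskMetrics by mapping each accumulator to an info record whose update is the accumulator's local value. A self-contained sketch of that fallback, with Acc and Info as hypothetical stand-ins for NewAccumulator and AccumulableInfo:

object JsonFallbackSketch {
  final case class Info(id: Long, name: Option[String], update: Option[Any], value: Option[Any])

  final case class Acc(id: Long, name: Option[String], localValue: Any) {
    def toInfo(update: Option[Any], value: Option[Any]): Info = Info(id, name, update, value)
  }

  def main(args: Array[String]): Unit = {
    // Simulates an old log where the "Accumulator Updates" field is absent.
    val fromLog: Option[Seq[Info]] = None
    val metricsAccums = Seq(Acc(0L, Some("internal.metrics.executorRunTime"), 123L))
    val accumUpdates = fromLog.getOrElse(
      metricsAccums.map(acc => acc.toInfo(Some(acc.localValue), None)))
    println(accumUpdates) // List(Info(0,Some(internal.metrics.executorRunTime),Some(123),None))
  }
}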