| author | Reynold Xin <rxin@databricks.com> | 2016-05-01 20:21:02 -0700 |
|---|---|---|
| committer | Reynold Xin <rxin@databricks.com> | 2016-05-01 20:21:02 -0700 |
| commit | 44da8d8eabeccc12bfed0d43b37d54e0da845c66 (patch) | |
| tree | f8099e7da3dda65e30d1d5175e954b531db89da1 /core | |
| parent | a832cef11233c6357c7ba7ede387b432e6b0ed71 (diff) | |
| download | spark-44da8d8eabeccc12bfed0d43b37d54e0da845c66.tar.gz spark-44da8d8eabeccc12bfed0d43b37d54e0da845c66.tar.bz2 spark-44da8d8eabeccc12bfed0d43b37d54e0da845c66.zip | |
[SPARK-15049] Rename NewAccumulator to AccumulatorV2
## What changes were proposed in this pull request?
NewAccumulator isn't the best name if we ever come up with v3 of the API.
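To make the rename concrete, here is a minimal usage sketch written against the renamed class and the `SparkContext.register` overloads touched in this diff. It is not part of the patch; the app name, master setting, and counting logic are made up for illustration:

```scala
import org.apache.spark.{LongAccumulator, SparkConf, SparkContext}

object AccumulatorV2Example {
  def main(args: Array[String]): Unit = {
    // Illustrative local setup only.
    val sc = new SparkContext(
      new SparkConf().setAppName("AccumulatorV2Example").setMaster("local[2]"))

    // Accumulators must be registered on the driver before use.
    val evenCount = new LongAccumulator
    sc.register(evenCount, "evenCount")

    // Executors call add(); partial values are merged back per task.
    sc.parallelize(1 to 100).foreach { n =>
      if (n % 2 == 0) evenCount.add(1L)
    }

    // Reading the value is only allowed on the driver.
    println(s"even numbers seen: ${evenCount.value}")
    sc.stop()
  }
}
```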
## How was this patch tested?
Updated tests to reflect the change.
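As a rough illustration of what the updated tests exercise (a sketch, not code from this patch), the renamed type still merges per-task partial values in place on the driver:

```scala
import org.apache.spark.LongAccumulator

// Two partial results, e.g. from two tasks.
val a = new LongAccumulator
val b = new LongAccumulator
a.add(2L)
b.add(3L)

// merge() is merge-in-place, per the AccumulatorV2 contract.
a.merge(b)
assert(a.sum == 5L)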
Author: Reynold Xin <rxin@databricks.com>
Closes #12827 from rxin/SPARK-15049.
Diffstat (limited to 'core')
22 files changed, 82 insertions, 81 deletions
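For orientation before the long diff below: under the new name, a user-defined accumulator extends `AccumulatorV2` directly. The sketch below is hypothetical and mirrors the built-in `ListAccumulator` in this patch; it assumes the abstract members visible in the diff (`isZero`, `copyAndReset`, `add`, `merge`) plus a `localValue` accessor, whose exact name and visibility at this commit are an assumption here.

```scala
import org.apache.spark.AccumulatorV2

// Hypothetical example, not part of this patch: collects distinct strings.
class DistinctStringAccumulator extends AccumulatorV2[String, java.util.Set[String]] {
  private[this] val _set: java.util.Set[String] = new java.util.HashSet[String]

  // Zero value: nothing collected yet.
  override def isZero: Boolean = _set.isEmpty

  // Fresh copy; the diff asserts copyAndReset().isZero must hold.
  override def copyAndReset(): DistinctStringAccumulator = new DistinctStringAccumulator

  // Executor-side update.
  override def add(v: String): Unit = _set.add(v)

  // Merge-in-place with another accumulator of the same type.
  override def merge(other: AccumulatorV2[String, java.util.Set[String]]): Unit = other match {
    case o: DistinctStringAccumulator => _set.addAll(o.localValue)
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  // Assumed accessor at this commit; later versions expose value instead.
  override def localValue: java.util.Set[String] = _set
}
```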
diff --git a/core/src/main/scala/org/apache/spark/NewAccumulator.scala b/core/src/main/scala/org/apache/spark/AccumulatorV2.scala index 1571e15b76..c65108a55e 100644 --- a/core/src/main/scala/org/apache/spark/NewAccumulator.scala +++ b/core/src/main/scala/org/apache/spark/AccumulatorV2.scala @@ -21,9 +21,6 @@ import java.{lang => jl} import java.io.ObjectInputStream import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong -import javax.annotation.concurrent.GuardedBy - -import scala.collection.JavaConverters._ import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.util.Utils @@ -39,7 +36,7 @@ private[spark] case class AccumulatorMetadata( * The base class for accumulators, that can accumulate inputs of type `IN`, and produce output of * type `OUT`. */ -abstract class NewAccumulator[IN, OUT] extends Serializable { +abstract class AccumulatorV2[IN, OUT] extends Serializable { private[spark] var metadata: AccumulatorMetadata = _ private[this] var atDriverSide = true @@ -95,7 +92,7 @@ abstract class NewAccumulator[IN, OUT] extends Serializable { } /** - * Creates an [[AccumulableInfo]] representation of this [[NewAccumulator]] with the provided + * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided * values. */ private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { @@ -106,16 +103,16 @@ abstract class NewAccumulator[IN, OUT] extends Serializable { final private[spark] def isAtDriverSide: Boolean = atDriverSide /** - * Tells if this accumulator is zero value or not. e.g. for a counter accumulator, 0 is zero + * Returns if this accumulator is zero value or not. e.g. for a counter accumulator, 0 is zero * value; for a list accumulator, Nil is zero value. */ - def isZero(): Boolean + def isZero: Boolean /** * Creates a new copy of this accumulator, which is zero value. i.e. call `isZero` on the copy * must return true. */ - def copyAndReset(): NewAccumulator[IN, OUT] + def copyAndReset(): AccumulatorV2[IN, OUT] /** * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. @@ -126,7 +123,7 @@ abstract class NewAccumulator[IN, OUT] extends Serializable { * Merges another same-type accumulator into this one and update its state, i.e. this should be * merge-in-place. */ - def merge(other: NewAccumulator[IN, OUT]): Unit + def merge(other: AccumulatorV2[IN, OUT]): Unit /** * Access this accumulator's current value; only allowed on driver. @@ -155,7 +152,7 @@ abstract class NewAccumulator[IN, OUT] extends Serializable { "Accumulator must be registered before send to executor") } val copy = copyAndReset() - assert(copy.isZero(), "copyAndReset must return a zero value copy") + assert(copy.isZero, "copyAndReset must return a zero value copy") copy.metadata = metadata copy } else { @@ -191,6 +188,9 @@ abstract class NewAccumulator[IN, OUT] extends Serializable { } +/** + * An internal class used to track accumulators by Spark itself. + */ private[spark] object AccumulatorContext { /** @@ -199,20 +199,21 @@ private[spark] object AccumulatorContext { * once the RDDs and user-code that reference them are cleaned up. * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051). 
*/ - private val originals = new ConcurrentHashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]] + private val originals = new ConcurrentHashMap[Long, jl.ref.WeakReference[AccumulatorV2[_, _]]] private[this] val nextId = new AtomicLong(0L) /** - * Return a globally unique ID for a new [[Accumulator]]. + * Returns a globally unique ID for a new [[Accumulator]]. * Note: Once you copy the [[Accumulator]] the ID is no longer unique. */ def newId(): Long = nextId.getAndIncrement + /** Returns the number of accumulators registered. Used in testing. */ def numAccums: Int = originals.size /** - * Register an [[Accumulator]] created on the driver such that it can be used on the executors. + * Registers an [[Accumulator]] created on the driver such that it can be used on the executors. * * All accumulators registered here can later be used as a container for accumulating partial * values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does. @@ -222,21 +223,21 @@ private[spark] object AccumulatorContext { * If an [[Accumulator]] with the same ID was already registered, this does nothing instead * of overwriting it. We will never register same accumulator twice, this is just a sanity check. */ - def register(a: NewAccumulator[_, _]): Unit = { - originals.putIfAbsent(a.id, new jl.ref.WeakReference[NewAccumulator[_, _]](a)) + def register(a: AccumulatorV2[_, _]): Unit = { + originals.putIfAbsent(a.id, new jl.ref.WeakReference[AccumulatorV2[_, _]](a)) } /** - * Unregister the [[Accumulator]] with the given ID, if any. + * Unregisters the [[Accumulator]] with the given ID, if any. */ def remove(id: Long): Unit = { originals.remove(id) } /** - * Return the [[Accumulator]] registered with the given ID, if any. + * Returns the [[Accumulator]] registered with the given ID, if any. */ - def get(id: Long): Option[NewAccumulator[_, _]] = { + def get(id: Long): Option[AccumulatorV2[_, _]] = { Option(originals.get(id)).map { ref => // Since we are storing weak references, we must check whether the underlying data is valid. val acc = ref.get @@ -248,7 +249,7 @@ private[spark] object AccumulatorContext { } /** - * Clear all registered [[Accumulator]]s. For testing only. + * Clears all registered [[Accumulator]]s. For testing only. 
*/ def clear(): Unit = { originals.clear() @@ -256,10 +257,10 @@ private[spark] object AccumulatorContext { } -class LongAccumulator extends NewAccumulator[jl.Long, jl.Long] { +class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] { private[this] var _sum = 0L - override def isZero(): Boolean = _sum == 0 + override def isZero: Boolean = _sum == 0 override def copyAndReset(): LongAccumulator = new LongAccumulator @@ -269,7 +270,7 @@ class LongAccumulator extends NewAccumulator[jl.Long, jl.Long] { def sum: Long = _sum - override def merge(other: NewAccumulator[jl.Long, jl.Long]): Unit = other match { + override def merge(other: AccumulatorV2[jl.Long, jl.Long]): Unit = other match { case o: LongAccumulator => _sum += o.sum case _ => throw new UnsupportedOperationException( s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") @@ -281,10 +282,10 @@ class LongAccumulator extends NewAccumulator[jl.Long, jl.Long] { } -class DoubleAccumulator extends NewAccumulator[jl.Double, jl.Double] { +class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] { private[this] var _sum = 0.0 - override def isZero(): Boolean = _sum == 0.0 + override def isZero: Boolean = _sum == 0.0 override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator @@ -294,7 +295,7 @@ class DoubleAccumulator extends NewAccumulator[jl.Double, jl.Double] { def sum: Double = _sum - override def merge(other: NewAccumulator[jl.Double, jl.Double]): Unit = other match { + override def merge(other: AccumulatorV2[jl.Double, jl.Double]): Unit = other match { case o: DoubleAccumulator => _sum += o.sum case _ => throw new UnsupportedOperationException( s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") @@ -306,11 +307,11 @@ class DoubleAccumulator extends NewAccumulator[jl.Double, jl.Double] { } -class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] { +class AverageAccumulator extends AccumulatorV2[jl.Double, jl.Double] { private[this] var _sum = 0.0 private[this] var _count = 0L - override def isZero(): Boolean = _sum == 0.0 && _count == 0 + override def isZero: Boolean = _sum == 0.0 && _count == 0 override def copyAndReset(): AverageAccumulator = new AverageAccumulator @@ -324,7 +325,7 @@ class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] { _count += 1 } - override def merge(other: NewAccumulator[jl.Double, jl.Double]): Unit = other match { + override def merge(other: AccumulatorV2[jl.Double, jl.Double]): Unit = other match { case o: AverageAccumulator => _sum += o.sum _count += o.count @@ -344,16 +345,16 @@ class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] { } -class ListAccumulator[T] extends NewAccumulator[T, java.util.List[T]] { +class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { private[this] val _list: java.util.List[T] = new java.util.ArrayList[T] - override def isZero(): Boolean = _list.isEmpty + override def isZero: Boolean = _list.isEmpty override def copyAndReset(): ListAccumulator[T] = new ListAccumulator override def add(v: T): Unit = _list.add(v) - override def merge(other: NewAccumulator[T, java.util.List[T]]): Unit = other match { + override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match { case o: ListAccumulator[T] => _list.addAll(o.localValue) case _ => throw new UnsupportedOperationException( s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") @@ -370,10 +371,10 @@ class ListAccumulator[T] extends NewAccumulator[T, java.util.List[T]] { 
class LegacyAccumulatorWrapper[R, T]( initialValue: R, - param: org.apache.spark.AccumulableParam[R, T]) extends NewAccumulator[T, R] { + param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] { private[spark] var _value = initialValue // Current value on driver - override def isZero(): Boolean = _value == param.zero(initialValue) + override def isZero: Boolean = _value == param.zero(initialValue) override def copyAndReset(): LegacyAccumulatorWrapper[R, T] = { val acc = new LegacyAccumulatorWrapper(initialValue, param) @@ -383,7 +384,7 @@ class LegacyAccumulatorWrapper[R, T]( override def add(v: T): Unit = _value = param.addAccumulator(_value, v) - override def merge(other: NewAccumulator[T, R]): Unit = other match { + override def merge(other: AccumulatorV2[T, R]): Unit = other match { case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.localValue) case _ => throw new UnsupportedOperationException( s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 63a00a84af..a51338c017 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -144,7 +144,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { registerForCleanup(rdd, CleanRDD(rdd.id)) } - def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = { + def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = { registerForCleanup(a, CleanAccum(a.id)) } diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 9eac05fdf9..29018c75b9 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} */ private[spark] case class Heartbeat( executorId: String, - accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], // taskId -> accumulator updates + accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates blockManagerId: BlockManagerId) /** diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 2cb3ed0296..d0f88d4e4d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1282,7 +1282,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * Register the given accumulator. Note that accumulators must be registered before use, or it * will throw exception. */ - def register(acc: NewAccumulator[_, _]): Unit = { + def register(acc: AccumulatorV2[_, _]): Unit = { acc.register(this) } @@ -1290,7 +1290,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * Register the given accumulator with given name. Note that accumulators must be registered * before use, or it will throw exception. 
*/ - def register(acc: NewAccumulator[_, _], name: String): Unit = { + def register(acc: AccumulatorV2[_, _], name: String): Unit = { acc.register(this, name = Some(name)) } diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 9e53257462..1a8f8cf11c 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -188,6 +188,6 @@ abstract class TaskContext extends Serializable { * Register an accumulator that belongs to this task. Accumulators must call this method when * deserializing in executors. */ - private[spark] def registerAccumulator(a: NewAccumulator[_, _]): Unit + private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit } diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index bc3807f5db..c904e08391 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -122,7 +122,7 @@ private[spark] class TaskContextImpl( override def getMetricsSources(sourceName: String): Seq[Source] = metricsSystem.getSourcesByName(sourceName) - private[spark] override def registerAccumulator(a: NewAccumulator[_, _]): Unit = { + private[spark] override def registerAccumulator(a: AccumulatorV2[_, _]): Unit = { taskMetrics.registerAccumulator(a) } diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 82ba2d0c27..ef333e397f 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -118,7 +118,7 @@ case class ExceptionFailure( fullStackTrace: String, private val exceptionWrapper: Option[ThrowableSerializationWrapper], accumUpdates: Seq[AccumulableInfo] = Seq.empty, - private[spark] var accums: Seq[NewAccumulator[_, _]] = Nil) + private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil) extends TaskFailedReason { /** @@ -138,7 +138,7 @@ case class ExceptionFailure( this(e, accumUpdates, preserveCause = true) } - private[spark] def withAccums(accums: Seq[NewAccumulator[_, _]]): ExceptionFailure = { + private[spark] def withAccums(accums: Seq[AccumulatorV2[_, _]]): ExceptionFailure = { this.accums = accums this } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 4d61f7e232..4f74dc92d7 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -353,7 +353,7 @@ private[spark] class Executor( logError(s"Exception in $taskName (TID $taskId)", t) // Collect latest accumulator values to report back to the driver - val accums: Seq[NewAccumulator[_, _]] = + val accums: Seq[AccumulatorV2[_, _]] = if (task != null) { task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart) task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) @@ -478,7 +478,7 @@ private[spark] class Executor( /** Reports heartbeat and metrics for active tasks to the driver. 
*/ private def reportHeartBeat(): Unit = { // list of (task id, accumUpdates) to send back to the driver - val accumUpdates = new ArrayBuffer[(Long, Seq[NewAccumulator[_, _]])]() + val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]() val curGCTime = computeTotalGcTime() for (taskRunner <- runningTasks.values().asScala) { diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 0b64917219..56d034fd03 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -201,7 +201,7 @@ class TaskMetrics private[spark] () extends Serializable { output.RECORDS_WRITTEN -> outputMetrics._recordsWritten ) ++ testAccum.map(TEST_ACCUM -> _) - @transient private[spark] lazy val internalAccums: Seq[NewAccumulator[_, _]] = + @transient private[spark] lazy val internalAccums: Seq[AccumulatorV2[_, _]] = nameToAccums.values.toIndexedSeq /* ========================== * @@ -217,13 +217,13 @@ class TaskMetrics private[spark] () extends Serializable { /** * External accumulators registered with this task. */ - @transient private lazy val externalAccums = new ArrayBuffer[NewAccumulator[_, _]] + @transient private lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]] - private[spark] def registerAccumulator(a: NewAccumulator[_, _]): Unit = { + private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = { externalAccums += a } - private[spark] def accumulators(): Seq[NewAccumulator[_, _]] = internalAccums ++ externalAccums + private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums } @@ -271,15 +271,15 @@ private[spark] object TaskMetrics extends Logging { /** * Construct a [[TaskMetrics]] object from a list of accumulator updates, called on driver only. 
*/ - def fromAccumulators(accums: Seq[NewAccumulator[_, _]]): TaskMetrics = { + def fromAccumulators(accums: Seq[AccumulatorV2[_, _]]): TaskMetrics = { val tm = new TaskMetrics val (internalAccums, externalAccums) = accums.partition(a => a.name.isDefined && tm.nameToAccums.contains(a.name.get)) internalAccums.foreach { acc => - val tmAcc = tm.nameToAccums(acc.name.get).asInstanceOf[NewAccumulator[Any, Any]] + val tmAcc = tm.nameToAccums(acc.name.get).asInstanceOf[AccumulatorV2[Any, Any]] tmAcc.metadata = acc.metadata - tmAcc.merge(acc.asInstanceOf[NewAccumulator[Any, Any]]) + tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]]) } tm.externalAccums ++= externalAccums @@ -289,7 +289,7 @@ private[spark] object TaskMetrics extends Logging { private[spark] class BlockStatusesAccumulator - extends NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] { + extends AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] { private[this] var _seq = ArrayBuffer.empty[(BlockId, BlockStatus)] override def isZero(): Boolean = _seq.isEmpty @@ -298,7 +298,7 @@ private[spark] class BlockStatusesAccumulator override def add(v: (BlockId, BlockStatus)): Unit = _seq += v - override def merge(other: NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]]) + override def merge(other: AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]]) : Unit = other match { case o: BlockStatusesAccumulator => _seq ++= o.localValue case _ => throw new UnsupportedOperationException( diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a96d5f6fbf..4dfd532e93 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -209,7 +209,7 @@ class DAGScheduler( task: Task[_], reason: TaskEndReason, result: Any, - accumUpdates: Seq[NewAccumulator[_, _]], + accumUpdates: Seq[AccumulatorV2[_, _]], taskInfo: TaskInfo): Unit = { eventProcessLoop.post( CompletionEvent(task, reason, result, accumUpdates, taskInfo)) @@ -1091,14 +1091,14 @@ class DAGScheduler( event.accumUpdates.foreach { updates => val id = updates.id // Find the corresponding accumulator on the driver and update it - val acc: NewAccumulator[Any, Any] = AccumulatorContext.get(id) match { - case Some(accum) => accum.asInstanceOf[NewAccumulator[Any, Any]] + val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match { + case Some(accum) => accum.asInstanceOf[AccumulatorV2[Any, Any]] case None => throw new SparkException(s"attempted to access non-existent accumulator $id") } - acc.merge(updates.asInstanceOf[NewAccumulator[Any, Any]]) + acc.merge(updates.asInstanceOf[AccumulatorV2[Any, Any]]) // To avoid UI cruft, ignore cases where value wasn't updated - if (acc.name.isDefined && !updates.isZero()) { + if (acc.name.isDefined && !updates.isZero) { stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value)) event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value)) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index e57a2246d8..0a2c2dc039 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -71,7 +71,7 @@ private[scheduler] case class CompletionEvent( task: Task[_], reason: 
TaskEndReason, result: Any, - accumUpdates: Seq[NewAccumulator[_, _]], + accumUpdates: Seq[AccumulatorV2[_, _]], taskInfo: TaskInfo) extends DAGSchedulerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index e7ca6efd84..362f8e51ce 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -153,7 +153,7 @@ private[spark] abstract class Task[T]( * Collect the latest values of accumulators used in this task. If the task failed, * filter out the accumulators whose values should not be included on failures. */ - def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[NewAccumulator[_, _]] = { + def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulatorV2[_, _]] = { if (context != null) { context.taskMetrics.accumulators().filter { a => !taskFailed || a.countFailedValues } } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index b472c5511b..69ce00f30d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{NewAccumulator, SparkEnv} +import org.apache.spark.{AccumulatorV2, SparkEnv} import org.apache.spark.storage.BlockId import org.apache.spark.util.Utils @@ -36,7 +36,7 @@ private[spark] case class IndirectTaskResult[T](blockId: BlockId, size: Int) /** A TaskResult that contains the task's return value and accumulator updates. */ private[spark] class DirectTaskResult[T]( var valueBytes: ByteBuffer, - var accumUpdates: Seq[NewAccumulator[_, _]]) + var accumUpdates: Seq[AccumulatorV2[_, _]]) extends TaskResult[T] with Externalizable { private var valueObjectDeserialized = false @@ -61,9 +61,9 @@ private[spark] class DirectTaskResult[T]( if (numUpdates == 0) { accumUpdates = null } else { - val _accumUpdates = new ArrayBuffer[NewAccumulator[_, _]] + val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]] for (i <- 0 until numUpdates) { - _accumUpdates += in.readObject.asInstanceOf[NewAccumulator[_, _]] + _accumUpdates += in.readObject.asInstanceOf[AccumulatorV2[_, _]] } accumUpdates = _accumUpdates } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 75a0c56311..9881a1018c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import org.apache.spark.NewAccumulator +import org.apache.spark.AccumulatorV2 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId @@ -67,7 +67,7 @@ private[spark] trait TaskScheduler { */ def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], + accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 8fa4aa121c..666b636558 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -384,7 +384,7 @@ private[spark] class TaskSchedulerImpl( */ override def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], + accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = { // (taskId, stageId, stageAttemptId, accumUpdates) val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b79f643a74..b724050f5b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -647,7 +647,7 @@ private[spark] class TaskSetManager( info.markFailed() val index = info.index copiesRunning(index) -= 1 - var accumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty + var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " + reason.asInstanceOf[TaskFailedReason].toErrorString val failureException: Option[Throwable] = reason match { diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 9c90049715..09eb9c1dbd 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -273,7 +273,7 @@ private[spark] object AccumulatorSuite { * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the * info as an accumulator update. */ - def makeInfo(a: NewAccumulator[_, _]): AccumulableInfo = a.toInfo(Some(a.localValue), None) + def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.localValue), None) /** * Run one or more Spark jobs and verify that in at least one job the peak execution memory diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index 688eb6bde9..25977a4660 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -213,7 +213,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) { private val accumsRegistered = new ArrayBuffer[Long] - override def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = { + override def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = { accumsRegistered += a.id super.registerAccumulatorForCleanup(a) } diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala index 94f6e1a3a7..27a1e7bb35 100644 --- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala @@ -203,7 +203,7 @@ class TaskMetricsSuite extends SparkFunSuite { acc1.add(1) acc2.add(2) val newUpdates = tm.accumulators() - .map(a => (a.id, a.asInstanceOf[NewAccumulator[Any, Any]])).toMap + .map(a => (a.id, a.asInstanceOf[AccumulatorV2[Any, Any]])).toMap assert(newUpdates.contains(acc1.id)) assert(newUpdates.contains(acc2.id)) assert(newUpdates.contains(acc3.id)) @@ 
-230,8 +230,8 @@ private[spark] object TaskMetricsSuite extends Assertions { * Note: this does NOT check accumulator ID equality. */ def assertUpdatesEquals( - updates1: Seq[NewAccumulator[_, _]], - updates2: Seq[NewAccumulator[_, _]]): Unit = { + updates1: Seq[AccumulatorV2[_, _]], + updates2: Seq[AccumulatorV2[_, _]]): Unit = { assert(updates1.size === updates2.size) updates1.zip(updates2).foreach { case (acc1, acc2) => // do not assert ID equals here diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9912d1f3bc..5a5c3a0cd1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -112,7 +112,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def stop() = {} override def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], + accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = true override def submitTasks(taskSet: TaskSet) = { // normally done by TaskSetManager @@ -483,7 +483,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def defaultParallelism(): Int = 2 override def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], + accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = true override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def applicationAttemptId(): Option[String] = None @@ -2012,7 +2012,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou task: Task[_], reason: TaskEndReason, result: Any, - extraAccumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty, + extraAccumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty, taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = { val accumUpdates = reason match { case Success => task.metrics.accumulators() diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 16027d944f..72ac848f12 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import org.apache.spark.{LocalSparkContext, NewAccumulator, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.{AccumulatorV2, LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId @@ -67,6 +67,6 @@ private class DummyTaskScheduler extends TaskScheduler { override def applicationAttemptId(): Option[String] = None def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], + accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = true } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 339fc4254d..122a3ecb49 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -37,7 +37,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) task: Task[_], reason: TaskEndReason, result: Any, - accumUpdates: Seq[NewAccumulator[_, _]], + accumUpdates: Seq[AccumulatorV2[_, _]], taskInfo: TaskInfo) { taskScheduler.endedTasks(taskInfo.index) = reason } @@ -184,7 +184,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(3) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) - val accumUpdatesByTask: Array[Seq[NewAccumulator[_, _]]] = taskSet.tasks.map { task => + val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => task.metrics.internalAccums } @@ -791,7 +791,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg private def createTaskResult( id: Int, - accumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty): DirectTaskResult[Int] = { + accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = { val valueSer = SparkEnv.get.serializer.newInstance() new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates) } |
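One behavioural detail worth noting from the diff: when an accumulator is shipped to an executor, Spark calls `copyAndReset()` and asserts that the copy `isZero` (now a parameterless member). A small hedged check of that contract with one of the built-in accumulators, using only members shown in this diff:

```scala
import org.apache.spark.LongAccumulator

val acc = new LongAccumulator
acc.add(5L)

// The copy must start from the zero value; the original keeps its local sum.
val copy = acc.copyAndReset()
assert(copy.isZero, "copyAndReset must return a zero value copy")
assert(acc.sum == 5L)
```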