author     Reynold Xin <rxin@databricks.com>  2016-05-01 20:21:02 -0700
committer  Reynold Xin <rxin@databricks.com>  2016-05-01 20:21:02 -0700
commit     44da8d8eabeccc12bfed0d43b37d54e0da845c66 (patch)
tree       f8099e7da3dda65e30d1d5175e954b531db89da1 /core
parent     a832cef11233c6357c7ba7ede387b432e6b0ed71 (diff)
[SPARK-15049] Rename NewAccumulator to AccumulatorV2
## What changes were proposed in this pull request?

NewAccumulator isn't the best name if we ever come up with v3 of the API.

## How was this patch tested?

Updated tests to reflect the change.

Author: Reynold Xin <rxin@databricks.com>

Closes #12827 from rxin/SPARK-15049.
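For context, the sketch below shows how the renamed class is registered and used from a driver program. `LongAccumulator`, `SparkContext.register`, `add`, and `sum` all appear in the hunks below (at this commit they live in the `org.apache.spark` package); the `SparkConf` setup, the `local[*]` master, the accumulator name, and the exact `add` overload used are illustrative assumptions, not part of this change.

```scala
import org.apache.spark.{LongAccumulator, SparkConf, SparkContext}

object AccumulatorV2Example {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("accumulator-v2-example").setMaster("local[*]"))

    // Accumulators must be registered with the SparkContext before use,
    // as the SparkContext.register overloads in the hunks below document.
    val evenCount = new LongAccumulator
    sc.register(evenCount, "evenCount")

    // add() runs on the executors; the merged result is read back on the driver.
    sc.parallelize(1 to 100).foreach { x =>
      if (x % 2 == 0) evenCount.add(1L)
    }

    println(s"even numbers seen: ${evenCount.sum}")
    sc.stop()
  }
}
```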
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/AccumulatorV2.scala (renamed from core/src/main/scala/org/apache/spark/NewAccumulator.scala) | 69
-rw-r--r--  core/src/main/scala/org/apache/spark/ContextCleaner.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContext.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContextImpl.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskEndReason.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/AccumulatorSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 6
22 files changed, 82 insertions, 81 deletions
diff --git a/core/src/main/scala/org/apache/spark/NewAccumulator.scala b/core/src/main/scala/org/apache/spark/AccumulatorV2.scala
index 1571e15b76..c65108a55e 100644
--- a/core/src/main/scala/org/apache/spark/NewAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/AccumulatorV2.scala
@@ -21,9 +21,6 @@ import java.{lang => jl}
import java.io.ObjectInputStream
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
-import javax.annotation.concurrent.GuardedBy
-
-import scala.collection.JavaConverters._
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.util.Utils
@@ -39,7 +36,7 @@ private[spark] case class AccumulatorMetadata(
* The base class for accumulators, that can accumulate inputs of type `IN`, and produce output of
* type `OUT`.
*/
-abstract class NewAccumulator[IN, OUT] extends Serializable {
+abstract class AccumulatorV2[IN, OUT] extends Serializable {
private[spark] var metadata: AccumulatorMetadata = _
private[this] var atDriverSide = true
@@ -95,7 +92,7 @@ abstract class NewAccumulator[IN, OUT] extends Serializable {
}
/**
- * Creates an [[AccumulableInfo]] representation of this [[NewAccumulator]] with the provided
+ * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided
* values.
*/
private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
@@ -106,16 +103,16 @@ abstract class NewAccumulator[IN, OUT] extends Serializable {
final private[spark] def isAtDriverSide: Boolean = atDriverSide
/**
- * Tells if this accumulator is zero value or not. e.g. for a counter accumulator, 0 is zero
+ * Returns if this accumulator is zero value or not. e.g. for a counter accumulator, 0 is zero
* value; for a list accumulator, Nil is zero value.
*/
- def isZero(): Boolean
+ def isZero: Boolean
/**
* Creates a new copy of this accumulator, which is zero value. i.e. call `isZero` on the copy
* must return true.
*/
- def copyAndReset(): NewAccumulator[IN, OUT]
+ def copyAndReset(): AccumulatorV2[IN, OUT]
/**
* Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
@@ -126,7 +123,7 @@ abstract class NewAccumulator[IN, OUT] extends Serializable {
* Merges another same-type accumulator into this one and update its state, i.e. this should be
* merge-in-place.
*/
- def merge(other: NewAccumulator[IN, OUT]): Unit
+ def merge(other: AccumulatorV2[IN, OUT]): Unit
/**
* Access this accumulator's current value; only allowed on driver.
@@ -155,7 +152,7 @@ abstract class NewAccumulator[IN, OUT] extends Serializable {
"Accumulator must be registered before send to executor")
}
val copy = copyAndReset()
- assert(copy.isZero(), "copyAndReset must return a zero value copy")
+ assert(copy.isZero, "copyAndReset must return a zero value copy")
copy.metadata = metadata
copy
} else {
@@ -191,6 +188,9 @@ abstract class NewAccumulator[IN, OUT] extends Serializable {
}
+/**
+ * An internal class used to track accumulators by Spark itself.
+ */
private[spark] object AccumulatorContext {
/**
@@ -199,20 +199,21 @@ private[spark] object AccumulatorContext {
* once the RDDs and user-code that reference them are cleaned up.
* TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051).
*/
- private val originals = new ConcurrentHashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]]
+ private val originals = new ConcurrentHashMap[Long, jl.ref.WeakReference[AccumulatorV2[_, _]]]
private[this] val nextId = new AtomicLong(0L)
/**
- * Return a globally unique ID for a new [[Accumulator]].
+ * Returns a globally unique ID for a new [[Accumulator]].
* Note: Once you copy the [[Accumulator]] the ID is no longer unique.
*/
def newId(): Long = nextId.getAndIncrement
+ /** Returns the number of accumulators registered. Used in testing. */
def numAccums: Int = originals.size
/**
- * Register an [[Accumulator]] created on the driver such that it can be used on the executors.
+ * Registers an [[Accumulator]] created on the driver such that it can be used on the executors.
*
* All accumulators registered here can later be used as a container for accumulating partial
* values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does.
@@ -222,21 +223,21 @@ private[spark] object AccumulatorContext {
* If an [[Accumulator]] with the same ID was already registered, this does nothing instead
* of overwriting it. We will never register same accumulator twice, this is just a sanity check.
*/
- def register(a: NewAccumulator[_, _]): Unit = {
- originals.putIfAbsent(a.id, new jl.ref.WeakReference[NewAccumulator[_, _]](a))
+ def register(a: AccumulatorV2[_, _]): Unit = {
+ originals.putIfAbsent(a.id, new jl.ref.WeakReference[AccumulatorV2[_, _]](a))
}
/**
- * Unregister the [[Accumulator]] with the given ID, if any.
+ * Unregisters the [[Accumulator]] with the given ID, if any.
*/
def remove(id: Long): Unit = {
originals.remove(id)
}
/**
- * Return the [[Accumulator]] registered with the given ID, if any.
+ * Returns the [[Accumulator]] registered with the given ID, if any.
*/
- def get(id: Long): Option[NewAccumulator[_, _]] = {
+ def get(id: Long): Option[AccumulatorV2[_, _]] = {
Option(originals.get(id)).map { ref =>
// Since we are storing weak references, we must check whether the underlying data is valid.
val acc = ref.get
@@ -248,7 +249,7 @@ private[spark] object AccumulatorContext {
}
/**
- * Clear all registered [[Accumulator]]s. For testing only.
+ * Clears all registered [[Accumulator]]s. For testing only.
*/
def clear(): Unit = {
originals.clear()
@@ -256,10 +257,10 @@ private[spark] object AccumulatorContext {
}
-class LongAccumulator extends NewAccumulator[jl.Long, jl.Long] {
+class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
private[this] var _sum = 0L
- override def isZero(): Boolean = _sum == 0
+ override def isZero: Boolean = _sum == 0
override def copyAndReset(): LongAccumulator = new LongAccumulator
@@ -269,7 +270,7 @@ class LongAccumulator extends NewAccumulator[jl.Long, jl.Long] {
def sum: Long = _sum
- override def merge(other: NewAccumulator[jl.Long, jl.Long]): Unit = other match {
+ override def merge(other: AccumulatorV2[jl.Long, jl.Long]): Unit = other match {
case o: LongAccumulator => _sum += o.sum
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
@@ -281,10 +282,10 @@ class LongAccumulator extends NewAccumulator[jl.Long, jl.Long] {
}
-class DoubleAccumulator extends NewAccumulator[jl.Double, jl.Double] {
+class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
private[this] var _sum = 0.0
- override def isZero(): Boolean = _sum == 0.0
+ override def isZero: Boolean = _sum == 0.0
override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator
@@ -294,7 +295,7 @@ class DoubleAccumulator extends NewAccumulator[jl.Double, jl.Double] {
def sum: Double = _sum
- override def merge(other: NewAccumulator[jl.Double, jl.Double]): Unit = other match {
+ override def merge(other: AccumulatorV2[jl.Double, jl.Double]): Unit = other match {
case o: DoubleAccumulator => _sum += o.sum
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
@@ -306,11 +307,11 @@ class DoubleAccumulator extends NewAccumulator[jl.Double, jl.Double] {
}
-class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] {
+class AverageAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
private[this] var _sum = 0.0
private[this] var _count = 0L
- override def isZero(): Boolean = _sum == 0.0 && _count == 0
+ override def isZero: Boolean = _sum == 0.0 && _count == 0
override def copyAndReset(): AverageAccumulator = new AverageAccumulator
@@ -324,7 +325,7 @@ class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] {
_count += 1
}
- override def merge(other: NewAccumulator[jl.Double, jl.Double]): Unit = other match {
+ override def merge(other: AccumulatorV2[jl.Double, jl.Double]): Unit = other match {
case o: AverageAccumulator =>
_sum += o.sum
_count += o.count
@@ -344,16 +345,16 @@ class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] {
}
-class ListAccumulator[T] extends NewAccumulator[T, java.util.List[T]] {
+class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
private[this] val _list: java.util.List[T] = new java.util.ArrayList[T]
- override def isZero(): Boolean = _list.isEmpty
+ override def isZero: Boolean = _list.isEmpty
override def copyAndReset(): ListAccumulator[T] = new ListAccumulator
override def add(v: T): Unit = _list.add(v)
- override def merge(other: NewAccumulator[T, java.util.List[T]]): Unit = other match {
+ override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match {
case o: ListAccumulator[T] => _list.addAll(o.localValue)
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
@@ -370,10 +371,10 @@ class ListAccumulator[T] extends NewAccumulator[T, java.util.List[T]] {
class LegacyAccumulatorWrapper[R, T](
initialValue: R,
- param: org.apache.spark.AccumulableParam[R, T]) extends NewAccumulator[T, R] {
+ param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] {
private[spark] var _value = initialValue // Current value on driver
- override def isZero(): Boolean = _value == param.zero(initialValue)
+ override def isZero: Boolean = _value == param.zero(initialValue)
override def copyAndReset(): LegacyAccumulatorWrapper[R, T] = {
val acc = new LegacyAccumulatorWrapper(initialValue, param)
@@ -383,7 +384,7 @@ class LegacyAccumulatorWrapper[R, T](
override def add(v: T): Unit = _value = param.addAccumulator(_value, v)
- override def merge(other: NewAccumulator[T, R]): Unit = other match {
+ override def merge(other: AccumulatorV2[T, R]): Unit = other match {
case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.localValue)
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 63a00a84af..a51338c017 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -144,7 +144,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
registerForCleanup(rdd, CleanRDD(rdd.id))
}
- def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = {
+ def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = {
registerForCleanup(a, CleanAccum(a.id))
}
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 9eac05fdf9..29018c75b9 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
*/
private[spark] case class Heartbeat(
executorId: String,
- accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], // taskId -> accumulator updates
+ accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates
blockManagerId: BlockManagerId)
/**
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2cb3ed0296..d0f88d4e4d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1282,7 +1282,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Register the given accumulator. Note that accumulators must be registered before use, or it
* will throw exception.
*/
- def register(acc: NewAccumulator[_, _]): Unit = {
+ def register(acc: AccumulatorV2[_, _]): Unit = {
acc.register(this)
}
@@ -1290,7 +1290,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Register the given accumulator with given name. Note that accumulators must be registered
* before use, or it will throw exception.
*/
- def register(acc: NewAccumulator[_, _], name: String): Unit = {
+ def register(acc: AccumulatorV2[_, _], name: String): Unit = {
acc.register(this, name = Some(name))
}
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 9e53257462..1a8f8cf11c 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -188,6 +188,6 @@ abstract class TaskContext extends Serializable {
* Register an accumulator that belongs to this task. Accumulators must call this method when
* deserializing in executors.
*/
- private[spark] def registerAccumulator(a: NewAccumulator[_, _]): Unit
+ private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit
}
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index bc3807f5db..c904e08391 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -122,7 +122,7 @@ private[spark] class TaskContextImpl(
override def getMetricsSources(sourceName: String): Seq[Source] =
metricsSystem.getSourcesByName(sourceName)
- private[spark] override def registerAccumulator(a: NewAccumulator[_, _]): Unit = {
+ private[spark] override def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
taskMetrics.registerAccumulator(a)
}
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 82ba2d0c27..ef333e397f 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -118,7 +118,7 @@ case class ExceptionFailure(
fullStackTrace: String,
private val exceptionWrapper: Option[ThrowableSerializationWrapper],
accumUpdates: Seq[AccumulableInfo] = Seq.empty,
- private[spark] var accums: Seq[NewAccumulator[_, _]] = Nil)
+ private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil)
extends TaskFailedReason {
/**
@@ -138,7 +138,7 @@ case class ExceptionFailure(
this(e, accumUpdates, preserveCause = true)
}
- private[spark] def withAccums(accums: Seq[NewAccumulator[_, _]]): ExceptionFailure = {
+ private[spark] def withAccums(accums: Seq[AccumulatorV2[_, _]]): ExceptionFailure = {
this.accums = accums
this
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 4d61f7e232..4f74dc92d7 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -353,7 +353,7 @@ private[spark] class Executor(
logError(s"Exception in $taskName (TID $taskId)", t)
// Collect latest accumulator values to report back to the driver
- val accums: Seq[NewAccumulator[_, _]] =
+ val accums: Seq[AccumulatorV2[_, _]] =
if (task != null) {
task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
@@ -478,7 +478,7 @@ private[spark] class Executor(
/** Reports heartbeat and metrics for active tasks to the driver. */
private def reportHeartBeat(): Unit = {
// list of (task id, accumUpdates) to send back to the driver
- val accumUpdates = new ArrayBuffer[(Long, Seq[NewAccumulator[_, _]])]()
+ val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
val curGCTime = computeTotalGcTime()
for (taskRunner <- runningTasks.values().asScala) {
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 0b64917219..56d034fd03 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -201,7 +201,7 @@ class TaskMetrics private[spark] () extends Serializable {
output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
) ++ testAccum.map(TEST_ACCUM -> _)
- @transient private[spark] lazy val internalAccums: Seq[NewAccumulator[_, _]] =
+ @transient private[spark] lazy val internalAccums: Seq[AccumulatorV2[_, _]] =
nameToAccums.values.toIndexedSeq
/* ========================== *
@@ -217,13 +217,13 @@ class TaskMetrics private[spark] () extends Serializable {
/**
* External accumulators registered with this task.
*/
- @transient private lazy val externalAccums = new ArrayBuffer[NewAccumulator[_, _]]
+ @transient private lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
- private[spark] def registerAccumulator(a: NewAccumulator[_, _]): Unit = {
+ private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
externalAccums += a
}
- private[spark] def accumulators(): Seq[NewAccumulator[_, _]] = internalAccums ++ externalAccums
+ private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
}
@@ -271,15 +271,15 @@ private[spark] object TaskMetrics extends Logging {
/**
* Construct a [[TaskMetrics]] object from a list of accumulator updates, called on driver only.
*/
- def fromAccumulators(accums: Seq[NewAccumulator[_, _]]): TaskMetrics = {
+ def fromAccumulators(accums: Seq[AccumulatorV2[_, _]]): TaskMetrics = {
val tm = new TaskMetrics
val (internalAccums, externalAccums) =
accums.partition(a => a.name.isDefined && tm.nameToAccums.contains(a.name.get))
internalAccums.foreach { acc =>
- val tmAcc = tm.nameToAccums(acc.name.get).asInstanceOf[NewAccumulator[Any, Any]]
+ val tmAcc = tm.nameToAccums(acc.name.get).asInstanceOf[AccumulatorV2[Any, Any]]
tmAcc.metadata = acc.metadata
- tmAcc.merge(acc.asInstanceOf[NewAccumulator[Any, Any]])
+ tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
}
tm.externalAccums ++= externalAccums
@@ -289,7 +289,7 @@ private[spark] object TaskMetrics extends Logging {
private[spark] class BlockStatusesAccumulator
- extends NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] {
+ extends AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] {
private[this] var _seq = ArrayBuffer.empty[(BlockId, BlockStatus)]
override def isZero(): Boolean = _seq.isEmpty
@@ -298,7 +298,7 @@ private[spark] class BlockStatusesAccumulator
override def add(v: (BlockId, BlockStatus)): Unit = _seq += v
- override def merge(other: NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]])
+ override def merge(other: AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]])
: Unit = other match {
case o: BlockStatusesAccumulator => _seq ++= o.localValue
case _ => throw new UnsupportedOperationException(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a96d5f6fbf..4dfd532e93 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -209,7 +209,7 @@ class DAGScheduler(
task: Task[_],
reason: TaskEndReason,
result: Any,
- accumUpdates: Seq[NewAccumulator[_, _]],
+ accumUpdates: Seq[AccumulatorV2[_, _]],
taskInfo: TaskInfo): Unit = {
eventProcessLoop.post(
CompletionEvent(task, reason, result, accumUpdates, taskInfo))
@@ -1091,14 +1091,14 @@ class DAGScheduler(
event.accumUpdates.foreach { updates =>
val id = updates.id
// Find the corresponding accumulator on the driver and update it
- val acc: NewAccumulator[Any, Any] = AccumulatorContext.get(id) match {
- case Some(accum) => accum.asInstanceOf[NewAccumulator[Any, Any]]
+ val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match {
+ case Some(accum) => accum.asInstanceOf[AccumulatorV2[Any, Any]]
case None =>
throw new SparkException(s"attempted to access non-existent accumulator $id")
}
- acc.merge(updates.asInstanceOf[NewAccumulator[Any, Any]])
+ acc.merge(updates.asInstanceOf[AccumulatorV2[Any, Any]])
// To avoid UI cruft, ignore cases where value wasn't updated
- if (acc.name.isDefined && !updates.isZero()) {
+ if (acc.name.isDefined && !updates.isZero) {
stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value))
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index e57a2246d8..0a2c2dc039 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -71,7 +71,7 @@ private[scheduler] case class CompletionEvent(
task: Task[_],
reason: TaskEndReason,
result: Any,
- accumUpdates: Seq[NewAccumulator[_, _]],
+ accumUpdates: Seq[AccumulatorV2[_, _]],
taskInfo: TaskInfo)
extends DAGSchedulerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index e7ca6efd84..362f8e51ce 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -153,7 +153,7 @@ private[spark] abstract class Task[T](
* Collect the latest values of accumulators used in this task. If the task failed,
* filter out the accumulators whose values should not be included on failures.
*/
- def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[NewAccumulator[_, _]] = {
+ def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulatorV2[_, _]] = {
if (context != null) {
context.taskMetrics.accumulators().filter { a => !taskFailed || a.countFailedValues }
} else {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index b472c5511b..69ce00f30d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.{NewAccumulator, SparkEnv}
+import org.apache.spark.{AccumulatorV2, SparkEnv}
import org.apache.spark.storage.BlockId
import org.apache.spark.util.Utils
@@ -36,7 +36,7 @@ private[spark] case class IndirectTaskResult[T](blockId: BlockId, size: Int)
/** A TaskResult that contains the task's return value and accumulator updates. */
private[spark] class DirectTaskResult[T](
var valueBytes: ByteBuffer,
- var accumUpdates: Seq[NewAccumulator[_, _]])
+ var accumUpdates: Seq[AccumulatorV2[_, _]])
extends TaskResult[T] with Externalizable {
private var valueObjectDeserialized = false
@@ -61,9 +61,9 @@ private[spark] class DirectTaskResult[T](
if (numUpdates == 0) {
accumUpdates = null
} else {
- val _accumUpdates = new ArrayBuffer[NewAccumulator[_, _]]
+ val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]]
for (i <- 0 until numUpdates) {
- _accumUpdates += in.readObject.asInstanceOf[NewAccumulator[_, _]]
+ _accumUpdates += in.readObject.asInstanceOf[AccumulatorV2[_, _]]
}
accumUpdates = _accumUpdates
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 75a0c56311..9881a1018c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler
-import org.apache.spark.NewAccumulator
+import org.apache.spark.AccumulatorV2
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
@@ -67,7 +67,7 @@ private[spark] trait TaskScheduler {
*/
def executorHeartbeatReceived(
execId: String,
- accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
+ accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId): Boolean
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8fa4aa121c..666b636558 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -384,7 +384,7 @@ private[spark] class TaskSchedulerImpl(
*/
override def executorHeartbeatReceived(
execId: String,
- accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
+ accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId): Boolean = {
// (taskId, stageId, stageAttemptId, accumUpdates)
val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b79f643a74..b724050f5b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -647,7 +647,7 @@ private[spark] class TaskSetManager(
info.markFailed()
val index = info.index
copiesRunning(index) -= 1
- var accumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty
+ var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " +
reason.asInstanceOf[TaskFailedReason].toErrorString
val failureException: Option[Throwable] = reason match {
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 9c90049715..09eb9c1dbd 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -273,7 +273,7 @@ private[spark] object AccumulatorSuite {
* Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
* info as an accumulator update.
*/
- def makeInfo(a: NewAccumulator[_, _]): AccumulableInfo = a.toInfo(Some(a.localValue), None)
+ def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.localValue), None)
/**
* Run one or more Spark jobs and verify that in at least one job the peak execution memory
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index 688eb6bde9..25977a4660 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -213,7 +213,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) {
private val accumsRegistered = new ArrayBuffer[Long]
- override def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = {
+ override def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = {
accumsRegistered += a.id
super.registerAccumulatorForCleanup(a)
}
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index 94f6e1a3a7..27a1e7bb35 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -203,7 +203,7 @@ class TaskMetricsSuite extends SparkFunSuite {
acc1.add(1)
acc2.add(2)
val newUpdates = tm.accumulators()
- .map(a => (a.id, a.asInstanceOf[NewAccumulator[Any, Any]])).toMap
+ .map(a => (a.id, a.asInstanceOf[AccumulatorV2[Any, Any]])).toMap
assert(newUpdates.contains(acc1.id))
assert(newUpdates.contains(acc2.id))
assert(newUpdates.contains(acc3.id))
@@ -230,8 +230,8 @@ private[spark] object TaskMetricsSuite extends Assertions {
* Note: this does NOT check accumulator ID equality.
*/
def assertUpdatesEquals(
- updates1: Seq[NewAccumulator[_, _]],
- updates2: Seq[NewAccumulator[_, _]]): Unit = {
+ updates1: Seq[AccumulatorV2[_, _]],
+ updates2: Seq[AccumulatorV2[_, _]]): Unit = {
assert(updates1.size === updates2.size)
updates1.zip(updates2).foreach { case (acc1, acc2) =>
// do not assert ID equals here
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 9912d1f3bc..5a5c3a0cd1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -112,7 +112,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
override def stop() = {}
override def executorHeartbeatReceived(
execId: String,
- accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
+ accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId): Boolean = true
override def submitTasks(taskSet: TaskSet) = {
// normally done by TaskSetManager
@@ -483,7 +483,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
override def defaultParallelism(): Int = 2
override def executorHeartbeatReceived(
execId: String,
- accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
+ accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def applicationAttemptId(): Option[String] = None
@@ -2012,7 +2012,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
task: Task[_],
reason: TaskEndReason,
result: Any,
- extraAccumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty,
+ extraAccumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty,
taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = {
val accumUpdates = reason match {
case Success => task.metrics.accumulators()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 16027d944f..72ac848f12 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler
-import org.apache.spark.{LocalSparkContext, NewAccumulator, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{AccumulatorV2, LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
@@ -67,6 +67,6 @@ private class DummyTaskScheduler extends TaskScheduler {
override def applicationAttemptId(): Option[String] = None
def executorHeartbeatReceived(
execId: String,
- accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
+ accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId): Boolean = true
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 339fc4254d..122a3ecb49 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -37,7 +37,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
task: Task[_],
reason: TaskEndReason,
result: Any,
- accumUpdates: Seq[NewAccumulator[_, _]],
+ accumUpdates: Seq[AccumulatorV2[_, _]],
taskInfo: TaskInfo) {
taskScheduler.endedTasks(taskInfo.index) = reason
}
@@ -184,7 +184,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(3)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
- val accumUpdatesByTask: Array[Seq[NewAccumulator[_, _]]] = taskSet.tasks.map { task =>
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
task.metrics.internalAccums
}
@@ -791,7 +791,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
private def createTaskResult(
id: Int,
- accumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty): DirectTaskResult[Int] = {
+ accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
val valueSer = SparkEnv.get.serializer.newInstance()
new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
}