From ca813330c716bed76ac0034c12f56665960a1105 Mon Sep 17 00:00:00 2001
From: Sandeep Singh
Date: Tue, 3 May 2016 11:38:43 -0700
Subject: [SPARK-15087][CORE][SQL] Remove AccumulatorV2.localValue and keep only value

## What changes were proposed in this pull request?
Remove AccumulatorV2.localValue and keep only value

## How was this patch tested?
existing tests

Author: Sandeep Singh

Closes #12865 from techaddict/SPARK-15087.
---
 .../main/scala/org/apache/spark/Accumulable.scala  | 10 +++++--
 .../scala/org/apache/spark/executor/Executor.scala |  2 +-
 .../org/apache/spark/executor/TaskMetrics.scala    |  6 ++--
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  2 +-
 .../org/apache/spark/util/AccumulatorV2.scala      | 32 ++++++----------------
 .../scala/org/apache/spark/util/JsonProtocol.scala |  2 +-
 .../scala/org/apache/spark/AccumulatorSuite.scala  |  4 +--
 7 files changed, 25 insertions(+), 33 deletions(-)

(limited to 'core/src')

diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
index 5c6761eb76..812145aaee 100644
--- a/core/src/main/scala/org/apache/spark/Accumulable.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -110,7 +110,13 @@ class Accumulable[R, T] private (
   /**
    * Access the accumulator's current value; only allowed on driver.
    */
-  def value: R = newAcc.value
+  def value: R = {
+    if (newAcc.isAtDriverSide) {
+      newAcc.value
+    } else {
+      throw new UnsupportedOperationException("Can't read accumulator value in task")
+    }
+  }
 
   /**
    * Get the current value of this accumulator from within a task.
@@ -121,7 +127,7 @@ class Accumulable[R, T] private (
    * The typical use of this method is to directly mutate the local value, eg., to add
    * an element to a Set.
    */
-  def localValue: R = newAcc.localValue
+  def localValue: R = newAcc.value
 
   /**
    * Set the accumulator's value; only allowed on driver.
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 4f74dc92d7..64e87a95d0 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -362,7 +362,7 @@ private[spark] class Executor(
             Seq.empty
           }
 
-        val accUpdates = accums.map(acc => acc.toInfo(Some(acc.localValue), None))
+        val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
 
         val serializedTaskEndReason = {
           try {
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 085aa7fbd6..7f4652c2dd 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -99,7 +99,7 @@ class TaskMetrics private[spark] () extends Serializable {
   /**
    * Storage statuses of any blocks that have been updated as a result of this task.
   */
-  def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses.localValue
+  def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses.value
 
   // Setters and increment-ers
   private[spark] def setExecutorDeserializeTime(v: Long): Unit =
@@ -301,12 +301,12 @@ private[spark] class BlockStatusesAccumulator
 
   override def merge(other: AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]])
     : Unit = other match {
-    case o: BlockStatusesAccumulator => _seq ++= o.localValue
+    case o: BlockStatusesAccumulator => _seq ++= o.value
     case _ => throw new UnsupportedOperationException(
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
 
-  override def localValue: Seq[(BlockId, BlockStatus)] = _seq
+  override def value: Seq[(BlockId, BlockStatus)] = _seq
 
   def setValue(newValue: Seq[(BlockId, BlockStatus)]): Unit = {
     _seq.clear()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 393680f4c1..8ce8fb15ef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -394,7 +394,7 @@ private[spark] class TaskSchedulerImpl(
         // deserialized. This brings trouble to the accumulator framework, which depends on
         // serialization to set the `atDriverSide` flag. Here we call `acc.localValue` instead to
         // be more robust about this issue.
-        val accInfos = updates.map(acc => acc.toInfo(Some(acc.localValue), None))
+        val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
         taskIdToTaskSetManager.get(id).map { taskSetMgr =>
           (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
         }
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 0e280f6f6a..d8f380e123 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -126,23 +126,9 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def merge(other: AccumulatorV2[IN, OUT]): Unit
 
   /**
-   * Access this accumulator's current value; only allowed on driver.
+   * Defines the current value of this accumulator
    */
-  final def value: OUT = {
-    if (atDriverSide) {
-      localValue
-    } else {
-      throw new UnsupportedOperationException("Can't read accumulator value in task")
-    }
-  }
-
-  /**
-   * Defines the current value of this accumulator.
-   *
-   * This is NOT the global value of the accumulator. To get the global value after a
-   * completed operation on the dataset, call `value`.
-   */
-  def localValue: OUT
+  def value: OUT
 
   // Called by Java when serializing an object
   final protected def writeReplace(): Any = {
@@ -182,7 +168,7 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
     if (metadata == null) {
       "Un-registered Accumulator: " + getClass.getSimpleName
     } else {
-      getClass.getSimpleName + s"(id: $id, name: $name, value: $localValue)"
+      getClass.getSimpleName + s"(id: $id, name: $name, value: $value)"
    }
   }
 }
@@ -321,7 +307,7 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
 
   private[spark] def setValue(newValue: Long): Unit = _sum = newValue
 
-  override def localValue: jl.Long = _sum
+  override def value: jl.Long = _sum
 }
 
 
@@ -386,7 +372,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
 
   private[spark] def setValue(newValue: Double): Unit = _sum = newValue
 
-  override def localValue: jl.Double = _sum
+  override def value: jl.Double = _sum
 }
 
 
@@ -400,12 +386,12 @@ class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
   override def add(v: T): Unit = _list.add(v)
 
   override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match {
-    case o: ListAccumulator[T] => _list.addAll(o.localValue)
+    case o: ListAccumulator[T] => _list.addAll(o.value)
     case _ => throw new UnsupportedOperationException(
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
 
-  override def localValue: java.util.List[T] = java.util.Collections.unmodifiableList(_list)
+  override def value: java.util.List[T] = java.util.Collections.unmodifiableList(_list)
 
   private[spark] def setValue(newValue: java.util.List[T]): Unit = {
     _list.clear()
@@ -430,10 +416,10 @@ class LegacyAccumulatorWrapper[R, T](
   override def add(v: T): Unit = _value = param.addAccumulator(_value, v)
 
   override def merge(other: AccumulatorV2[T, R]): Unit = other match {
-    case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.localValue)
+    case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.value)
     case _ => throw new UnsupportedOperationException(
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
 
-  override def localValue: R = _value
+  override def value: R = _value
 }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index aeab71d9df..18547d459e 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -841,7 +841,7 @@ private[spark] object JsonProtocol {
       val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
         .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
         .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulators().map(acc => {
-          acc.toInfo(Some(acc.localValue), None)
+          acc.toInfo(Some(acc.value), None)
         }))
       ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
     case `taskResultLost` => TaskResultLost
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index cade67b1d2..6cbd5ae5d4 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -70,7 +70,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     // serialize and de-serialize it, to simulate sending accumulator to executor.
     val acc2 = ser.deserialize[LongAccumulator](ser.serialize(acc))
     // value is reset on the executors
-    assert(acc2.localValue == 0)
+    assert(acc2.value == 0)
     assert(!acc2.isAtDriverSide)
 
     acc2.add(10)
@@ -259,7 +259,7 @@ private[spark] object AccumulatorSuite {
    * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
    * info as an accumulator update.
    */
-  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.localValue), None)
+  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
 
   /**
    * Run one or more Spark jobs and verify that in at least one job the peak execution memory
--
cgit v1.2.3
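
The net effect for calling code is that `value` becomes the single accessor on `AccumulatorV2`, whether the accumulator lives on the driver or holds a task-local sum that is merged back later; the driver-only read restriction is kept only in the legacy `Accumulable` wrapper, which now performs the `isAtDriverSide` check itself (see the first hunk of Accumulable.scala). A minimal sketch of that usage follows; it is not part of the patch, assumes only the `LongAccumulator` API visible in the diff above (`add`, `merge`, `value`), and the two-accumulator setup is purely illustrative:

```scala
import org.apache.spark.util.LongAccumulator

// Driver-side accumulator; after this patch it is read through `value`.
val driverAcc = new LongAccumulator
driverAcc.add(3L)

// A second accumulator standing in for an update computed on a task;
// before this patch its contents would have been read via `localValue`.
val taskUpdate = new LongAccumulator
taskUpdate.add(7L)

// Merge the task-side update into the driver copy and read the one
// remaining accessor.
driverAcc.merge(taskUpdate)
assert(driverAcc.value == 10L)
```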