author    Sandeep Singh <sandeep@techaddict.me>    2016-05-03 11:38:43 -0700
committer Reynold Xin <rxin@databricks.com>        2016-05-03 11:38:43 -0700
commit    ca813330c716bed76ac0034c12f56665960a1105 (patch)
tree      26c14bffca2c6e365a9cf26ed7696252c12c6c67 /core
parent    b545d752195f6dcba4c512b8a1d5bf5b74279dc8 (diff)
[SPARK-15087][CORE][SQL] Remove AccumulatorV2.localValue and keep only value
## What changes were proposed in this pull request?

Remove `AccumulatorV2.localValue` and keep only `value`. The abstract `value` method now returns the accumulator's current (local) state directly, the driver-side-only check formerly in `AccumulatorV2.value` moves into the deprecated `Accumulable.value` wrapper, and internal callers of `localValue` are switched to `value`.

## How was this patch tested?

Existing tests.

Author: Sandeep Singh <sandeep@techaddict.me>

Closes #12865 from techaddict/SPARK-15087.
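For callers, the visible effect is small: wherever code previously read an accumulator's task-local state through `localValue`, it now calls `value`. A minimal sketch using the `LongAccumulator` touched by this diff (the accumulator is constructed unregistered, purely for illustration; in a real job it would be registered with a SparkContext):

```scala
import org.apache.spark.util.LongAccumulator

object LocalValueSketch {
  def main(args: Array[String]): Unit = {
    // Unregistered accumulator, used here only to show the accessor change.
    val acc = new LongAccumulator
    acc.add(4L)
    acc.add(6L)

    // Before SPARK-15087, the task-local sum was read via acc.localValue and
    // acc.value additionally enforced a driver-side-only check. After this
    // patch there is a single accessor:
    assert(acc.value == 10L)
  }
}
```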
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulable.scala                  10
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala             2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala          6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala   2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala           32
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala             2
-rw-r--r--  core/src/test/scala/org/apache/spark/AccumulatorSuite.scala              4
7 files changed, 25 insertions, 33 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
index 5c6761eb76..812145aaee 100644
--- a/core/src/main/scala/org/apache/spark/Accumulable.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -110,7 +110,13 @@ class Accumulable[R, T] private (
/**
* Access the accumulator's current value; only allowed on driver.
*/
- def value: R = newAcc.value
+ def value: R = {
+ if (newAcc.isAtDriverSide) {
+ newAcc.value
+ } else {
+ throw new UnsupportedOperationException("Can't read accumulator value in task")
+ }
+ }
/**
* Get the current value of this accumulator from within a task.
@@ -121,7 +127,7 @@ class Accumulable[R, T] private (
* The typical use of this method is to directly mutate the local value, eg., to add
* an element to a Set.
*/
- def localValue: R = newAcc.localValue
+ def localValue: R = newAcc.value
/**
* Set the accumulator's value; only allowed on driver.
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 4f74dc92d7..64e87a95d0 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -362,7 +362,7 @@ private[spark] class Executor(
Seq.empty
}
- val accUpdates = accums.map(acc => acc.toInfo(Some(acc.localValue), None))
+ val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
val serializedTaskEndReason = {
try {
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 085aa7fbd6..7f4652c2dd 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -99,7 +99,7 @@ class TaskMetrics private[spark] () extends Serializable {
/**
* Storage statuses of any blocks that have been updated as a result of this task.
*/
- def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses.localValue
+ def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses.value
// Setters and increment-ers
private[spark] def setExecutorDeserializeTime(v: Long): Unit =
@@ -301,12 +301,12 @@ private[spark] class BlockStatusesAccumulator
override def merge(other: AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]])
: Unit = other match {
- case o: BlockStatusesAccumulator => _seq ++= o.localValue
+ case o: BlockStatusesAccumulator => _seq ++= o.value
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
- override def localValue: Seq[(BlockId, BlockStatus)] = _seq
+ override def value: Seq[(BlockId, BlockStatus)] = _seq
def setValue(newValue: Seq[(BlockId, BlockStatus)]): Unit = {
_seq.clear()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 393680f4c1..8ce8fb15ef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -394,7 +394,7 @@ private[spark] class TaskSchedulerImpl(
// deserialized. This brings trouble to the accumulator framework, which depends on
// serialization to set the `atDriverSide` flag. Here we call `acc.localValue` instead to
// be more robust about this issue.
- val accInfos = updates.map(acc => acc.toInfo(Some(acc.localValue), None))
+ val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
taskIdToTaskSetManager.get(id).map { taskSetMgr =>
(id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
}
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 0e280f6f6a..d8f380e123 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -126,23 +126,9 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
def merge(other: AccumulatorV2[IN, OUT]): Unit
/**
- * Access this accumulator's current value; only allowed on driver.
+ * Defines the current value of this accumulator
*/
- final def value: OUT = {
- if (atDriverSide) {
- localValue
- } else {
- throw new UnsupportedOperationException("Can't read accumulator value in task")
- }
- }
-
- /**
- * Defines the current value of this accumulator.
- *
- * This is NOT the global value of the accumulator. To get the global value after a
- * completed operation on the dataset, call `value`.
- */
- def localValue: OUT
+ def value: OUT
// Called by Java when serializing an object
final protected def writeReplace(): Any = {
@@ -182,7 +168,7 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
if (metadata == null) {
"Un-registered Accumulator: " + getClass.getSimpleName
} else {
- getClass.getSimpleName + s"(id: $id, name: $name, value: $localValue)"
+ getClass.getSimpleName + s"(id: $id, name: $name, value: $value)"
}
}
}
@@ -321,7 +307,7 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
private[spark] def setValue(newValue: Long): Unit = _sum = newValue
- override def localValue: jl.Long = _sum
+ override def value: jl.Long = _sum
}
@@ -386,7 +372,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
private[spark] def setValue(newValue: Double): Unit = _sum = newValue
- override def localValue: jl.Double = _sum
+ override def value: jl.Double = _sum
}
@@ -400,12 +386,12 @@ class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
override def add(v: T): Unit = _list.add(v)
override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match {
- case o: ListAccumulator[T] => _list.addAll(o.localValue)
+ case o: ListAccumulator[T] => _list.addAll(o.value)
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
- override def localValue: java.util.List[T] = java.util.Collections.unmodifiableList(_list)
+ override def value: java.util.List[T] = java.util.Collections.unmodifiableList(_list)
private[spark] def setValue(newValue: java.util.List[T]): Unit = {
_list.clear()
@@ -430,10 +416,10 @@ class LegacyAccumulatorWrapper[R, T](
override def add(v: T): Unit = _value = param.addAccumulator(_value, v)
override def merge(other: AccumulatorV2[T, R]): Unit = other match {
- case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.localValue)
+ case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.value)
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
- override def localValue: R = _value
+ override def value: R = _value
}
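To make the new contract concrete, below is a rough sketch of a hypothetical set-valued accumulator written against the post-patch API, mirroring the `ListAccumulator` changes above. Only `add`, `merge`, and `value` appear in this diff; `isZero` and `copyAndReset` are assumed from the surrounding `AccumulatorV2` API of this era and are not part of the patch:

```scala
import java.util.Collections

import org.apache.spark.util.AccumulatorV2

// Hypothetical example, not part of this patch.
class StringSetAccumulator extends AccumulatorV2[String, java.util.Set[String]] {
  private val _set = Collections.synchronizedSet(new java.util.HashSet[String]())

  // Assumed members of the era's AccumulatorV2 API:
  override def isZero: Boolean = _set.isEmpty
  override def copyAndReset(): StringSetAccumulator = new StringSetAccumulator

  override def add(v: String): Unit = _set.add(v)

  override def merge(other: AccumulatorV2[String, java.util.Set[String]]): Unit = other match {
    // Post-patch: read the other accumulator's state via `value`, not `localValue`.
    case o: StringSetAccumulator => _set.addAll(o.value)
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  // Pre-patch this override would have been spelled `localValue`.
  override def value: java.util.Set[String] = Collections.unmodifiableSet(_set)
}
```

The only difference from a pre-patch implementation is the name of the concrete state accessor; the driver-side-only guard that used to wrap it in `AccumulatorV2.value` is removed and survives only in the deprecated `Accumulable.value` wrapper.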
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index aeab71d9df..18547d459e 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -841,7 +841,7 @@ private[spark] object JsonProtocol {
val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
.map(_.extract[List[JValue]].map(accumulableInfoFromJson))
.getOrElse(taskMetricsFromJson(json \ "Metrics").accumulators().map(acc => {
- acc.toInfo(Some(acc.localValue), None)
+ acc.toInfo(Some(acc.value), None)
}))
ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
case `taskResultLost` => TaskResultLost
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index cade67b1d2..6cbd5ae5d4 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -70,7 +70,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
// serialize and de-serialize it, to simulate sending accumulator to executor.
val acc2 = ser.deserialize[LongAccumulator](ser.serialize(acc))
// value is reset on the executors
- assert(acc2.localValue == 0)
+ assert(acc2.value == 0)
assert(!acc2.isAtDriverSide)
acc2.add(10)
@@ -259,7 +259,7 @@ private[spark] object AccumulatorSuite {
* Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
* info as an accumulator update.
*/
- def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.localValue), None)
+ def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
/**
* Run one or more Spark jobs and verify that in at least one job the peak execution memory