aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala32
1 files changed, 9 insertions, 23 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 0e280f6f6a..d8f380e123 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -126,23 +126,9 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
def merge(other: AccumulatorV2[IN, OUT]): Unit
/**
- * Access this accumulator's current value; only allowed on driver.
+ * Defines the current value of this accumulator
*/
- final def value: OUT = {
- if (atDriverSide) {
- localValue
- } else {
- throw new UnsupportedOperationException("Can't read accumulator value in task")
- }
- }
-
- /**
- * Defines the current value of this accumulator.
- *
- * This is NOT the global value of the accumulator. To get the global value after a
- * completed operation on the dataset, call `value`.
- */
- def localValue: OUT
+ def value: OUT
// Called by Java when serializing an object
final protected def writeReplace(): Any = {
@@ -182,7 +168,7 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
if (metadata == null) {
"Un-registered Accumulator: " + getClass.getSimpleName
} else {
- getClass.getSimpleName + s"(id: $id, name: $name, value: $localValue)"
+ getClass.getSimpleName + s"(id: $id, name: $name, value: $value)"
}
}
}
@@ -321,7 +307,7 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
private[spark] def setValue(newValue: Long): Unit = _sum = newValue
- override def localValue: jl.Long = _sum
+ override def value: jl.Long = _sum
}
@@ -386,7 +372,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
private[spark] def setValue(newValue: Double): Unit = _sum = newValue
- override def localValue: jl.Double = _sum
+ override def value: jl.Double = _sum
}
@@ -400,12 +386,12 @@ class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
override def add(v: T): Unit = _list.add(v)
override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match {
- case o: ListAccumulator[T] => _list.addAll(o.localValue)
+ case o: ListAccumulator[T] => _list.addAll(o.value)
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
- override def localValue: java.util.List[T] = java.util.Collections.unmodifiableList(_list)
+ override def value: java.util.List[T] = java.util.Collections.unmodifiableList(_list)
private[spark] def setValue(newValue: java.util.List[T]): Unit = {
_list.clear()
@@ -430,10 +416,10 @@ class LegacyAccumulatorWrapper[R, T](
override def add(v: T): Unit = _value = param.addAccumulator(_value, v)
override def merge(other: AccumulatorV2[T, R]): Unit = other match {
- case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.localValue)
+ case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.value)
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
- override def localValue: R = _value
+ override def value: R = _value
}