aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala7
1 files changed, 5 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 13cb6a28c3..21ba46024d 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
import java.{lang => jl}
import java.io.ObjectInputStream
+import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
@@ -415,7 +416,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
- private val _list: java.util.List[T] = new java.util.ArrayList[T]
+ private val _list: java.util.List[T] = new ArrayList[T]
override def isZero: Boolean = _list.isEmpty
@@ -437,7 +438,9 @@ class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
- override def value: java.util.List[T] = java.util.Collections.unmodifiableList(_list)
+ override def value: java.util.List[T] = _list.synchronized {
+ java.util.Collections.unmodifiableList(new ArrayList[T](_list))
+ }
private[spark] def setValue(newValue: java.util.List[T]): Unit = {
_list.clear()