diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala | 7 |
1 files changed, 5 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 13cb6a28c3..21ba46024d 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -19,6 +19,7 @@ package org.apache.spark.util import java.{lang => jl} import java.io.ObjectInputStream +import java.util.ArrayList import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong @@ -415,7 +416,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] { class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { - private val _list: java.util.List[T] = new java.util.ArrayList[T] + private val _list: java.util.List[T] = new ArrayList[T] override def isZero: Boolean = _list.isEmpty @@ -437,7 +438,9 @@ class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") } - override def value: java.util.List[T] = java.util.Collections.unmodifiableList(_list) + override def value: java.util.List[T] = _list.synchronized { + java.util.Collections.unmodifiableList(new ArrayList[T](_list)) + } private[spark] def setValue(newValue: java.util.List[T]): Unit = { _list.clear() |