aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <simonh@tw.ibm.com>2016-05-22 08:08:46 -0500
committerSean Owen <sowen@cloudera.com>2016-05-22 08:08:46 -0500
commit7920296bf8f313e010205937d3ebcbbc7b1a1d9e (patch)
tree05e9ddfb3dc710c6cbc7e04ae178dbb507db999c /core
parent1ffa608ba5a849739a56047bda8b157b86b08650 (diff)
downloadspark-7920296bf8f313e010205937d3ebcbbc7b1a1d9e.tar.gz
spark-7920296bf8f313e010205937d3ebcbbc7b1a1d9e.tar.bz2
spark-7920296bf8f313e010205937d3ebcbbc7b1a1d9e.zip
[SPARK-15430][SQL] Fix potential ConcurrentModificationException for ListAccumulator
## What changes were proposed in this pull request? In `ListAccumulator` we create an unmodifiable view for underlying list. However, it doesn't prevent the underlying to be modified further. So as we access the unmodifiable list, the underlying list can be modified in the same time. It could cause `java.util.ConcurrentModificationException`. We can observe such exception in recent tests. To fix it, we can copy a list of the underlying list and then create the unmodifiable view of this list instead. ## How was this patch tested? The exception might be difficult to test. Existing tests should be passed. Author: Liang-Chi Hsieh <simonh@tw.ibm.com> Closes #13211 from viirya/fix-concurrentmodify.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala7
1 files changed, 5 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 13cb6a28c3..21ba46024d 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
import java.{lang => jl}
import java.io.ObjectInputStream
+import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
@@ -415,7 +416,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
- private val _list: java.util.List[T] = new java.util.ArrayList[T]
+ private val _list: java.util.List[T] = new ArrayList[T]
override def isZero: Boolean = _list.isEmpty
@@ -437,7 +438,9 @@ class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
- override def value: java.util.List[T] = java.util.Collections.unmodifiableList(_list)
+ override def value: java.util.List[T] = _list.synchronized {
+ java.util.Collections.unmodifiableList(new ArrayList[T](_list))
+ }
private[spark] def setValue(newValue: java.util.List[T]): Unit = {
_list.clear()