diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-09-14 13:33:51 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2016-09-14 13:33:51 -0700 |
commit | e33bfaed3b160fbc617c878067af17477a0044f5 (patch) | |
tree | 369215b3c286b9d9f2694ebec7f75bbcf7fdfba4 /sql/core/src | |
parent | ff6e4cbdc80e2ad84c5d70ee07f323fad9374e3e (diff) | |
download | spark-e33bfaed3b160fbc617c878067af17477a0044f5.tar.gz spark-e33bfaed3b160fbc617c878067af17477a0044f5.tar.bz2 spark-e33bfaed3b160fbc617c878067af17477a0044f5.zip |
[SPARK-17463][CORE] Make CollectionAccumulator and SetAccumulator's value can be read thread-safely
## What changes were proposed in this pull request?
Make CollectionAccumulator and SetAccumulator's value can be read thread-safely to fix the ConcurrentModificationException reported in [JIRA](https://issues.apache.org/jira/browse/SPARK-17463).
## How was this patch tested?
Existing tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #15063 from zsxwing/SPARK-17463.
Diffstat (limited to 'sql/core/src')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala | 24 |
1 files changed, 15 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 082f97a880..d321f4cd76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -17,7 +17,9 @@ package org.apache.spark.sql.execution -import scala.collection.mutable.HashSet +import java.util.Collections + +import scala.collection.JavaConverters._ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD @@ -107,18 +109,20 @@ package object debug { case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport { def output: Seq[Attribute] = child.output - class SetAccumulator[T] extends AccumulatorV2[T, HashSet[T]] { - private val _set = new HashSet[T]() + class SetAccumulator[T] extends AccumulatorV2[T, java.util.Set[T]] { + private val _set = Collections.synchronizedSet(new java.util.HashSet[T]()) override def isZero: Boolean = _set.isEmpty - override def copy(): AccumulatorV2[T, HashSet[T]] = { + override def copy(): AccumulatorV2[T, java.util.Set[T]] = { val newAcc = new SetAccumulator[T]() - newAcc._set ++= _set + newAcc._set.addAll(_set) newAcc } override def reset(): Unit = _set.clear() - override def add(v: T): Unit = _set += v - override def merge(other: AccumulatorV2[T, HashSet[T]]): Unit = _set ++= other.value - override def value: HashSet[T] = _set + override def add(v: T): Unit = _set.add(v) + override def merge(other: AccumulatorV2[T, java.util.Set[T]]): Unit = { + _set.addAll(other.value) + } + override def value: java.util.Set[T] = _set } /** @@ -138,7 +142,9 @@ package object debug { debugPrint(s"== ${child.simpleString} ==") debugPrint(s"Tuples output: ${tupleCount.value}") child.output.zip(columnStats).foreach { case (attr, metric) => - val actualDataTypes = metric.elementTypes.value.mkString("{", ",", "}") + // This is called on driver. All accumulator updates have a fixed value. So it's safe to use + // `asScala` which accesses the internal values using `java.util.Iterator`. + val actualDataTypes = metric.elementTypes.value.asScala.mkString("{", ",", "}") debugPrint(s" ${attr.name} ${attr.dataType}: $actualDataTypes") } } |