author    Shixiong Zhu <shixiong@databricks.com>   2016-09-14 13:33:51 -0700
committer Josh Rosen <joshrosen@databricks.com>    2016-09-14 13:33:51 -0700
commit    e33bfaed3b160fbc617c878067af17477a0044f5
tree      369215b3c286b9d9f2694ebec7f75bbcf7fdfba4 /sql/core
parent    ff6e4cbdc80e2ad84c5d70ee07f323fad9374e3e
[SPARK-17463][CORE] Make the values of CollectionAccumulator and SetAccumulator readable in a thread-safe manner
## What changes were proposed in this pull request?

Make the values of CollectionAccumulator and SetAccumulator readable in a thread-safe manner, fixing the ConcurrentModificationException reported in [JIRA](https://issues.apache.org/jira/browse/SPARK-17463).

## How was this patch tested?

Existing tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15063 from zsxwing/SPARK-17463.
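For context, a minimal standalone sketch of the core idea behind the fix (not part of the patch; the object and thread names are illustrative): wrapping the backing `java.util.HashSet` with `Collections.synchronizedSet` makes individual operations such as `add` and `size` atomic, so a reader thread can safely poll the set while a writer thread is still inserting.

```scala
import java.util.Collections

// Hypothetical demo of the fix's core idea: the synchronizedSet wrapper makes
// individual operations (add, size, contains) atomic. With an unsynchronized
// mutable.HashSet, the same pattern is a data race, and iterating the value
// while tasks mutate it is what threw the CME in SPARK-17463.
object SynchronizedSetDemo {
  def main(args: Array[String]): Unit = {
    val set = Collections.synchronizedSet(new java.util.HashSet[Int]())

    val writer = new Thread(new Runnable {
      override def run(): Unit = (1 to 100000).foreach(i => set.add(i))
    })
    writer.start()

    // Safe concurrent reads: each size() call locks the wrapper's monitor.
    while (writer.isAlive) {
      set.size()
    }
    writer.join()
    println(s"final size: ${set.size()}") // 100000
  }
}
```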
Diffstat (limited to 'sql/core')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala | 24
1 file changed, 15 insertions(+), 9 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 082f97a880..d321f4cd76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -17,7 +17,9 @@
package org.apache.spark.sql.execution
-import scala.collection.mutable.HashSet
+import java.util.Collections
+
+import scala.collection.JavaConverters._
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -107,18 +109,20 @@ package object debug {
case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
def output: Seq[Attribute] = child.output
- class SetAccumulator[T] extends AccumulatorV2[T, HashSet[T]] {
- private val _set = new HashSet[T]()
+ class SetAccumulator[T] extends AccumulatorV2[T, java.util.Set[T]] {
+ private val _set = Collections.synchronizedSet(new java.util.HashSet[T]())
override def isZero: Boolean = _set.isEmpty
- override def copy(): AccumulatorV2[T, HashSet[T]] = {
+ override def copy(): AccumulatorV2[T, java.util.Set[T]] = {
val newAcc = new SetAccumulator[T]()
- newAcc._set ++= _set
+ newAcc._set.addAll(_set)
newAcc
}
override def reset(): Unit = _set.clear()
- override def add(v: T): Unit = _set += v
- override def merge(other: AccumulatorV2[T, HashSet[T]]): Unit = _set ++= other.value
- override def value: HashSet[T] = _set
+ override def add(v: T): Unit = _set.add(v)
+ override def merge(other: AccumulatorV2[T, java.util.Set[T]]): Unit = {
+ _set.addAll(other.value)
+ }
+ override def value: java.util.Set[T] = _set
}
/**
@@ -138,7 +142,9 @@ package object debug {
debugPrint(s"== ${child.simpleString} ==")
debugPrint(s"Tuples output: ${tupleCount.value}")
child.output.zip(columnStats).foreach { case (attr, metric) =>
- val actualDataTypes = metric.elementTypes.value.mkString("{", ",", "}")
+ // This is called on driver. All accumulator updates have a fixed value. So it's safe to use
+ // `asScala` which accesses the internal values using `java.util.Iterator`.
+ val actualDataTypes = metric.elementTypes.value.asScala.mkString("{", ",", "}")
debugPrint(s" ${attr.name} ${attr.dataType}: $actualDataTypes")
}
}
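One caveat worth noting, as an illustrative sketch rather than anything from the patch: `Collections.synchronizedSet` only guards individual operations. Iteration, which is what `asScala.mkString` performs, must be synchronized manually on the wrapper whenever another thread may still be writing. The in-diff comment above is the justification for skipping that lock here: `DebugExec` reads the value on the driver after all accumulator updates are fixed, so no writes can race the traversal.

```scala
import java.util.Collections
import scala.collection.JavaConverters._

// Illustrative: rendering a synchronized set safely while writers may be live.
object SynchronizedIterationDemo {
  def main(args: Array[String]): Unit = {
    val types = Collections.synchronizedSet(new java.util.HashSet[String]())
    types.add("IntegerType")
    types.add("StringType")

    // Iteration is NOT covered by the synchronizedSet wrapper; per its
    // javadoc, lock the wrapper's monitor for the whole traversal.
    val rendered = types.synchronized {
      types.asScala.mkString("{", ",", "}")
    }
    println(rendered) // e.g. {IntegerType,StringType}

    // The patched DebugExec can skip this lock: it reads on the driver after
    // all accumulator updates are complete, so nothing races the iteration.
  }
}
```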