author    Ergin Seyfe <eseyfe@fb.com>    2016-10-10 20:41:31 -0700
committer Shixiong Zhu <shixiong@databricks.com>    2016-10-10 20:41:31 -0700
commit    19a5bae47f69929d00d9de43387c7df37a05ee25 (patch)
tree      8456d4d4a29197da1746adec613abbd3a1fda296 /core
parent    90217f9deed01ae187e28ef1531491aac8ee50c9 (diff)
[SPARK-17816][CORE] Fix ConcurrentModificationException issue in BlockStatusesAccumulator
## What changes were proposed in this pull request?

Change `BlockStatusesAccumulator` to return an immutable object when the `value` method is called.

## How was this patch tested?

Existing tests, plus running a pipeline that consistently reproduced this issue. This is the stack trace for the exception:

```
java.util.ConcurrentModificationException
	at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
	at java.util.ArrayList$Itr.next(ArrayList.java:851)
	at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
	at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
	at scala.collection.TraversableLike$class.to(TraversableLike.scala:590)
	at scala.collection.AbstractTraversable.to(Traversable.scala:104)
	at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:294)
	at scala.collection.AbstractTraversable.toList(Traversable.scala:104)
	at org.apache.spark.util.JsonProtocol$.accumValueToJson(JsonProtocol.scala:314)
	at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291)
	at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.util.JsonProtocol$.accumulableInfoToJson(JsonProtocol.scala:291)
	at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283)
	at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
	at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.util.JsonProtocol$.taskInfoToJson(JsonProtocol.scala:283)
	at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:145)
	at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:76)
```

Author: Ergin Seyfe <eseyfe@fb.com>

Closes #15371 from seyfe/race_cond_jsonprotocal.
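A minimal sketch of the race (illustrative only, not code from the patch): `Collections.synchronizedList` guards individual calls such as `add`, but traversing the list, which `JsonProtocol.accumValueToJson` effectively does via `toList`, requires holding the list's monitor. A concurrent `add` from a still-running task can therefore trip the reader's co-modification check:

```scala
import java.util.{ArrayList, Collections, ConcurrentModificationException}
import scala.collection.JavaConverters._

object RaceSketch {
  def main(args: Array[String]): Unit = {
    val list = Collections.synchronizedList(new ArrayList[Int]())

    // Writer: keeps appending, as a running task does through add().
    val writer = new Thread(new Runnable {
      override def run(): Unit = for (i <- 1 to 1000000) list.add(i)
    })
    writer.start()

    // Reader: iterates without holding the list's monitor, like the
    // pre-patch value()/toList path. This can fail mid-traversal.
    try {
      while (writer.isAlive) list.asScala.toList
      println("no race observed this run (timing-dependent)")
    } catch {
      case e: ConcurrentModificationException => println(s"race observed: $e")
    }
    writer.join()
  }
}
```

The patch closes this hole by dropping the custom accumulator in favor of `CollectionAccumulator` and taking the bulk copy while synchronized on the underlying list, so the traversal happens under the monitor.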
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala  | 42
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala    | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala     | 2
3 files changed, 6 insertions, 42 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 2956768c16..dfd2f818ac 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,8 +17,6 @@
package org.apache.spark.executor
-import java.util.{ArrayList, Collections}
-
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
@@ -27,7 +25,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.{BlockId, BlockStatus}
-import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, AccumulatorV2, LongAccumulator}
+import org.apache.spark.util._
/**
@@ -56,7 +54,7 @@ class TaskMetrics private[spark] () extends Serializable {
private val _memoryBytesSpilled = new LongAccumulator
private val _diskBytesSpilled = new LongAccumulator
private val _peakExecutionMemory = new LongAccumulator
- private val _updatedBlockStatuses = new BlockStatusesAccumulator
+ private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)]
/**
* Time taken on the executor to deserialize this task.
@@ -323,39 +321,3 @@ private[spark] object TaskMetrics extends Logging {
tm
}
}
-
-
-private[spark] class BlockStatusesAccumulator
- extends AccumulatorV2[(BlockId, BlockStatus), java.util.List[(BlockId, BlockStatus)]] {
- private val _seq = Collections.synchronizedList(new ArrayList[(BlockId, BlockStatus)]())
-
- override def isZero(): Boolean = _seq.isEmpty
-
- override def copyAndReset(): BlockStatusesAccumulator = new BlockStatusesAccumulator
-
- override def copy(): BlockStatusesAccumulator = {
- val newAcc = new BlockStatusesAccumulator
- newAcc._seq.addAll(_seq)
- newAcc
- }
-
- override def reset(): Unit = _seq.clear()
-
- override def add(v: (BlockId, BlockStatus)): Unit = _seq.add(v)
-
- override def merge(
- other: AccumulatorV2[(BlockId, BlockStatus), java.util.List[(BlockId, BlockStatus)]]): Unit = {
- other match {
- case o: BlockStatusesAccumulator => _seq.addAll(o.value)
- case _ => throw new UnsupportedOperationException(
- s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
- }
- }
-
- override def value: java.util.List[(BlockId, BlockStatus)] = _seq
-
- def setValue(newValue: java.util.List[(BlockId, BlockStatus)]): Unit = {
- _seq.clear()
- _seq.addAll(newValue)
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 470d912ecf..d3ddd39131 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -444,7 +444,9 @@ class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
override def copy(): CollectionAccumulator[T] = {
val newAcc = new CollectionAccumulator[T]
- newAcc._list.addAll(_list)
+ _list.synchronized {
+ newAcc._list.addAll(_list)
+ }
newAcc
}
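For context on the hunk above: the JDK documents that a list returned by `Collections.synchronizedList` must be manually locked while it is traversed, and `addAll` traverses its argument. Synchronizing on the source list for the duration of the bulk copy rules out a concurrent `add` invalidating the iteration. A small standalone sketch of the idiom (the object and helper names are hypothetical):

```scala
import java.util.{ArrayList, Collections, List => JList}

object SyncCopySketch {
  // Hypothetical helper mirroring the patched copy(): take a consistent
  // bulk copy by holding the source's monitor during addAll, since the
  // wrapper returned by synchronizedList locks on itself.
  def copySynchronized[T](source: JList[T]): JList[T] = {
    val target = Collections.synchronizedList(new ArrayList[T]())
    source.synchronized {
      target.addAll(source) // traversal of source happens under its lock
    }
    target
  }
}
```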
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index f4fa7b4061..c11eb3ffa4 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -281,7 +281,7 @@ private[spark] object JsonProtocol {
("Finish Time" -> taskInfo.finishTime) ~
("Failed" -> taskInfo.failed) ~
("Killed" -> taskInfo.killed) ~
- ("Accumulables" -> JArray(taskInfo.accumulables.map(accumulableInfoToJson).toList))
+ ("Accumulables" -> JArray(taskInfo.accumulables.toList.map(accumulableInfoToJson)))
}
def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {
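The `JsonProtocol` hunk reorders the chain so the immutable snapshot is taken first: `taskInfo.accumulables` is a mutable buffer, and mapping over it directly iterates the live collection, whereas calling `.toList` first pins an immutable `List` that is safe to transform. A hedged sketch of the pattern (names are placeholders):

```scala
import scala.collection.mutable.ListBuffer

object SnapshotFirstSketch extends App {
  val live = ListBuffer("a", "b", "c") // stand-in for taskInfo.accumulables

  // Risky order: map() walks the live, mutable buffer while another
  // thread may still be appending to it:
  //   live.map(render).toList

  // Patched order: snapshot first, then transform the immutable copy.
  val snapshot: List[String] = live.toList
  val rendered = snapshot.map(s => s"rendered($s)") // stand-in for accumulableInfoToJson
  println(rendered)
}
```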