diff options
author | zsxwing <zsxwing@gmail.com> | 2014-09-25 18:24:01 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2014-09-25 18:24:01 -0700 |
commit | 86bce764983f2b14e1bd87fc3f4f938f7a217e1b (patch) | |
tree | 371aa5632998663eee624f449ce0fe08edbf7f35 /core | |
parent | 0dc868e787a3bc69c1b8e90d916a6dcea8dbcd6d (diff) | |
download | spark-86bce764983f2b14e1bd87fc3f4f938f7a217e1b.tar.gz spark-86bce764983f2b14e1bd87fc3f4f938f7a217e1b.tar.bz2 spark-86bce764983f2b14e1bd87fc3f4f938f7a217e1b.zip |
SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHashMap
MapOutputTrackerWorker.mapStatuses is accessed concurrently, so it must be thread-safe. This bug has already been fixed in #1328; however, since #1328 won't be merged soon, I am sending this trivial fix in the hope that the issue can be resolved sooner.
Author: zsxwing <zsxwing@gmail.com>
Closes #1541 from zsxwing/SPARK-2634 and squashes the following commits:
d450053 [zsxwing] SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHashMap
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 8 |
1 files changed, 7 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 51705c895a..f92189b707 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -18,10 +18,12 @@ package org.apache.spark import java.io._ +import java.util.concurrent.ConcurrentHashMap import java.util.zip.{GZIPInputStream, GZIPOutputStream} import scala.collection.mutable.{HashSet, HashMap, Map} import scala.concurrent.Await +import scala.collection.JavaConversions._ import akka.actor._ import akka.pattern.ask @@ -84,6 +86,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging * On the master, it serves as the source of map outputs recorded from ShuffleMapTasks. * On the workers, it simply serves as a cache, in which a miss triggers a fetch from the * master's corresponding HashMap. + * + * Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a + * thread-safe map. */ protected val mapStatuses: Map[Int, Array[MapStatus]] @@ -339,7 +344,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) * MapOutputTrackerMaster. */ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) { - protected val mapStatuses = new HashMap[Int, Array[MapStatus]] + protected val mapStatuses: Map[Int, Array[MapStatus]] = + new ConcurrentHashMap[Int, Array[MapStatus]] } private[spark] object MapOutputTracker { |