aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala8
1 files changed, 7 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 51705c895a..f92189b707 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,10 +18,12 @@
package org.apache.spark
import java.io._
+import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable.{HashSet, HashMap, Map}
import scala.concurrent.Await
+import scala.collection.JavaConversions._
import akka.actor._
import akka.pattern.ask
@@ -84,6 +86,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
* On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
* On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
* master's corresponding HashMap.
+ *
+ * Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
+ * thread-safe map.
*/
protected val mapStatuses: Map[Int, Array[MapStatus]]
@@ -339,7 +344,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
* MapOutputTrackerMaster.
*/
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
- protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
+ protected val mapStatuses: Map[Int, Array[MapStatus]] =
+ new ConcurrentHashMap[Int, Array[MapStatus]]
}
private[spark] object MapOutputTracker {