aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author zsxwing <zsxwing@gmail.com> 2014-09-25 18:24:01 -0700
committer Josh Rosen <joshrosen@apache.org> 2014-09-25 18:24:01 -0700
commit 86bce764983f2b14e1bd87fc3f4f938f7a217e1b (patch)
tree 371aa5632998663eee624f449ce0fe08edbf7f35
parent 0dc868e787a3bc69c1b8e90d916a6dcea8dbcd6d (diff)
download spark-86bce764983f2b14e1bd87fc3f4f938f7a217e1b.tar.gz
spark-86bce764983f2b14e1bd87fc3f4f938f7a217e1b.tar.bz2
spark-86bce764983f2b14e1bd87fc3f4f938f7a217e1b.zip
SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHashMap
MapOutputTrackerWorker.mapStatuses is used concurrently, so it should be thread-safe. This bug has already been fixed in #1328. Nevertheless, since #1328 won't be merged soon, I am sending this trivial fix in the hope that the issue can be resolved sooner. Author: zsxwing <zsxwing@gmail.com> Closes #1541 from zsxwing/SPARK-2634 and squashes the following commits: d450053 [zsxwing] SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHashMap
-rw-r--r-- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 8
1 files changed, 7 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 51705c895a..f92189b707 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,10 +18,12 @@
package org.apache.spark
import java.io._
+import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable.{HashSet, HashMap, Map}
import scala.concurrent.Await
+import scala.collection.JavaConversions._
import akka.actor._
import akka.pattern.ask
@@ -84,6 +86,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
* On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
* On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
* master's corresponding HashMap.
+ *
+ * Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
+ * thread-safe map.
*/
protected val mapStatuses: Map[Int, Array[MapStatus]]
@@ -339,7 +344,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
* MapOutputTrackerMaster.
*/
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
- protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
+ protected val mapStatuses: Map[Int, Array[MapStatus]] =
+ new ConcurrentHashMap[Int, Array[MapStatus]]
}
private[spark] object MapOutputTracker {