aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/MapOutputTracker.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala25
1 files changed, 5 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 8670f705cd..1b59beb8d6 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,7 +18,6 @@
package org.apache.spark
import java.io._
-import java.util.Arrays
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
@@ -267,8 +266,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}
/**
- * MapOutputTracker for the driver. This uses TimeStampedHashMap to keep track of map
- * output information, which allows old output information based on a TTL.
+ * MapOutputTracker for the driver.
*/
private[spark] class MapOutputTrackerMaster(conf: SparkConf)
extends MapOutputTracker(conf) {
@@ -291,17 +289,10 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
// can be read locally, but may lead to more delay in scheduling if those locations are busy.
private val REDUCER_PREF_LOCS_FRACTION = 0.2
- /**
- * Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the driver,
- * so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
- * Other than these two scenarios, nothing should be dropped from this HashMap.
- */
- protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
- private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()
-
- // For cleaning up TimeStampedHashMaps
- private val metadataCleaner =
- new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup, conf)
+ // HashMaps for storing mapStatuses and cached serialized statuses in the driver.
+ // Statuses are dropped only by explicit de-registering.
+ protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
+ private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
def registerShuffle(shuffleId: Int, numMaps: Int) {
if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
@@ -462,14 +453,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
sendTracker(StopMapOutputTracker)
mapStatuses.clear()
trackerEndpoint = null
- metadataCleaner.cancel()
cachedSerializedStatuses.clear()
}
-
- private def cleanup(cleanupTime: Long) {
- mapStatuses.clearOldValues(cleanupTime)
- cachedSerializedStatuses.clearOldValues(cleanupTime)
- }
}
/**