diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/MapOutputTracker.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 25 |
1 files changed, 5 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 8670f705cd..1b59beb8d6 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -18,7 +18,6 @@ package org.apache.spark import java.io._ -import java.util.Arrays import java.util.concurrent.ConcurrentHashMap import java.util.zip.{GZIPInputStream, GZIPOutputStream} @@ -267,8 +266,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging } /** - * MapOutputTracker for the driver. This uses TimeStampedHashMap to keep track of map - * output information, which allows old output information based on a TTL. + * MapOutputTracker for the driver. */ private[spark] class MapOutputTrackerMaster(conf: SparkConf) extends MapOutputTracker(conf) { @@ -291,17 +289,10 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) // can be read locally, but may lead to more delay in scheduling if those locations are busy. private val REDUCER_PREF_LOCS_FRACTION = 0.2 - /** - * Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the driver, - * so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set). - * Other than these two scenarios, nothing should be dropped from this HashMap. - */ - protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]() - private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]() - - // For cleaning up TimeStampedHashMaps - private val metadataCleaner = - new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup, conf) + // HashMaps for storing mapStatuses and cached serialized statuses in the driver. + // Statuses are dropped only by explicit de-registering. + protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala + private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala def registerShuffle(shuffleId: Int, numMaps: Int) { if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) { @@ -462,14 +453,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) sendTracker(StopMapOutputTracker) mapStatuses.clear() trackerEndpoint = null - metadataCleaner.cancel() cachedSerializedStatuses.clear() } - - private def cleanup(cleanupTime: Long) { - mapStatuses.clearOldValues(cleanupTime) - cachedSerializedStatuses.clearOldValues(cleanupTime) - } } /** |