diff options
author | Aaron Davidson <aaron@databricks.com> | 2013-12-19 15:10:48 -0800 |
---|---|---|
committer | Aaron Davidson <aaron@databricks.com> | 2013-12-19 15:40:48 -0800 |
commit | 0647ec97573dc267c7a6b4679fb938b4dfa4fbb6 (patch) | |
tree | 82bd6f394d1bfc988bdd657c002316b018fdb448 | |
parent | 440e531a5e7720c42f0c53ce98425b63b4194b7b (diff) | |
download | spark-0647ec97573dc267c7a6b4679fb938b4dfa4fbb6.tar.gz spark-0647ec97573dc267c7a6b4679fb938b4dfa4fbb6.tar.bz2 spark-0647ec97573dc267c7a6b4679fb938b4dfa4fbb6.zip |
Clean up shuffle files once their metadata is gone
Previously, we would only clean the in-memory metadata for consolidated
shuffle files.
Additionally, fixes a bug where the Metadata Cleaner was ignoring type-
specific TTLs.
3 files changed, 35 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index e828e1d1c5..212ef6506f 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -70,10 +70,16 @@ class ShuffleBlockManager(blockManager: BlockManager) { * Contains all the state related to a particular shuffle. This includes a pool of unused * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle. */ - private class ShuffleState() { + private class ShuffleState(val numBuckets: Int) { val nextFileId = new AtomicInteger(0) val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]() val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]() + + /** + * The mapIds of all map tasks completed on this Executor for this shuffle. + * NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise. + */ + val completedMapTasks = new ConcurrentLinkedQueue[Int]() } type ShuffleId = Int @@ -84,7 +90,7 @@ class ShuffleBlockManager(blockManager: BlockManager) { def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = { new ShuffleWriterGroup { - shuffleStates.putIfAbsent(shuffleId, new ShuffleState()) + shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets)) private val shuffleState = shuffleStates(shuffleId) private var fileGroup: ShuffleFileGroup = null @@ -109,6 +115,8 @@ class ShuffleBlockManager(blockManager: BlockManager) { fileGroup.recordMapOutput(mapId, offsets) } recycleFileGroup(fileGroup) + } else { + shuffleState.completedMapTasks.add(mapId) } } @@ -154,7 +162,18 @@ class ShuffleBlockManager(blockManager: BlockManager) { } private def cleanup(cleanupTime: Long) { - shuffleStates.clearOldValues(cleanupTime) + shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => { + if (consolidateShuffleFiles) { + for (fileGroup <- state.allFileGroups; file <- fileGroup.files) { + file.delete() + } + } else { + for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) { + val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId) + blockManager.diskBlockManager.getFile(blockId).delete() + } + } + }) } } diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala index 7b41ef89f1..fe56960cbf 100644 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala @@ -27,7 +27,7 @@ import org.apache.spark.Logging class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, cleanupFunc: (Long) => Unit) extends Logging { val name = cleanerType.toString - private val delaySeconds = MetadataCleaner.getDelaySeconds + private val delaySeconds = MetadataCleaner.getDelaySeconds(cleanerType) private val periodSeconds = math.max(10, delaySeconds / 10) private val timer = new Timer(name + " cleanup timer", true) diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala index dbff571de9..181ae2fd45 100644 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala @@ -104,19 +104,28 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with Logging { def toMap: immutable.Map[A, B] = iterator.toMap /** - * Removes old key-value pairs that have timestamp earlier than `threshTime` + * Removes old key-value pairs that have timestamp earlier than `threshTime`, + * calling the supplied function on each such entry before removing. */ - def clearOldValues(threshTime: Long) { + def clearOldValues(threshTime: Long, f: (A, B) => Unit) { val iterator = internalMap.entrySet().iterator() - while(iterator.hasNext) { + while (iterator.hasNext) { val entry = iterator.next() if (entry.getValue._2 < threshTime) { + f(entry.getKey, entry.getValue._1) logDebug("Removing key " + entry.getKey) iterator.remove() } } } + /** + * Removes old key-value pairs that have timestamp earlier than `threshTime` + */ + def clearOldValues(threshTime: Long) { + clearOldValues(threshTime, (_, _) => ()) + } + private def currentTime: Long = System.currentTimeMillis() } |