author    Aaron Davidson <aaron@databricks.com>    2013-12-19 15:10:48 -0800
committer Aaron Davidson <aaron@databricks.com>    2013-12-19 15:40:48 -0800
commit    0647ec97573dc267c7a6b4679fb938b4dfa4fbb6 (patch)
tree      82bd6f394d1bfc988bdd657c002316b018fdb448 /core/src/main/scala/org/apache
parent    440e531a5e7720c42f0c53ce98425b63b4194b7b (diff)
Clean up shuffle files once their metadata is gone
Previously, we would only clean the in-memory metadata for consolidated shuffle files. This also fixes a bug where the MetadataCleaner was ignoring type-specific TTLs.
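For context, the cleanup in this patch is driven by a MetadataCleaner that ShuffleBlockManager registers against its shuffleStates map. A minimal sketch of that wiring (the registration line is paraphrased from the surrounding ShuffleBlockManager source, not part of this patch):

    // The cleaner periodically invokes cleanup(), which after this patch
    // deletes the on-disk shuffle files along with the expired in-memory
    // ShuffleState entries.
    private val metadataCleaner =
      new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)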
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala | 25
-rw-r--r-- core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala        |  2
-rw-r--r-- core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala     | 15
3 files changed, 35 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index e828e1d1c5..212ef6506f 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -70,10 +70,16 @@ class ShuffleBlockManager(blockManager: BlockManager) {
* Contains all the state related to a particular shuffle. This includes a pool of unused
* ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
*/
- private class ShuffleState() {
+ private class ShuffleState(val numBuckets: Int) {
val nextFileId = new AtomicInteger(0)
val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
+
+ /**
+ * The mapIds of all map tasks completed on this Executor for this shuffle.
+ * NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise.
+ */
+ val completedMapTasks = new ConcurrentLinkedQueue[Int]()
}
type ShuffleId = Int
@@ -84,7 +90,7 @@ class ShuffleBlockManager(blockManager: BlockManager) {
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
new ShuffleWriterGroup {
- shuffleStates.putIfAbsent(shuffleId, new ShuffleState())
+ shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
private val shuffleState = shuffleStates(shuffleId)
private var fileGroup: ShuffleFileGroup = null
@@ -109,6 +115,8 @@ class ShuffleBlockManager(blockManager: BlockManager) {
fileGroup.recordMapOutput(mapId, offsets)
}
recycleFileGroup(fileGroup)
+ } else {
+ shuffleState.completedMapTasks.add(mapId)
}
}
@@ -154,7 +162,18 @@ class ShuffleBlockManager(blockManager: BlockManager) {
}
private def cleanup(cleanupTime: Long) {
- shuffleStates.clearOldValues(cleanupTime)
+ shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => {
+ if (consolidateShuffleFiles) {
+ for (fileGroup <- state.allFileGroups; file <- fileGroup.files) {
+ file.delete()
+ }
+ } else {
+ for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) {
+ val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
+ blockManager.diskBlockManager.getFile(blockId).delete()
+ }
+ }
+ })
}
}
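The non-consolidated branch above works because, with consolidation off, every (shuffleId, mapId, reduceId) triple gets its own file on disk, addressable through its block id. A hedged sketch of the naming this relies on (the name format mirrors Spark's ShuffleBlockId; the standalone case class here is for illustration only):

    // One file per map output per reducer, so cleanup can enumerate
    // completedMapTasks x numBuckets and delete each file directly.
    case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) {
      def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
    }

    // e.g. ShuffleBlockId(3, 7, 0).name == "shuffle_3_7_0"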
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index 7b41ef89f1..fe56960cbf 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -27,7 +27,7 @@ import org.apache.spark.Logging
class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, cleanupFunc: (Long) => Unit) extends Logging {
val name = cleanerType.toString
- private val delaySeconds = MetadataCleaner.getDelaySeconds
+ private val delaySeconds = MetadataCleaner.getDelaySeconds(cleanerType)
private val periodSeconds = math.max(10, delaySeconds / 10)
private val timer = new Timer(name + " cleanup timer", true)
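This one-line fix matters because the companion object resolves a per-type TTL before falling back to the global one; calling the no-arg variant skipped that lookup entirely. A sketch of the companion object's behavior, assuming the spark.cleaner.ttl.<type> system-property naming used at the time:

    object MetadataCleaner {
      // Global TTL in seconds; -1 disables cleanup.
      def getDelaySeconds: Int =
        System.getProperty("spark.cleaner.ttl", "-1").toInt

      // Per-type TTL, falling back to the global value when unset. Before the
      // fix, properties like spark.cleaner.ttl.SHUFFLE_BLOCK_MANAGER were
      // silently ignored.
      def getDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType): Int =
        System.getProperty("spark.cleaner.ttl." + cleanerType.toString,
          getDelaySeconds.toString).toInt
    }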
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
index dbff571de9..181ae2fd45 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -104,19 +104,28 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with Logging {
def toMap: immutable.Map[A, B] = iterator.toMap
/**
- * Removes old key-value pairs that have timestamp earlier than `threshTime`
+ * Removes old key-value pairs that have timestamp earlier than `threshTime`,
+ * calling the supplied function on each such entry before removing.
*/
- def clearOldValues(threshTime: Long) {
+ def clearOldValues(threshTime: Long, f: (A, B) => Unit) {
val iterator = internalMap.entrySet().iterator()
- while(iterator.hasNext) {
+ while (iterator.hasNext) {
val entry = iterator.next()
if (entry.getValue._2 < threshTime) {
+ f(entry.getKey, entry.getValue._1)
logDebug("Removing key " + entry.getKey)
iterator.remove()
}
}
}
+ /**
+ * Removes old key-value pairs that have timestamp earlier than `threshTime`
+ */
+ def clearOldValues(threshTime: Long) {
+ clearOldValues(threshTime, (_, _) => ())
+ }
+
private def currentTime: Long = System.currentTimeMillis()
}
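A minimal usage sketch of the new overload (hypothetical caller, not part of the patch):

    val states = new TimeStampedHashMap[Int, String]()
    states(0) = "shuffle-0 state"

    // Entries stamped earlier than threshTime are handed to the callback and
    // then removed; the old overload now delegates here with a no-op callback.
    val threshTime = System.currentTimeMillis()
    states.clearOldValues(threshTime, (shuffleId, state) =>
      println("dropping metadata for shuffle " + shuffleId + ": " + state))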