diff options
author | Andrew Or <andrewor14@gmail.com> | 2013-12-31 20:02:12 -0800 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2013-12-31 20:02:12 -0800 |
commit | 3bc9e391a3eb1dd21bb93b15caf49627134c1917 (patch) | |
tree | 968430dd722d77cdbd715aa4d865aeec5e62ca5d | |
parent | 83dfa1666487a4772c95fea21fde0d47471e063d (diff) | |
parent | 08302b113a5db773e3b8d7cfea1ab1d2b8d3695b (diff) | |
download | spark-3bc9e391a3eb1dd21bb93b15caf49627134c1917.tar.gz spark-3bc9e391a3eb1dd21bb93b15caf49627134c1917.tar.bz2 spark-3bc9e391a3eb1dd21bb93b15caf49627134c1917.zip |
Merge branch 'master' of github.com:andrewor14/incubator-spark
4 files changed, 9 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index c5dacf3fd2..bcc3101485 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -68,9 +68,9 @@ private[spark] case class StreamBlockId(streamId: Int, uniqueId: Long) extends B def name = "input-" + streamId + "-" + uniqueId } -/** Block associated with intermediate (temporary) data managed as blocks. */ -private[spark] case class IntermediateBlockId(id: String) extends BlockId { - def name = "intermediate_" + id +/** Block associated with temporary data managed as blocks. */ +private[spark] case class TempBlockId(id: String) extends BlockId { + def name = "temp_" + id } // Intended only for testing purposes @@ -85,7 +85,7 @@ private[spark] object BlockId { val BROADCAST_HELPER = "broadcast_([0-9]+)_([A-Za-z0-9]+)".r val TASKRESULT = "taskresult_([0-9]+)".r val STREAM = "input-([0-9]+)-([0-9]+)".r - val INTERMEDIATE = "intermediate_(.*)".r + val TEMP = "temp_(.*)".r val TEST = "test_(.*)".r /** Converts a BlockId "name" String back into a BlockId. */ diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 58320f254a..32da458a6f 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -91,10 +91,10 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD def getFile(blockId: BlockId): File = getFile(blockId.name) /** Produces a unique block id and File suitable for intermediate results. */ - def createIntermediateBlock: (IntermediateBlockId, File) = { - var blockId = new IntermediateBlockId(UUID.randomUUID().toString) + def createTempBlock(): (TempBlockId, File) = { + var blockId = new TempBlockId(UUID.randomUUID().toString) while (getFile(blockId).exists()) { - blockId = new IntermediateBlockId(UUID.randomUUID().toString) + blockId = new TempBlockId(UUID.randomUUID().toString) } (blockId, getFile(blockId)) } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 311405f0cf..223fae128e 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -141,7 +141,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( spillCount += 1 logWarning(s"In-memory KV map exceeded threshold of $memoryThresholdMB MB!") logWarning(s"Spilling to disk ($spillCount time"+(if (spillCount > 1) "s" else "")+" so far)") - val (blockId, file) = diskBlockManager.createIntermediateBlock + val (blockId, file) = diskBlockManager.createTempBlock() val writer = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity) try { val it = currentMap.destructiveSortedIterator(comparator) diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala index e6b6103d96..204330dad4 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala @@ -96,6 +96,6 @@ private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] } } -object SizeTrackingAppendOnlyMap { +private object SizeTrackingAppendOnlyMap { case class Sample(size: Long, numUpdates: Long) } |