about summary refs log tree commit diff
path: root/core/src/main/scala/org
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2013-12-31 20:02:12 -0800
committerAndrew Or <andrewor14@gmail.com>2013-12-31 20:02:12 -0800
commit3bc9e391a3eb1dd21bb93b15caf49627134c1917 (patch)
tree968430dd722d77cdbd715aa4d865aeec5e62ca5d /core/src/main/scala/org
parent83dfa1666487a4772c95fea21fde0d47471e063d (diff)
parent08302b113a5db773e3b8d7cfea1ab1d2b8d3695b (diff)
downloadspark-3bc9e391a3eb1dd21bb93b15caf49627134c1917.tar.gz
spark-3bc9e391a3eb1dd21bb93b15caf49627134c1917.tar.bz2
spark-3bc9e391a3eb1dd21bb93b15caf49627134c1917.zip
Merge branch 'master' of github.com:andrewor14/incubator-spark
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockId.scala | 8
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala | 6
-rw-r--r-- core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala | 2
4 files changed, 9 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index c5dacf3fd2..bcc3101485 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -68,9 +68,9 @@ private[spark] case class StreamBlockId(streamId: Int, uniqueId: Long) extends B
def name = "input-" + streamId + "-" + uniqueId
}
-/** Block associated with intermediate (temporary) data managed as blocks. */
-private[spark] case class IntermediateBlockId(id: String) extends BlockId {
- def name = "intermediate_" + id
+/** Block associated with temporary data managed as blocks. */
+private[spark] case class TempBlockId(id: String) extends BlockId {
+ def name = "temp_" + id
}
// Intended only for testing purposes
@@ -85,7 +85,7 @@ private[spark] object BlockId {
val BROADCAST_HELPER = "broadcast_([0-9]+)_([A-Za-z0-9]+)".r
val TASKRESULT = "taskresult_([0-9]+)".r
val STREAM = "input-([0-9]+)-([0-9]+)".r
- val INTERMEDIATE = "intermediate_(.*)".r
+ val TEMP = "temp_(.*)".r
val TEST = "test_(.*)".r
/** Converts a BlockId "name" String back into a BlockId. */
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 58320f254a..32da458a6f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -91,10 +91,10 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
def getFile(blockId: BlockId): File = getFile(blockId.name)
/** Produces a unique block id and File suitable for intermediate results. */
- def createIntermediateBlock: (IntermediateBlockId, File) = {
- var blockId = new IntermediateBlockId(UUID.randomUUID().toString)
+ def createTempBlock(): (TempBlockId, File) = {
+ var blockId = new TempBlockId(UUID.randomUUID().toString)
while (getFile(blockId).exists()) {
- blockId = new IntermediateBlockId(UUID.randomUUID().toString)
+ blockId = new TempBlockId(UUID.randomUUID().toString)
}
(blockId, getFile(blockId))
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 311405f0cf..223fae128e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -141,7 +141,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
spillCount += 1
logWarning(s"In-memory KV map exceeded threshold of $memoryThresholdMB MB!")
logWarning(s"Spilling to disk ($spillCount time"+(if (spillCount > 1) "s" else "")+" so far)")
- val (blockId, file) = diskBlockManager.createIntermediateBlock
+ val (blockId, file) = diskBlockManager.createTempBlock()
val writer = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity)
try {
val it = currentMap.destructiveSortedIterator(comparator)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
index e6b6103d96..204330dad4 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
@@ -96,6 +96,6 @@ private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V]
}
}
-object SizeTrackingAppendOnlyMap {
+private object SizeTrackingAppendOnlyMap {
case class Sample(size: Long, numUpdates: Long)
}