about | summary | refs | log | tree | commit | diff
path: root/core
diff options
context:
space:
mode:
author: Josh Rosen <joshrosen@apache.org> 2013-10-27 21:30:29 -0700
committer: Josh Rosen <joshrosen@apache.org> 2013-10-29 15:14:29 -0700
commit: 846b1cf5abcd1c6d085806a9092228561a90e13d (patch)
tree: 858a8e59f224186ec29add7407ad04c6d9e3fb3a /core
parent: 2d7cf6a271dbd494f1d351e6db7db8568733edc3 (diff)
download: spark-846b1cf5abcd1c6d085806a9092228561a90e13d.tar.gz
spark-846b1cf5abcd1c6d085806a9092228561a90e13d.tar.bz2
spark-846b1cf5abcd1c6d085806a9092228561a90e13d.zip
Store fewer BlockInfo fields for shuffle blocks.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 32
1 files changed, 25 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 285cf022f6..11cda5de55 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -53,10 +53,15 @@ private[spark] class BlockManager(
// to minimize BlockInfo's memory footprint.
private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
- private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
- @volatile var size: Long = -1L // also encodes 'pending' and 'failed' to save space
- private def pending: Boolean = size == -1L
- private def failed: Boolean = size == -2L
+ private val BLOCK_PENDING: Long = -1L
+ private val BLOCK_FAILED: Long = -2L
+
+ private trait BlockInfo {
+ def level: StorageLevel
+ def tellMaster: Boolean
+ @volatile var size: Long = BLOCK_PENDING // also encodes 'pending' and 'failed' to save space
+ private def pending: Boolean = size == BLOCK_PENDING
+ private def failed: Boolean = size == BLOCK_FAILED
private def initThread: Thread = blockInfoInitThreads.get(this)
setInitThread()
@@ -95,7 +100,7 @@ private[spark] class BlockManager(
/** Mark this BlockInfo as ready but failed */
def markFailure() {
assert (pending)
- size = -2L
+ size = BLOCK_FAILED
blockInfoInitThreads.remove(this)
synchronized {
this.notifyAll()
@@ -103,6 +108,19 @@ private[spark] class BlockManager(
}
}
+ // BlockInfo for shuffle blocks: `level` is always DISK_ONLY and `tellMaster` is always
+ // false, so both are returned as constants instead of being stored in each instance.
+ private class ShuffleBlockInfo extends BlockInfo {
+ // Declared with 'def' rather than 'val' so that the compiler does not
+ // generate a backing field for either member:
+ def level: StorageLevel = StorageLevel.DISK_ONLY
+ def tellMaster: Boolean = false
+ }
+
+ private class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean) extends BlockInfo {
+ // Intentionally empty: the constructor vals supply BlockInfo's `level` and `tellMaster`.
+ }
+
val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(
System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
@@ -528,7 +546,7 @@ private[spark] class BlockManager(
if (shuffleBlockManager.consolidateShuffleFiles) {
diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
}
- val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
+ val myInfo = new ShuffleBlockInfo()
blockInfo.put(blockId, myInfo)
myInfo.markReady(writer.fileSegment().length)
})
@@ -562,7 +580,7 @@ private[spark] class BlockManager(
// to be dropped right after it got put into memory. Note, however, that other threads will
// not be able to get() this block until we call markReady on its BlockInfo.
val myInfo = {
- val tinfo = new BlockInfo(level, tellMaster)
+ val tinfo = new BlockInfoImpl(level, tellMaster)
// Do atomically !
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)