From 846b1cf5abcd1c6d085806a9092228561a90e13d Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sun, 27 Oct 2013 21:30:29 -0700
Subject: Store fewer BlockInfo fields for shuffle blocks.

---
 .../org/apache/spark/storage/BlockManager.scala | 32 +++++++++++++++++-----
 1 file changed, 25 insertions(+), 7 deletions(-)

(limited to 'core')

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 285cf022f6..11cda5de55 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -53,10 +53,15 @@ private[spark] class BlockManager(
   // to minimize BlockInfo's memory footprint.
   private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
 
-  private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
-    @volatile var size: Long = -1L // also encodes 'pending' and 'failed' to save space
-    private def pending: Boolean = size == -1L
-    private def failed: Boolean = size == -2L
+  private val BLOCK_PENDING: Long = -1L
+  private val BLOCK_FAILED: Long = -2L
+
+  private trait BlockInfo {
+    def level: StorageLevel
+    def tellMaster: Boolean
+    @volatile var size: Long = BLOCK_PENDING // also encodes 'pending' and 'failed' to save space
+    private def pending: Boolean = size == BLOCK_PENDING
+    private def failed: Boolean = size == BLOCK_FAILED
     private def initThread: Thread = blockInfoInitThreads.get(this)
 
     setInitThread()
@@ -95,7 +100,7 @@ private[spark] class BlockManager(
     /** Mark this BlockInfo as ready but failed */
     def markFailure() {
       assert (pending)
-      size = -2L
+      size = BLOCK_FAILED
       blockInfoInitThreads.remove(this)
       synchronized {
         this.notifyAll()
@@ -103,6 +108,19 @@ private[spark] class BlockManager(
     }
   }
 
+  // All shuffle blocks have the same `level` and `tellMaster` properties,
+  // so we can save space by not storing them in each instance:
+  private class ShuffleBlockInfo extends BlockInfo {
+    // These need to be defined using 'def' instead of 'val' in order for
+    // the compiler to eliminate the fields:
+    def level: StorageLevel = StorageLevel.DISK_ONLY
+    def tellMaster: Boolean = false
+  }
+
+  private class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean) extends BlockInfo {
+    // Intentionally left blank
+  }
+
   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(
     System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
@@ -528,7 +546,7 @@ private[spark] class BlockManager(
       if (shuffleBlockManager.consolidateShuffleFiles) {
         diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
       }
-      val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
+      val myInfo = new ShuffleBlockInfo()
       blockInfo.put(blockId, myInfo)
       myInfo.markReady(writer.fileSegment().length)
     })
@@ -562,7 +580,7 @@ private[spark] class BlockManager(
     // to be dropped right after it got put into memory. Note, however, that other threads will
     // not be able to get() this block until we call markReady on its BlockInfo.
     val myInfo = {
-      val tinfo = new BlockInfo(level, tellMaster)
+      val tinfo = new BlockInfoImpl(level, tellMaster)
       // Do atomically !
       val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
 
--
cgit v1.2.3
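
Not part of the patch: a minimal, self-contained Scala sketch of the 'def'-vs-'val' point that the
ShuffleBlockInfo comment makes. The names here (Level, Info, InfoImpl, ShuffleInfo) are made up for
illustration and do not exist in Spark; the idea is that constructor 'val's compile to per-instance
fields, while constant 'def's compile to methods only, which is why ShuffleBlockInfo can avoid
storing 'level' and 'tellMaster' in every instance.

    // Simplified stand-in for Spark's StorageLevel, just for illustration.
    sealed trait Level
    case object DiskOnly extends Level

    trait Info {
      def level: Level
      def tellMaster: Boolean
    }

    // Constructor 'val's: every instance carries two backing fields.
    class InfoImpl(val level: Level, val tellMaster: Boolean) extends Info

    // Constant 'def's: no per-instance fields are generated.
    class ShuffleInfo extends Info {
      def level: Level = DiskOnly
      def tellMaster: Boolean = false
    }

    object FieldFootprintSketch {
      def main(args: Array[String]): Unit = {
        def fieldNames(c: Class[_]) = c.getDeclaredFields.map(_.getName).toList
        // Prints the fields the compiler actually generated for each class,
        // e.g. List(level, tellMaster) for InfoImpl and List() for ShuffleInfo.
        println(s"InfoImpl fields:    ${fieldNames(classOf[InfoImpl])}")
        println(s"ShuffleInfo fields: ${fieldNames(classOf[ShuffleInfo])}")
      }
    }

Running the sketch shows empty declared fields for ShuffleInfo, mirroring the space saving the
patch claims for shuffle blocks that all share StorageLevel.DISK_ONLY and tellMaster = false.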