From 2d7cf6a271dbd494f1d351e6db7db8568733edc3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 27 Oct 2013 12:51:54 -0700 Subject: Restructure BlockInfo fields to reduce memory use. --- .../org/apache/spark/storage/BlockManager.scala | 32 ++++++++++++---------- 1 file changed, 17 insertions(+), 15 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index dbe573dc64..285cf022f6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -19,6 +19,7 @@ package org.apache.spark.storage import java.io.{InputStream, OutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} +import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.{HashMap, ArrayBuffer} import scala.util.Random @@ -46,11 +47,17 @@ private[spark] class BlockManager( maxMemory: Long) extends Logging { + + // initThread is logically a BlockInfo field, but we store it here because + // it's only needed while this block is in the 'pending' state and we want + // to minimize BlockInfo's memory footprint. + private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread] + private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) { - @volatile var pending: Boolean = true - @volatile var size: Long = -1L - @volatile var initThread: Thread = null - @volatile var failed = false + @volatile var size: Long = -1L // also encodes 'pending' and 'failed' to save space + private def pending: Boolean = size == -1L + private def failed: Boolean = size == -2L + private def initThread: Thread = blockInfoInitThreads.get(this) setInitThread() @@ -58,7 +65,7 @@ private[spark] class BlockManager( // Set current thread as init thread - waitForReady will not block this thread // (in case there is non trivial initialization which ends up calling waitForReady as part of // initialization itself) - this.initThread = Thread.currentThread() + blockInfoInitThreads.put(this, Thread.currentThread()) } /** @@ -66,7 +73,7 @@ private[spark] class BlockManager( * Return true if the block is available, false otherwise. */ def waitForReady(): Boolean = { - if (initThread != Thread.currentThread() && pending) { + if (pending && initThread != Thread.currentThread()) { synchronized { while (pending) this.wait() } @@ -76,12 +83,10 @@ private[spark] class BlockManager( /** Mark this BlockInfo as ready (i.e. block is finished writing) */ def markReady(sizeInBytes: Long) { + require (sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes) assert (pending) size = sizeInBytes - initThread = null - failed = false - initThread = null - pending = false + blockInfoInitThreads.remove(this) synchronized { this.notifyAll() } @@ -90,11 +95,8 @@ private[spark] class BlockManager( /** Mark this BlockInfo as ready but failed */ def markFailure() { assert (pending) - size = 0 - initThread = null - failed = true - initThread = null - pending = false + size = -2L + blockInfoInitThreads.remove(this) synchronized { this.notifyAll() } -- cgit v1.2.3 From 846b1cf5abcd1c6d085806a9092228561a90e13d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 27 Oct 2013 21:30:29 -0700 Subject: Store fewer BlockInfo fields for shuffle blocks. --- .../org/apache/spark/storage/BlockManager.scala | 32 +++++++++++++++++----- 1 file changed, 25 insertions(+), 7 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 285cf022f6..11cda5de55 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -53,10 +53,15 @@ private[spark] class BlockManager( // to minimize BlockInfo's memory footprint. private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread] - private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) { - @volatile var size: Long = -1L // also encodes 'pending' and 'failed' to save space - private def pending: Boolean = size == -1L - private def failed: Boolean = size == -2L + private val BLOCK_PENDING: Long = -1L + private val BLOCK_FAILED: Long = -2L + + private trait BlockInfo { + def level: StorageLevel + def tellMaster: Boolean + @volatile var size: Long = BLOCK_PENDING // also encodes 'pending' and 'failed' to save space + private def pending: Boolean = size == BLOCK_PENDING + private def failed: Boolean = size == BLOCK_FAILED private def initThread: Thread = blockInfoInitThreads.get(this) setInitThread() @@ -95,7 +100,7 @@ private[spark] class BlockManager( /** Mark this BlockInfo as ready but failed */ def markFailure() { assert (pending) - size = -2L + size = BLOCK_FAILED blockInfoInitThreads.remove(this) synchronized { this.notifyAll() @@ -103,6 +108,19 @@ private[spark] class BlockManager( } } + // All shuffle blocks have the same `level` and `tellMaster` properties, + // so we can save space by not storing them in each instance: + private class ShuffleBlockInfo extends BlockInfo { + // These need to be defined using 'def' instead of 'val' in order for + // the compiler to eliminate the fields: + def level: StorageLevel = StorageLevel.DISK_ONLY + def tellMaster: Boolean = false + } + + private class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean) extends BlockInfo { + // Intentionally left blank + } + val shuffleBlockManager = new ShuffleBlockManager(this) val diskBlockManager = new DiskBlockManager( System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) @@ -528,7 +546,7 @@ private[spark] class BlockManager( if (shuffleBlockManager.consolidateShuffleFiles) { diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment()) } - val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false) + val myInfo = new ShuffleBlockInfo() blockInfo.put(blockId, myInfo) myInfo.markReady(writer.fileSegment().length) }) @@ -562,7 +580,7 @@ private[spark] class BlockManager( // to be dropped right after it got put into memory. Note, however, that other threads will // not be able to get() this block until we call markReady on its BlockInfo. val myInfo = { - val tinfo = new BlockInfo(level, tellMaster) + val tinfo = new BlockInfoImpl(level, tellMaster) // Do atomically ! val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) -- cgit v1.2.3 From cb9c8a922fc92f55e1de87ff13cbdbc3990c4ee3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 29 Oct 2013 18:01:59 -0700 Subject: Extract BlockInfo classes from BlockManager. This saves space, since the inner classes needed to keep a reference to the enclosing BlockManager. --- .../scala/org/apache/spark/storage/BlockInfo.scala | 97 ++++++++++++++++++++++ .../org/apache/spark/storage/BlockManager.scala | 75 ----------------- 2 files changed, 97 insertions(+), 75 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/storage/BlockInfo.scala (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala new file mode 100644 index 0000000000..dbe0bda615 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.util.concurrent.ConcurrentHashMap + +private[storage] trait BlockInfo { + def level: StorageLevel + def tellMaster: Boolean + // To save space, 'pending' and 'failed' are encoded as special sizes: + @volatile var size: Long = BlockInfo.BLOCK_PENDING + private def pending: Boolean = size == BlockInfo.BLOCK_PENDING + private def failed: Boolean = size == BlockInfo.BLOCK_FAILED + private def initThread: Thread = BlockInfo.blockInfoInitThreads.get(this) + + setInitThread() + + private def setInitThread() { + // Set current thread as init thread - waitForReady will not block this thread + // (in case there is non trivial initialization which ends up calling waitForReady as part of + // initialization itself) + BlockInfo.blockInfoInitThreads.put(this, Thread.currentThread()) + } + + /** + * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing). + * Return true if the block is available, false otherwise. + */ + def waitForReady(): Boolean = { + if (pending && initThread != Thread.currentThread()) { + synchronized { + while (pending) this.wait() + } + } + !failed + } + + /** Mark this BlockInfo as ready (i.e. block is finished writing) */ + def markReady(sizeInBytes: Long) { + require (sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes) + assert (pending) + size = sizeInBytes + BlockInfo.blockInfoInitThreads.remove(this) + synchronized { + this.notifyAll() + } + } + + /** Mark this BlockInfo as ready but failed */ + def markFailure() { + assert (pending) + size = BlockInfo.BLOCK_FAILED + BlockInfo.blockInfoInitThreads.remove(this) + synchronized { + this.notifyAll() + } + } +} + +private object BlockInfo { + // initThread is logically a BlockInfo field, but we store it here because + // it's only needed while this block is in the 'pending' state and we want + // to minimize BlockInfo's memory footprint. + private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread] + + private val BLOCK_PENDING: Long = -1L + private val BLOCK_FAILED: Long = -2L +} + +// All shuffle blocks have the same `level` and `tellMaster` properties, +// so we can save space by not storing them in each instance: +private[storage] class ShuffleBlockInfo extends BlockInfo { + // These need to be defined using 'def' instead of 'val' in order for + // the compiler to eliminate the fields: + def level: StorageLevel = StorageLevel.DISK_ONLY + def tellMaster: Boolean = false +} + +private[storage] class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean) + extends BlockInfo { + // Intentionally left blank +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 11cda5de55..76d537f8e8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -19,7 +19,6 @@ package org.apache.spark.storage import java.io.{InputStream, OutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} -import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.{HashMap, ArrayBuffer} import scala.util.Random @@ -47,80 +46,6 @@ private[spark] class BlockManager( maxMemory: Long) extends Logging { - - // initThread is logically a BlockInfo field, but we store it here because - // it's only needed while this block is in the 'pending' state and we want - // to minimize BlockInfo's memory footprint. - private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread] - - private val BLOCK_PENDING: Long = -1L - private val BLOCK_FAILED: Long = -2L - - private trait BlockInfo { - def level: StorageLevel - def tellMaster: Boolean - @volatile var size: Long = BLOCK_PENDING // also encodes 'pending' and 'failed' to save space - private def pending: Boolean = size == BLOCK_PENDING - private def failed: Boolean = size == BLOCK_FAILED - private def initThread: Thread = blockInfoInitThreads.get(this) - - setInitThread() - - private def setInitThread() { - // Set current thread as init thread - waitForReady will not block this thread - // (in case there is non trivial initialization which ends up calling waitForReady as part of - // initialization itself) - blockInfoInitThreads.put(this, Thread.currentThread()) - } - - /** - * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing). - * Return true if the block is available, false otherwise. - */ - def waitForReady(): Boolean = { - if (pending && initThread != Thread.currentThread()) { - synchronized { - while (pending) this.wait() - } - } - !failed - } - - /** Mark this BlockInfo as ready (i.e. block is finished writing) */ - def markReady(sizeInBytes: Long) { - require (sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes) - assert (pending) - size = sizeInBytes - blockInfoInitThreads.remove(this) - synchronized { - this.notifyAll() - } - } - - /** Mark this BlockInfo as ready but failed */ - def markFailure() { - assert (pending) - size = BLOCK_FAILED - blockInfoInitThreads.remove(this) - synchronized { - this.notifyAll() - } - } - } - - // All shuffle blocks have the same `level` and `tellMaster` properties, - // so we can save space by not storing them in each instance: - private class ShuffleBlockInfo extends BlockInfo { - // These need to be defined using 'def' instead of 'val' in order for - // the compiler to eliminate the fields: - def level: StorageLevel = StorageLevel.DISK_ONLY - def tellMaster: Boolean = false - } - - private class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean) extends BlockInfo { - // Intentionally left blank - } - val shuffleBlockManager = new ShuffleBlockManager(this) val diskBlockManager = new DiskBlockManager( System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) -- cgit v1.2.3