author    tgravescs <tgraves_cs@yahoo.com>  2013-10-30 10:41:21 -0500
committer tgravescs <tgraves_cs@yahoo.com>  2013-10-30 10:41:21 -0500
commit    54d9c6f2534e92335f6ce735a6c1054557621358 (patch)
tree      91bd18af5c38d5feea9054dd5f2448a9a8780d0b /core
parent    e5e0ebdb1190a256e51dbf1265c6957f0cd56a29 (diff)
parent    745dc4290834e8c38d61600e23e4706b0ae47179 (diff)
Merge remote-tracking branch 'upstream/master' into sparkHadoopUtilFix
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockInfo.scala     97
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  59
2 files changed, 99 insertions, 57 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
new file mode 100644
index 0000000000..dbe0bda615
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ConcurrentHashMap
+
+private[storage] trait BlockInfo {
+ def level: StorageLevel
+ def tellMaster: Boolean
+ // To save space, 'pending' and 'failed' are encoded as special sizes:
+ @volatile var size: Long = BlockInfo.BLOCK_PENDING
+ private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
+ private def failed: Boolean = size == BlockInfo.BLOCK_FAILED
+ private def initThread: Thread = BlockInfo.blockInfoInitThreads.get(this)
+
+ setInitThread()
+
+ private def setInitThread() {
+ // Set current thread as init thread - waitForReady will not block this thread
+ // (in case there is non trivial initialization which ends up calling waitForReady as part of
+ // initialization itself)
+ BlockInfo.blockInfoInitThreads.put(this, Thread.currentThread())
+ }
+
+ /**
+ * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
+ * Return true if the block is available, false otherwise.
+ */
+ def waitForReady(): Boolean = {
+ if (pending && initThread != Thread.currentThread()) {
+ synchronized {
+ while (pending) this.wait()
+ }
+ }
+ !failed
+ }
+
+ /** Mark this BlockInfo as ready (i.e. block is finished writing) */
+ def markReady(sizeInBytes: Long) {
+ require (sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes)
+ assert (pending)
+ size = sizeInBytes
+ BlockInfo.blockInfoInitThreads.remove(this)
+ synchronized {
+ this.notifyAll()
+ }
+ }
+
+ /** Mark this BlockInfo as ready but failed */
+ def markFailure() {
+ assert (pending)
+ size = BlockInfo.BLOCK_FAILED
+ BlockInfo.blockInfoInitThreads.remove(this)
+ synchronized {
+ this.notifyAll()
+ }
+ }
+}
+
+private object BlockInfo {
+ // initThread is logically a BlockInfo field, but we store it here because
+ // it's only needed while this block is in the 'pending' state and we want
+ // to minimize BlockInfo's memory footprint.
+ private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
+
+ private val BLOCK_PENDING: Long = -1L
+ private val BLOCK_FAILED: Long = -2L
+}
+
+// All shuffle blocks have the same `level` and `tellMaster` properties,
+// so we can save space by not storing them in each instance:
+private[storage] class ShuffleBlockInfo extends BlockInfo {
+ // These need to be defined using 'def' instead of 'val' in order for
+ // the compiler to eliminate the fields:
+ def level: StorageLevel = StorageLevel.DISK_ONLY
+ def tellMaster: Boolean = false
+}
+
+private[storage] class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean)
+ extends BlockInfo {
+ // Intentionally left blank
+}
\ No newline at end of file
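
Note: the waitForReady/markReady pair in the new BlockInfo above is a plain wait/notify handshake, with the pending and failed states folded into sentinel values of size. Below is a minimal standalone sketch of that handshake, assuming nothing from Spark; PendingResult, PENDING, and FAILED are illustrative names, and the init-thread bookkeeping from BlockInfo is omitted for brevity.

// Sketch only: mirrors the handshake in BlockInfo above, not Spark's actual class.
object ReadinessSketch {
  class PendingResult {
    // Sentinel sizes encode state, like BLOCK_PENDING / BLOCK_FAILED above.
    private val PENDING = -1L
    private val FAILED  = -2L
    @volatile var size: Long = PENDING

    def waitForReady(): Boolean = synchronized {
      while (size == PENDING) wait()  // monitor is released while waiting
      size != FAILED
    }

    def markReady(bytes: Long): Unit = synchronized {
      require(bytes >= 0, "bytes was negative: " + bytes)
      size = bytes
      notifyAll()                     // wake every thread blocked in waitForReady
    }

    def markFailure(): Unit = synchronized {
      size = FAILED
      notifyAll()
    }
  }

  def main(args: Array[String]): Unit = {
    val info = new PendingResult
    val reader = new Thread(new Runnable {
      def run(): Unit = println("block available: " + info.waitForReady())
    })
    reader.start()
    Thread.sleep(100)    // pretend the write takes a while
    info.markReady(42L)  // reader unblocks and prints "block available: true"
    reader.join()
  }
}

The @volatile on size matters because the real BlockInfo checks pending once outside the monitor as a fast path before synchronizing, as in the waitForReady body above.
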
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index dbe573dc64..76d537f8e8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -46,61 +46,6 @@ private[spark] class BlockManager(
maxMemory: Long)
extends Logging {
- private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
- @volatile var pending: Boolean = true
- @volatile var size: Long = -1L
- @volatile var initThread: Thread = null
- @volatile var failed = false
-
- setInitThread()
-
- private def setInitThread() {
- // Set current thread as init thread - waitForReady will not block this thread
- // (in case there is non trivial initialization which ends up calling waitForReady as part of
- // initialization itself)
- this.initThread = Thread.currentThread()
- }
-
- /**
- * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
- * Return true if the block is available, false otherwise.
- */
- def waitForReady(): Boolean = {
- if (initThread != Thread.currentThread() && pending) {
- synchronized {
- while (pending) this.wait()
- }
- }
- !failed
- }
-
- /** Mark this BlockInfo as ready (i.e. block is finished writing) */
- def markReady(sizeInBytes: Long) {
- assert (pending)
- size = sizeInBytes
- initThread = null
- failed = false
- initThread = null
- pending = false
- synchronized {
- this.notifyAll()
- }
- }
-
- /** Mark this BlockInfo as ready but failed */
- def markFailure() {
- assert (pending)
- size = 0
- initThread = null
- failed = true
- initThread = null
- pending = false
- synchronized {
- this.notifyAll()
- }
- }
- }
-
val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(
System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
@@ -526,7 +471,7 @@ private[spark] class BlockManager(
if (shuffleBlockManager.consolidateShuffleFiles) {
diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
}
- val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
+ val myInfo = new ShuffleBlockInfo()
blockInfo.put(blockId, myInfo)
myInfo.markReady(writer.fileSegment().length)
})
@@ -560,7 +505,7 @@ private[spark] class BlockManager(
// to be dropped right after it got put into memory. Note, however, that other threads will
// not be able to get() this block until we call markReady on its BlockInfo.
val myInfo = {
- val tinfo = new BlockInfo(level, tellMaster)
+ val tinfo = new BlockInfoImpl(level, tellMaster)
// Do atomically !
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
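
Note: the "Do atomically !" step above relies on putIfAbsent so that when two threads race to register the same block id, only one BlockInfo wins and the loser can fall back to the winner's instance. A minimal sketch of that pattern follows, using a plain ConcurrentHashMap rather than Spark's blockInfo map; registry and register are illustrative names, not Spark's API.

// Sketch of the atomic check-then-put registration pattern; not Spark's API.
import java.util.concurrent.ConcurrentHashMap

object RegistrationSketch {
  private val registry = new ConcurrentHashMap[String, AnyRef]()

  // Returns whichever instance actually got registered for this id:
  // the candidate we built, or one that another thread put first.
  def register(id: String, candidate: AnyRef): AnyRef = {
    val previous = registry.putIfAbsent(id, candidate)  // atomic check-then-put
    if (previous != null) previous else candidate
  }
}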