about | summary | refs | log | tree | commit | diff
path: root/core
diff options
context:
space:
mode:
author: Josh Rosen <joshrosen@apache.org> 2013-10-29 18:01:59 -0700
committer: Josh Rosen <joshrosen@apache.org> 2013-10-29 18:06:51 -0700
commit: cb9c8a922fc92f55e1de87ff13cbdbc3990c4ee3 (patch)
tree: 6b2dab2b4b94a61124dadc78be72e13466eedc92 /core
parent: 846b1cf5abcd1c6d085806a9092228561a90e13d (diff)
download: spark-cb9c8a922fc92f55e1de87ff13cbdbc3990c4ee3.tar.gz
spark-cb9c8a922fc92f55e1de87ff13cbdbc3990c4ee3.tar.bz2
spark-cb9c8a922fc92f55e1de87ff13cbdbc3990c4ee3.zip
Extract BlockInfo classes from BlockManager.
This saves space, since the inner classes needed to keep a reference to the enclosing BlockManager.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockInfo.scala 97
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockManager.scala 75
2 files changed, 97 insertions, 75 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
new file mode 100644
index 0000000000..dbe0bda615
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ConcurrentHashMap
+
+private[storage] trait BlockInfo {
+ def level: StorageLevel
+ def tellMaster: Boolean
+ // To save space, the 'pending' and 'failed' states are encoded as negative sentinel sizes:
+ @volatile var size: Long = BlockInfo.BLOCK_PENDING
+ private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
+ private def failed: Boolean = size == BlockInfo.BLOCK_FAILED
+ private def initThread: Thread = BlockInfo.blockInfoInitThreads.get(this)
+
+ setInitThread() // runs as part of construction, on the thread creating this BlockInfo
+
+ private def setInitThread() {
+ // Record the current thread as the init thread so that waitForReady never blocks it,
+ // even while the block is still pending (non-trivial initialization may end up calling
+ // waitForReady on this same BlockInfo).
+ BlockInfo.blockInfoInitThreads.put(this, Thread.currentThread())
+ }
+
+ /**
+ * Block until this BlockInfo leaves the pending state; the init thread returns immediately.
+ * Return false only if the block was marked as failed, true otherwise.
+ */
+ def waitForReady(): Boolean = {
+ if (pending && initThread != Thread.currentThread()) {
+ synchronized {
+ while (pending) this.wait() // re-check the state after every wakeup
+ }
+ }
+ !failed
+ }
+
+ /** Mark this BlockInfo as ready with the given non-negative size and wake all waiters. */
+ def markReady(sizeInBytes: Long) {
+ require (sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes)
+ assert (pending) // must still be pending when marked ready
+ size = sizeInBytes
+ BlockInfo.blockInfoInitThreads.remove(this) // init thread is only tracked while pending
+ synchronized {
+ this.notifyAll()
+ }
+ }
+
+ /** Mark this BlockInfo as failed (no longer pending) and wake all waiters. */
+ def markFailure() {
+ assert (pending) // must still be pending when marked failed
+ size = BlockInfo.BLOCK_FAILED
+ BlockInfo.blockInfoInitThreads.remove(this) // init thread is only tracked while pending
+ synchronized {
+ this.notifyAll()
+ }
+ }
+}
+
+private object BlockInfo {
+ // initThread is logically a per-BlockInfo field, but it is kept in this shared map because
+ // it's only needed while the block is in the 'pending' state (markReady and markFailure
+ // remove the entry) and we want to minimize BlockInfo's memory footprint.
+ private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
+
+ private val BLOCK_PENDING: Long = -1L // sentinel size: not yet marked ready or failed
+ private val BLOCK_FAILED: Long = -2L // sentinel size: set by markFailure()
+}
+
+// All shuffle blocks share the same `level` and `tellMaster` values,
+// so we can save space by not storing them in each instance:
+private[storage] class ShuffleBlockInfo extends BlockInfo {
+ // Declared with 'def' rather than 'val' so that the compiler can
+ // eliminate the per-instance fields:
+ def level: StorageLevel = StorageLevel.DISK_ONLY
+ def tellMaster: Boolean = false
+}
+
+private[storage] class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean)
+ extends BlockInfo {
+ // Intentionally left blank: level and tellMaster are supplied by the constructor vals
+} \ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 11cda5de55..76d537f8e8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,7 +19,6 @@ package org.apache.spark.storage
import java.io.{InputStream, OutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}
-import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.{HashMap, ArrayBuffer}
import scala.util.Random
@@ -47,80 +46,6 @@ private[spark] class BlockManager(
maxMemory: Long)
extends Logging {
-
- // initThread is logically a BlockInfo field, but we store it here because
- // it's only needed while this block is in the 'pending' state and we want
- // to minimize BlockInfo's memory footprint.
- private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
-
- private val BLOCK_PENDING: Long = -1L
- private val BLOCK_FAILED: Long = -2L
-
- private trait BlockInfo {
- def level: StorageLevel
- def tellMaster: Boolean
- @volatile var size: Long = BLOCK_PENDING // also encodes 'pending' and 'failed' to save space
- private def pending: Boolean = size == BLOCK_PENDING
- private def failed: Boolean = size == BLOCK_FAILED
- private def initThread: Thread = blockInfoInitThreads.get(this)
-
- setInitThread()
-
- private def setInitThread() {
- // Set current thread as init thread - waitForReady will not block this thread
- // (in case there is non trivial initialization which ends up calling waitForReady as part of
- // initialization itself)
- blockInfoInitThreads.put(this, Thread.currentThread())
- }
-
- /**
- * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
- * Return true if the block is available, false otherwise.
- */
- def waitForReady(): Boolean = {
- if (pending && initThread != Thread.currentThread()) {
- synchronized {
- while (pending) this.wait()
- }
- }
- !failed
- }
-
- /** Mark this BlockInfo as ready (i.e. block is finished writing) */
- def markReady(sizeInBytes: Long) {
- require (sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes)
- assert (pending)
- size = sizeInBytes
- blockInfoInitThreads.remove(this)
- synchronized {
- this.notifyAll()
- }
- }
-
- /** Mark this BlockInfo as ready but failed */
- def markFailure() {
- assert (pending)
- size = BLOCK_FAILED
- blockInfoInitThreads.remove(this)
- synchronized {
- this.notifyAll()
- }
- }
- }
-
- // All shuffle blocks have the same `level` and `tellMaster` properties,
- // so we can save space by not storing them in each instance:
- private class ShuffleBlockInfo extends BlockInfo {
- // These need to be defined using 'def' instead of 'val' in order for
- // the compiler to eliminate the fields:
- def level: StorageLevel = StorageLevel.DISK_ONLY
- def tellMaster: Boolean = false
- }
-
- private class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean) extends BlockInfo {
- // Intentionally left blank
- }
-
val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(
System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))