about | summary | refs | log | tree | commit | diff
path: root/core
diff options
context:
space:
mode:
author Matei Zaharia <matei@eecs.berkeley.edu> 2012-10-06 18:00:53 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu> 2012-10-06 18:00:53 -0700
commit 0e42832e6ae8d1e343d9b153af0a787fe8507602 (patch)
tree 5fb324400e33dc65dd6b42f9776b4e51403f51d1 /core
parent b0110de5b6061d28db9a84ff142b314beec7ff84 (diff)
download spark-0e42832e6ae8d1e343d9b153af0a787fe8507602.tar.gz
spark-0e42832e6ae8d1e343d9b153af0a787fe8507602.tar.bz2
spark-0e42832e6ae8d1e343d9b153af0a787fe8507602.zip
Made block store return the size of each block put in
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 3
-rw-r--r-- core/src/main/scala/spark/storage/BlockManager.scala | 67
-rw-r--r-- core/src/main/scala/spark/storage/BlockStore.scala | 7
-rw-r--r-- core/src/main/scala/spark/storage/DiskStore.scala | 7
-rw-r--r-- core/src/main/scala/spark/storage/MemoryStore.scala | 13
-rw-r--r-- core/src/main/scala/spark/storage/PutResult.scala | 9
6 files changed, 70 insertions, 36 deletions
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 387aac3c1f..9ca2c9e449 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -141,11 +141,10 @@ private[spark] class ShuffleMapTask(
buckets.map(_.iterator)
}
- val ser = SparkEnv.get.serializer.newInstance()
val blockManager = SparkEnv.get.blockManager
for (i <- 0 until numOutputSplits) {
val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
- // Get a scala iterator from java map
+ // Get a Scala iterator from Java map
val iter: Iterator[(Any, Any)] = bucketIterators(i)
blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 20ebfff68d..31debdd0fb 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -61,7 +61,11 @@ private[spark]
class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
extends Logging {
- class BlockInfo(val level: StorageLevel, val tellMaster: Boolean, var pending: Boolean = true) {
+ class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
+ var pending: Boolean = true
+ var size: Long = -1L
+
+ /** Wait for this BlockInfo to be marked as ready (i.e. block is finished writing) */
def waitForReady() {
if (pending) {
synchronized {
@@ -70,8 +74,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- def markReady() {
+ /** Mark this BlockInfo as ready (i.e. block is finished writing) */
+ def markReady(sizeInBytes: Long) {
pending = false
+ size = sizeInBytes
synchronized {
this.notifyAll()
}
@@ -209,7 +215,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
diskStore.getValues(blockId) match {
case Some(iterator) =>
// Put the block back in memory before returning it
- memoryStore.putValues(blockId, iterator, level, true) match {
+ memoryStore.putValues(blockId, iterator, level, true).data match {
case Left(iterator2) =>
return Some(iterator2)
case _ =>
@@ -453,9 +459,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
- * Put a new block of values to the block manager.
+ * Put a new block of values to the block manager. Returns its (estimated) size in bytes.
*/
- def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true) {
+ def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true)
+ : Long = {
+
if (blockId == null) {
throw new IllegalArgumentException("Block Id is null")
}
@@ -466,9 +474,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new IllegalArgumentException("Storage level is null or invalid")
}
- if (blockInfo.containsKey(blockId)) {
+ val oldBlock = blockInfo.get(blockId)
+ if (oldBlock != null) {
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
- return
+ oldBlock.waitForReady()
+ return oldBlock.size
}
// Remember the block's storage level so that we can correctly drop it to disk if it needs
@@ -478,14 +488,19 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
blockInfo.put(blockId, myInfo)
val startTimeMs = System.currentTimeMillis
- var bytes: ByteBuffer = null
// If we need to replicate the data, we'll want access to the values, but because our
// put will read the whole iterator, there will be no values left. For the case where
- // the put serializes data, we'll remember the bytes, above; but for the case where
- // it doesn't, such as MEMORY_ONLY_DESER, let's rely on the put returning an Iterator.
+ // the put serializes data, we'll remember the bytes in bytesAfterPut (declared below); but for the case where it
+ // doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
var valuesAfterPut: Iterator[Any] = null
+ // Ditto for the bytes after the put
+ var bytesAfterPut: ByteBuffer = null
+
+ // Size of the block in bytes (to return to caller)
+ var size = 0L
+
locker.getLock(blockId).synchronized {
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
@@ -493,22 +508,26 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
if (level.useMemory) {
// Save it just to memory first, even if it also has useDisk set to true; we will later
// drop it to disk if the memory store can't hold it.
- memoryStore.putValues(blockId, values, level, true) match {
- case Right(newBytes) => bytes = newBytes
+ val res = memoryStore.putValues(blockId, values, level, true)
+ size = res.size
+ res.data match {
+ case Right(newBytes) => bytesAfterPut = newBytes
case Left(newIterator) => valuesAfterPut = newIterator
}
} else {
// Save directly to disk.
val askForBytes = level.replication > 1 // Don't get back the bytes unless we replicate them
- diskStore.putValues(blockId, values, level, askForBytes) match {
- case Right(newBytes) => bytes = newBytes
+ val res = diskStore.putValues(blockId, values, level, askForBytes)
+ size = res.size
+ res.data match {
+ case Right(newBytes) => bytesAfterPut = newBytes
case _ =>
}
}
// Now that the block is in either the memory or disk store, let other threads read it,
// and tell the master about it.
- myInfo.markReady()
+ myInfo.markReady(size)
if (tellMaster) {
reportBlockStatus(blockId)
}
@@ -518,23 +537,25 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Replicate block if required
if (level.replication > 1) {
// Serialize the block if not already done
- if (bytes == null) {
+ if (bytesAfterPut == null) {
if (valuesAfterPut == null) {
throw new SparkException(
"Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
}
- bytes = dataSerialize(valuesAfterPut)
+ bytesAfterPut = dataSerialize(valuesAfterPut)
}
- replicate(blockId, bytes, level)
+ replicate(blockId, bytesAfterPut, level)
}
- BlockManager.dispose(bytes)
+ BlockManager.dispose(bytesAfterPut)
// TODO: This code will be removed when CacheTracker is gone.
if (blockId.startsWith("rdd")) {
- notifyTheCacheTracker(blockId)
+ notifyCacheTracker(blockId)
}
logDebug("Put block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs))
+
+ return size
}
@@ -593,7 +614,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Now that the block is in either the memory or disk store, let other threads read it,
// and tell the master about it.
- myInfo.markReady()
+ myInfo.markReady(bytes.limit)
if (tellMaster) {
reportBlockStatus(blockId)
}
@@ -601,7 +622,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// TODO: This code will be removed when CacheTracker is gone.
if (blockId.startsWith("rdd")) {
- notifyTheCacheTracker(blockId)
+ notifyCacheTracker(blockId)
}
// If replication had started, then wait for it to finish
@@ -647,7 +668,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
// TODO: This code will be removed when CacheTracker is gone.
- private def notifyTheCacheTracker(key: String) {
+ private def notifyCacheTracker(key: String) {
if (cacheTracker != null) {
val rddInfo = key.split("_")
val rddId: Int = rddInfo(1).toInt
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index ff482ff66b..1286600cd1 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -15,13 +15,14 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
* Put in a block and, possibly, also return its content as either bytes or another Iterator.
* This is used to efficiently write the values to multiple locations (e.g. for replication).
*
- * @return the values put if returnValues is true, or null otherwise
+ * @return a PutResult that contains the size of the data, as well as the values put if
+ * returnValues is true (if not, the result's data field can be null)
*/
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel, returnValues: Boolean)
- : Either[Iterator[Any], ByteBuffer]
+ : PutResult
/**
- * Return the size of a block.
+ * Return the size of a block in bytes.
*/
def getSize(blockId: String): Long
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index d0c592ccb1..26f4ddfb49 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -48,7 +48,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean)
- : Either[Iterator[Any], ByteBuffer] = {
+ : PutResult = {
logDebug("Attempting to write values for block " + blockId)
val startTime = System.currentTimeMillis
@@ -65,9 +65,10 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
if (returnValues) {
// Return a byte buffer for the contents of the file
val channel = new RandomAccessFile(file, "r").getChannel()
- val buffer = channel.map(MapMode.READ_ONLY, 0, channel.size())
+ val length = channel.size()
+ val buffer = channel.map(MapMode.READ_ONLY, 0, length)
channel.close()
- Right(buffer)
+ PutResult(length, Right(buffer))
} else {
null
}
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index 74ef326038..24e1d75013 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -49,18 +49,18 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean)
- : Either[Iterator[Any], ByteBuffer] = {
+ : PutResult = {
if (level.deserialized) {
val elements = new ArrayBuffer[Any]
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
tryToPut(blockId, elements, sizeEstimate, true)
- Left(elements.iterator)
+ PutResult(sizeEstimate, Left(elements.iterator))
} else {
val bytes = blockManager.dataSerialize(values)
tryToPut(blockId, bytes, bytes.limit, false)
- Right(bytes)
+ PutResult(bytes.limit(), Right(bytes))
}
}
@@ -162,7 +162,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
* don't fit into memory that we want to avoid).
*
- * Assumes that a lock on entries is held by the caller.
+ * Assumes that a lock on the MemoryStore is held by the caller. (Otherwise, the freed space
+ * might fill up before the caller puts in their new value.)
*/
private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
@@ -172,7 +173,9 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit")
return false
}
-
+
+ // TODO: This should relinquish the lock on the MemoryStore while flushing out old blocks
+ // in order to allow parallelism in writing to disk
if (maxMemory - currentMemory < space) {
val rddToAdd = getRddId(blockIdToAdd)
val selectedBlocks = new ArrayBuffer[String]()
diff --git a/core/src/main/scala/spark/storage/PutResult.scala b/core/src/main/scala/spark/storage/PutResult.scala
new file mode 100644
index 0000000000..76f236057b
--- /dev/null
+++ b/core/src/main/scala/spark/storage/PutResult.scala
@@ -0,0 +1,9 @@
+package spark.storage
+
+import java.nio.ByteBuffer
+
+/**
+ * Result of adding a block into a BlockStore. Contains its estimated size, and possibly the
+ * values put if the caller asked for them to be returned (e.g. for chaining replication)
+ */
+private[spark] case class PutResult(size: Long, data: Either[Iterator[_], ByteBuffer])