aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorShivaram Venkataraman <shivaram@eecs.berkeley.edu>2012-10-11 00:42:46 -0700
committerShivaram Venkataraman <shivaram@eecs.berkeley.edu>2012-10-11 00:42:46 -0700
commit2cf40c5fd52a8f00c420bc4d342f0ad6e956ed62 (patch)
tree5c6d7ec756a651b449b1eadeb8b31b3b9f19ea9c /core/src/main
parent4001cbdec134183f93d4ed169f9141c186cfa7f9 (diff)
downloadspark-2cf40c5fd52a8f00c420bc4d342f0ad6e956ed62.tar.gz
spark-2cf40c5fd52a8f00c420bc4d342f0ad6e956ed62.tar.bz2
spark-2cf40c5fd52a8f00c420bc4d342f0ad6e956ed62.zip
Change block manager to accept a ArrayBuffer instead of an iterator to ensure
that the computation can proceed even if we run out of memory to cache the block. Update CacheTracker to use this new interface
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/CacheTracker.scala14
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala22
-rw-r--r--core/src/main/scala/spark/storage/BlockStore.scala5
-rw-r--r--core/src/main/scala/spark/storage/DiskStore.scala12
-rw-r--r--core/src/main/scala/spark/storage/MemoryStore.scala16
5 files changed, 39 insertions, 30 deletions
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index d9cbe3730a..c5db6ce63a 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -208,23 +208,19 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
// TODO: fetch any remote copy of the split that may be available
// TODO: also register a listener for when it unloads
logInfo("Computing partition " + split)
+ val elements = new ArrayBuffer[Any]
+ elements ++= rdd.compute(split)
try {
- // BlockManager will iterate over results from compute to create RDD
- blockManager.put(key, rdd.compute(split), storageLevel, true)
+ // Try to put this block in the blockManager
+ blockManager.put(key, elements, storageLevel, true)
//future.apply() // Wait for the reply from the cache tracker
- blockManager.get(key) match {
- case Some(values) =>
- return values.asInstanceOf[Iterator[T]]
- case None =>
- logWarning("loading partition failed after computing it " + key)
- return null
- }
} finally {
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
+ return elements.iterator.asInstanceOf[Iterator[T]]
}
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 91b7bebfb3..8a111f44c9 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -237,7 +237,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
diskStore.getValues(blockId) match {
case Some(iterator) =>
// Put the block back in memory before returning it
- memoryStore.putValues(blockId, iterator, level, true).data match {
+ // TODO: Consider creating a putValues that also takes in a iterator ?
+ val elements = new ArrayBuffer[Any]
+ elements ++= iterator
+ memoryStore.putValues(blockId, elements, level, true).data match {
case Left(iterator2) =>
return Some(iterator2)
case _ =>
@@ -529,11 +532,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
+ def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
+ : Long = {
+ val elements = new ArrayBuffer[Any]
+ elements ++= values
+ put(blockId, elements, level, tellMaster)
+ }
+
/**
* Put a new block of values to the block manager. Returns its (estimated) size in bytes.
*/
- def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true)
- : Long = {
+ def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
+ tellMaster: Boolean = true) : Long = {
if (blockId == null) {
throw new IllegalArgumentException("Block Id is null")
@@ -766,7 +776,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
* store reaches its limit and needs to free up space.
*/
- def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) {
+ def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
logInfo("Dropping block " + blockId + " from memory")
locker.getLock(blockId).synchronized {
val info = blockInfo.get(blockId)
@@ -774,8 +784,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo("Writing block " + blockId + " to disk")
data match {
- case Left(iterator) =>
- diskStore.putValues(blockId, iterator, level, false)
+ case Left(elements) =>
+ diskStore.putValues(blockId, elements, level, false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 1286600cd1..096bf8bdd9 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,6 +1,7 @@
package spark.storage
import java.nio.ByteBuffer
+import scala.collection.mutable.ArrayBuffer
import spark.Logging
@@ -18,8 +19,8 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
* @return a PutResult that contains the size of the data, as well as the values put if
* returnValues is true (if not, the result's data field can be null)
*/
- def putValues(blockId: String, values: Iterator[Any], level: StorageLevel, returnValues: Boolean)
- : PutResult
+ def putValues(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
+ returnValues: Boolean) : PutResult
/**
* Return the size of a block in bytes.
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index fd92a3dc67..8ba64e4b76 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -3,11 +3,15 @@ package spark.storage
import java.nio.ByteBuffer
import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.channels.FileChannel.MapMode
-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import java.util.{Random, Date}
-import spark.Utils
import java.text.SimpleDateFormat
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+
+import spark.Utils
+
/**
* Stores BlockManager blocks on disk.
*/
@@ -45,7 +49,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
override def putValues(
blockId: String,
- values: Iterator[Any],
+ values: ArrayBuffer[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {
@@ -56,7 +60,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
val fileOut = blockManager.wrapForCompression(blockId,
new FastBufferedOutputStream(new FileOutputStream(file)))
val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
- objOut.writeAll(values)
+ objOut.writeAll(values.iterator)
objOut.close()
val length = file.length()
logDebug("Block %s stored as %s file on disk in %d ms".format(
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index e9288fdf43..773970446a 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -46,19 +46,17 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
override def putValues(
blockId: String,
- values: Iterator[Any],
+ values: ArrayBuffer[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {
if (level.deserialized) {
- val elements = new ArrayBuffer[Any]
- elements ++= values
- val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
- tryToPut(blockId, elements, sizeEstimate, true)
- PutResult(sizeEstimate, Left(elements.iterator))
+ val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
+ tryToPut(blockId, values, sizeEstimate, true)
+ PutResult(sizeEstimate, Left(values.iterator))
} else {
- val bytes = blockManager.dataSerialize(blockId, values)
+ val bytes = blockManager.dataSerialize(blockId, values.iterator)
tryToPut(blockId, bytes, bytes.limit, false)
PutResult(bytes.limit(), Right(bytes))
}
@@ -146,7 +144,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
val data = if (deserialized) {
- Left(value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ Left(value.asInstanceOf[ArrayBuffer[Any]])
} else {
Right(value.asInstanceOf[ByteBuffer].duplicate())
}
@@ -199,7 +197,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
for (blockId <- selectedBlocks) {
val entry = entries.get(blockId)
val data = if (entry.deserialized) {
- Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}