author    Shivaram Venkataraman <shivaram@eecs.berkeley.edu>  2012-10-11 00:42:46 -0700
committer Shivaram Venkataraman <shivaram@eecs.berkeley.edu>  2012-10-11 00:42:46 -0700
commit    2cf40c5fd52a8f00c420bc4d342f0ad6e956ed62 (patch)
tree      5c6d7ec756a651b449b1eadeb8b31b3b9f19ea9c /core
parent    4001cbdec134183f93d4ed169f9141c186cfa7f9 (diff)
Change block manager to accept an ArrayBuffer instead of an iterator, to ensure
that the computation can proceed even if we run out of memory to cache the block. Update CacheTracker to use this new interface.
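The heart of the change: the computed partition is materialized into an ArrayBuffer before it is handed to the block manager, so the caller still holds the values even if the MemoryStore cannot keep the block. A minimal sketch of the pattern, assuming a BlockManager and StorageLevel are in scope; the helper name computeAndCache is illustrative, not part of the commit:

    import scala.collection.mutable.ArrayBuffer
    import spark.storage.{BlockManager, StorageLevel}

    // Sketch of the new CacheTracker flow: materialize first, cache on a
    // best-effort basis, and return the values either way.
    def computeAndCache[T](blockManager: BlockManager, key: String,
        level: StorageLevel, compute: () => Iterator[T]): Iterator[T] = {
      val elements = new ArrayBuffer[Any]
      elements ++= compute()                        // materialize the partition up front
      blockManager.put(key, elements, level, true)  // may cache, spill, or drop the block
      // Unlike the old iterator-based put, the values are still here to return.
      elements.iterator.asInstanceOf[Iterator[T]]
    }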
Diffstat (limited to 'core')
 core/src/main/scala/spark/CacheTracker.scala              | 14
 core/src/main/scala/spark/storage/BlockManager.scala      | 22
 core/src/main/scala/spark/storage/BlockStore.scala        |  5
 core/src/main/scala/spark/storage/DiskStore.scala         | 12
 core/src/main/scala/spark/storage/MemoryStore.scala       | 16
 core/src/test/scala/spark/CacheTrackerSuite.scala         |  2
 core/src/test/scala/spark/storage/BlockManagerSuite.scala | 16
7 files changed, 48 insertions(+), 39 deletions(-)
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index d9cbe3730a..c5db6ce63a 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -208,23 +208,19 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
// TODO: fetch any remote copy of the split that may be available
// TODO: also register a listener for when it unloads
logInfo("Computing partition " + split)
+ val elements = new ArrayBuffer[Any]
+ elements ++= rdd.compute(split)
try {
- // BlockManager will iterate over results from compute to create RDD
- blockManager.put(key, rdd.compute(split), storageLevel, true)
+ // Try to put this block in the blockManager
+ blockManager.put(key, elements, storageLevel, true)
//future.apply() // Wait for the reply from the cache tracker
- blockManager.get(key) match {
- case Some(values) =>
- return values.asInstanceOf[Iterator[T]]
- case None =>
- logWarning("loading partition failed after computing it " + key)
- return null
- }
} finally {
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
+ return elements.iterator.asInstanceOf[Iterator[T]]
}
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 91b7bebfb3..8a111f44c9 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -237,7 +237,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
diskStore.getValues(blockId) match {
case Some(iterator) =>
// Put the block back in memory before returning it
- memoryStore.putValues(blockId, iterator, level, true).data match {
+ // TODO: Consider creating a putValues that also takes in an iterator?
+ val elements = new ArrayBuffer[Any]
+ elements ++= iterator
+ memoryStore.putValues(blockId, elements, level, true).data match {
case Left(iterator2) =>
return Some(iterator2)
case _ =>
@@ -529,11 +532,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
+ def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
+ : Long = {
+ val elements = new ArrayBuffer[Any]
+ elements ++= values
+ put(blockId, elements, level, tellMaster)
+ }
+
/**
* Put a new block of values to the block manager. Returns its (estimated) size in bytes.
*/
- def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true)
- : Long = {
+ def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
+ tellMaster: Boolean = true) : Long = {
if (blockId == null) {
throw new IllegalArgumentException("Block Id is null")
@@ -766,7 +776,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
* store reaches its limit and needs to free up space.
*/
- def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) {
+ def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
logInfo("Dropping block " + blockId + " from memory")
locker.getLock(blockId).synchronized {
val info = blockInfo.get(blockId)
@@ -774,8 +784,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo("Writing block " + blockId + " to disk")
data match {
- case Left(iterator) =>
- diskStore.putValues(blockId, iterator, level, false)
+ case Left(elements) =>
+ diskStore.putValues(blockId, elements, level, false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 1286600cd1..096bf8bdd9 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,6 +1,7 @@
package spark.storage
import java.nio.ByteBuffer
+import scala.collection.mutable.ArrayBuffer
import spark.Logging
@@ -18,8 +19,8 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
* @return a PutResult that contains the size of the data, as well as the values put if
* returnValues is true (if not, the result's data field can be null)
*/
- def putValues(blockId: String, values: Iterator[Any], level: StorageLevel, returnValues: Boolean)
- : PutResult
+ def putValues(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
+ returnValues: Boolean) : PutResult
/**
* Return the size of a block in bytes.
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index fd92a3dc67..8ba64e4b76 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -3,11 +3,15 @@ package spark.storage
import java.nio.ByteBuffer
import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.channels.FileChannel.MapMode
-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import java.util.{Random, Date}
-import spark.Utils
import java.text.SimpleDateFormat
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+
+import spark.Utils
+
/**
* Stores BlockManager blocks on disk.
*/
@@ -45,7 +49,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
override def putValues(
blockId: String,
- values: Iterator[Any],
+ values: ArrayBuffer[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {
@@ -56,7 +60,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
val fileOut = blockManager.wrapForCompression(blockId,
new FastBufferedOutputStream(new FileOutputStream(file)))
val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
- objOut.writeAll(values)
+ objOut.writeAll(values.iterator)
objOut.close()
val length = file.length()
logDebug("Block %s stored as %s file on disk in %d ms".format(
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index e9288fdf43..773970446a 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -46,19 +46,17 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
override def putValues(
blockId: String,
- values: Iterator[Any],
+ values: ArrayBuffer[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {
if (level.deserialized) {
- val elements = new ArrayBuffer[Any]
- elements ++= values
- val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
- tryToPut(blockId, elements, sizeEstimate, true)
- PutResult(sizeEstimate, Left(elements.iterator))
+ val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
+ tryToPut(blockId, values, sizeEstimate, true)
+ PutResult(sizeEstimate, Left(values.iterator))
} else {
- val bytes = blockManager.dataSerialize(blockId, values)
+ val bytes = blockManager.dataSerialize(blockId, values.iterator)
tryToPut(blockId, bytes, bytes.limit, false)
PutResult(bytes.limit(), Right(bytes))
}
@@ -146,7 +144,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
val data = if (deserialized) {
- Left(value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ Left(value.asInstanceOf[ArrayBuffer[Any]])
} else {
Right(value.asInstanceOf[ByteBuffer].duplicate())
}
@@ -199,7 +197,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
for (blockId <- selectedBlocks) {
val entry = entries.get(blockId)
val data = if (entry.deserialized) {
- Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
diff --git a/core/src/test/scala/spark/CacheTrackerSuite.scala b/core/src/test/scala/spark/CacheTrackerSuite.scala
index 426c0d26e9..467605981b 100644
--- a/core/src/test/scala/spark/CacheTrackerSuite.scala
+++ b/core/src/test/scala/spark/CacheTrackerSuite.scala
@@ -22,7 +22,7 @@ class CacheTrackerSuite extends FunSuite {
} catch {
case e: Exception =>
throw new SparkException("Error communicating with actor", e)
- }
+ }
}
test("CacheTrackerActor slave initialization & cache status") {
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 31b33eae09..b9c19e61cd 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -268,9 +268,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY)
- store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, true)
assert(store.get("list2") != None, "list2 was not in store")
assert(store.get("list2").get.size == 2)
assert(store.get("list3") != None, "list3 was not in store")
@@ -279,7 +279,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list2") != None, "list2 was not in store")
assert(store.get("list2").get.size == 2)
// At this point list2 was gotten last, so LRU will get rid of list3
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
assert(store.get("list1") != None, "list1 was not in store")
assert(store.get("list1").get.size == 2)
assert(store.get("list2") != None, "list2 was not in store")
@@ -294,9 +294,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val list3 = List(new Array[Byte](200), new Array[Byte](200))
val list4 = List(new Array[Byte](200), new Array[Byte](200))
// First store list1 and list2, both in memory, and list3, on disk only
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER)
- store.put("list3", list3.iterator, StorageLevel.DISK_ONLY)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, true)
+ store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, true)
+ store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, true)
// At this point LRU should not kick in because list3 is only on disk
assert(store.get("list1") != None, "list2 was not in store")
assert(store.get("list1").get.size === 2)
@@ -311,7 +311,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list3") != None, "list1 was not in store")
assert(store.get("list3").get.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
- store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER)
+ store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, true)
assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2") != None, "list3 was not in store")
assert(store.get("list2").get.size === 2)