diff options
author | Shivaram Venkataraman <shivaram@eecs.berkeley.edu> | 2012-10-11 00:42:46 -0700 |
---|---|---|
committer | Shivaram Venkataraman <shivaram@eecs.berkeley.edu> | 2012-10-11 00:42:46 -0700 |
commit | 2cf40c5fd52a8f00c420bc4d342f0ad6e956ed62 (patch) | |
tree | 5c6d7ec756a651b449b1eadeb8b31b3b9f19ea9c /core | |
parent | 4001cbdec134183f93d4ed169f9141c186cfa7f9 (diff) | |
download | spark-2cf40c5fd52a8f00c420bc4d342f0ad6e956ed62.tar.gz spark-2cf40c5fd52a8f00c420bc4d342f0ad6e956ed62.tar.bz2 spark-2cf40c5fd52a8f00c420bc4d342f0ad6e956ed62.zip |
Change block manager to accept an ArrayBuffer instead of an iterator to ensure
that the computation can proceed even if we run out of memory to cache the
block. Update CacheTracker to use this new interface.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/CacheTracker.scala | 14 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManager.scala | 22 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/BlockStore.scala | 5 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/DiskStore.scala | 12 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/MemoryStore.scala | 16 | ||||
-rw-r--r-- | core/src/test/scala/spark/CacheTrackerSuite.scala | 2 | ||||
-rw-r--r-- | core/src/test/scala/spark/storage/BlockManagerSuite.scala | 16 |
7 files changed, 48 insertions, 39 deletions
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index d9cbe3730a..c5db6ce63a 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -208,23 +208,19 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b // TODO: fetch any remote copy of the split that may be available // TODO: also register a listener for when it unloads logInfo("Computing partition " + split) + val elements = new ArrayBuffer[Any] + elements ++= rdd.compute(split) try { - // BlockManager will iterate over results from compute to create RDD - blockManager.put(key, rdd.compute(split), storageLevel, true) + // Try to put this block in the blockManager + blockManager.put(key, elements, storageLevel, true) //future.apply() // Wait for the reply from the cache tracker - blockManager.get(key) match { - case Some(values) => - return values.asInstanceOf[Iterator[T]] - case None => - logWarning("loading partition failed after computing it " + key) - return null - } } finally { loading.synchronized { loading.remove(key) loading.notifyAll() } } + return elements.iterator.asInstanceOf[Iterator[T]] } } diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 91b7bebfb3..8a111f44c9 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -237,7 +237,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m diskStore.getValues(blockId) match { case Some(iterator) => // Put the block back in memory before returning it - memoryStore.putValues(blockId, iterator, level, true).data match { + // TODO: Consider creating a putValues that also takes in a iterator ? 
+ val elements = new ArrayBuffer[Any] + elements ++= iterator + memoryStore.putValues(blockId, elements, level, true).data match { case Left(iterator2) => return Some(iterator2) case _ => @@ -529,11 +532,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } + def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean) + : Long = { + val elements = new ArrayBuffer[Any] + elements ++= values + put(blockId, elements, level, tellMaster) + } + /** * Put a new block of values to the block manager. Returns its (estimated) size in bytes. */ - def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true) - : Long = { + def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel, + tellMaster: Boolean = true) : Long = { if (blockId == null) { throw new IllegalArgumentException("Block Id is null") @@ -766,7 +776,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory * store reaches its limit and needs to free up space. 
*/ - def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) { + def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) { logInfo("Dropping block " + blockId + " from memory") locker.getLock(blockId).synchronized { val info = blockInfo.get(blockId) @@ -774,8 +784,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m if (level.useDisk && !diskStore.contains(blockId)) { logInfo("Writing block " + blockId + " to disk") data match { - case Left(iterator) => - diskStore.putValues(blockId, iterator, level, false) + case Left(elements) => + diskStore.putValues(blockId, elements, level, false) case Right(bytes) => diskStore.putBytes(blockId, bytes, level) } diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala index 1286600cd1..096bf8bdd9 100644 --- a/core/src/main/scala/spark/storage/BlockStore.scala +++ b/core/src/main/scala/spark/storage/BlockStore.scala @@ -1,6 +1,7 @@ package spark.storage import java.nio.ByteBuffer +import scala.collection.mutable.ArrayBuffer import spark.Logging @@ -18,8 +19,8 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging { * @return a PutResult that contains the size of the data, as well as the values put if * returnValues is true (if not, the result's data field can be null) */ - def putValues(blockId: String, values: Iterator[Any], level: StorageLevel, returnValues: Boolean) - : PutResult + def putValues(blockId: String, values: ArrayBuffer[Any], level: StorageLevel, + returnValues: Boolean) : PutResult /** * Return the size of a block in bytes. 
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index fd92a3dc67..8ba64e4b76 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -3,11 +3,15 @@ package spark.storage import java.nio.ByteBuffer import java.io.{File, FileOutputStream, RandomAccessFile} import java.nio.channels.FileChannel.MapMode -import it.unimi.dsi.fastutil.io.FastBufferedOutputStream import java.util.{Random, Date} -import spark.Utils import java.text.SimpleDateFormat +import it.unimi.dsi.fastutil.io.FastBufferedOutputStream + +import scala.collection.mutable.ArrayBuffer + +import spark.Utils + /** * Stores BlockManager blocks on disk. */ @@ -45,7 +49,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) override def putValues( blockId: String, - values: Iterator[Any], + values: ArrayBuffer[Any], level: StorageLevel, returnValues: Boolean) : PutResult = { @@ -56,7 +60,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) val fileOut = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(new FileOutputStream(file))) val objOut = blockManager.serializer.newInstance().serializeStream(fileOut) - objOut.writeAll(values) + objOut.writeAll(values.iterator) objOut.close() val length = file.length() logDebug("Block %s stored as %s file on disk in %d ms".format( diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala index e9288fdf43..773970446a 100644 --- a/core/src/main/scala/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/spark/storage/MemoryStore.scala @@ -46,19 +46,17 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def putValues( blockId: String, - values: Iterator[Any], + values: ArrayBuffer[Any], level: StorageLevel, returnValues: Boolean) : PutResult = { if (level.deserialized) { - val elements = new 
ArrayBuffer[Any] - elements ++= values - val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) - tryToPut(blockId, elements, sizeEstimate, true) - PutResult(sizeEstimate, Left(elements.iterator)) + val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef]) + tryToPut(blockId, values, sizeEstimate, true) + PutResult(sizeEstimate, Left(values.iterator)) } else { - val bytes = blockManager.dataSerialize(blockId, values) + val bytes = blockManager.dataSerialize(blockId, values.iterator) tryToPut(blockId, bytes, bytes.limit, false) PutResult(bytes.limit(), Right(bytes)) } @@ -146,7 +144,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) // Tell the block manager that we couldn't put it in memory so that it can drop it to // disk if the block allows disk storage. val data = if (deserialized) { - Left(value.asInstanceOf[ArrayBuffer[Any]].iterator) + Left(value.asInstanceOf[ArrayBuffer[Any]]) } else { Right(value.asInstanceOf[ByteBuffer].duplicate()) } @@ -199,7 +197,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) for (blockId <- selectedBlocks) { val entry = entries.get(blockId) val data = if (entry.deserialized) { - Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator) + Left(entry.value.asInstanceOf[ArrayBuffer[Any]]) } else { Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) } diff --git a/core/src/test/scala/spark/CacheTrackerSuite.scala b/core/src/test/scala/spark/CacheTrackerSuite.scala index 426c0d26e9..467605981b 100644 --- a/core/src/test/scala/spark/CacheTrackerSuite.scala +++ b/core/src/test/scala/spark/CacheTrackerSuite.scala @@ -22,7 +22,7 @@ class CacheTrackerSuite extends FunSuite { } catch { case e: Exception => throw new SparkException("Error communicating with actor", e) - } + } } test("CacheTrackerActor slave initialization & cache status") { diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 31b33eae09..b9c19e61cd 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -268,9 +268,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) - store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY) - store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY) - store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true) + store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, true) + store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, true) assert(store.get("list2") != None, "list2 was not in store") assert(store.get("list2").get.size == 2) assert(store.get("list3") != None, "list3 was not in store") @@ -279,7 +279,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.get("list2") != None, "list2 was not in store") assert(store.get("list2").get.size == 2) // At this point list2 was gotten last, so LRU will getSingle rid of list3 - store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true) assert(store.get("list1") != None, "list1 was not in store") assert(store.get("list1").get.size == 2) assert(store.get("list2") != None, "list2 was not in store") @@ -294,9 +294,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT val list3 = List(new Array[Byte](200), new Array[Byte](200)) val list4 = List(new Array[Byte](200), new Array[Byte](200)) // First store list1 and list2, both in memory, and list3, on disk only - store.put("list1", list1.iterator, 
StorageLevel.MEMORY_ONLY_SER) - store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER) - store.put("list3", list3.iterator, StorageLevel.DISK_ONLY) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, true) + store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, true) + store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, true) // At this point LRU should not kick in because list3 is only on disk assert(store.get("list1") != None, "list2 was not in store") assert(store.get("list1").get.size === 2) @@ -311,7 +311,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.get("list3") != None, "list1 was not in store") assert(store.get("list3").get.size === 2) // Now let's add in list4, which uses both disk and memory; list1 should drop out - store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER) + store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, true) assert(store.get("list1") === None, "list1 was in store") assert(store.get("list2") != None, "list3 was not in store") assert(store.get("list2").get.size === 2) |