author     Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-12 14:36:28 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-12 14:36:28 -0700
commit     23015ccac045dd0e2c95c8625ee354984a8d594c (patch)
tree       be654b81d3e846b0a4db3c2c13081c47ed66a5d9 /core
parent     bd78bbb2cfdac42acac9dff1d970e52f8f1d6729 (diff)
parent     8577523f3766f27fa47090007276e39001e7861d (diff)
Merge pull request #271 from shivaram/block-manager-npe-fix
Change block manager to accept an ArrayBuffer
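
The key API change, visible in the BlockManager.scala hunk below, is that put's primary overload now takes an ArrayBuffer[Any] rather than an Iterator[Any], with a thin Iterator overload that buffers the values and delegates. This lets CacheTracker keep the computed elements around and return elements.iterator directly, instead of re-fetching the block after the put, which could come back empty under memory pressure and return null. A minimal sketch of the overload pattern, with simplified signatures (PutSketch and its element-count return value are illustrative stand-ins, not the actual BlockManager code):

    import scala.collection.mutable.ArrayBuffer

    // Sketch only: the real put also takes a StorageLevel and a tellMaster flag
    // and returns an estimated size in bytes rather than an element count.
    object PutSketch {
      // Iterator callers are still supported: drain the values into an
      // ArrayBuffer once, then delegate to the primary overload.
      def put(blockId: String, values: Iterator[Any]): Long = {
        val elements = new ArrayBuffer[Any]
        elements ++= values
        put(blockId, elements)
      }

      // Primary overload: the buffer can be stored, size-estimated, and also
      // handed back to the caller, as CacheTracker now does via elements.iterator.
      def put(blockId: String, values: ArrayBuffer[Any]): Long = {
        println("Storing block " + blockId + " with " + values.size + " elements")
        values.size.toLong
      }
    }
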
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/CacheTracker.scala               | 14
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala       | 22
-rw-r--r--  core/src/main/scala/spark/storage/BlockStore.scala         |  5
-rw-r--r--  core/src/main/scala/spark/storage/DiskStore.scala          | 12
-rw-r--r--  core/src/main/scala/spark/storage/MemoryStore.scala        | 16
-rw-r--r--  core/src/test/scala/spark/CacheTrackerSuite.scala          |  2
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala           | 10
-rw-r--r--  core/src/test/scala/spark/storage/BlockManagerSuite.scala  | 16
8 files changed, 58 insertions(+), 39 deletions(-)
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index d9cbe3730a..c5db6ce63a 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -208,23 +208,19 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
// TODO: fetch any remote copy of the split that may be available
// TODO: also register a listener for when it unloads
logInfo("Computing partition " + split)
+ val elements = new ArrayBuffer[Any]
+ elements ++= rdd.compute(split)
try {
- // BlockManager will iterate over results from compute to create RDD
- blockManager.put(key, rdd.compute(split), storageLevel, true)
+ // Try to put this block in the blockManager
+ blockManager.put(key, elements, storageLevel, true)
//future.apply() // Wait for the reply from the cache tracker
- blockManager.get(key) match {
- case Some(values) =>
- return values.asInstanceOf[Iterator[T]]
- case None =>
- logWarning("loading partition failed after computing it " + key)
- return null
- }
} finally {
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
+ return elements.iterator.asInstanceOf[Iterator[T]]
}
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 91b7bebfb3..8a111f44c9 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -237,7 +237,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
diskStore.getValues(blockId) match {
case Some(iterator) =>
// Put the block back in memory before returning it
- memoryStore.putValues(blockId, iterator, level, true).data match {
+ // TODO: Consider creating a putValues that also takes in a iterator ?
+ val elements = new ArrayBuffer[Any]
+ elements ++= iterator
+ memoryStore.putValues(blockId, elements, level, true).data match {
case Left(iterator2) =>
return Some(iterator2)
case _ =>
@@ -529,11 +532,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
+ def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
+ : Long = {
+ val elements = new ArrayBuffer[Any]
+ elements ++= values
+ put(blockId, elements, level, tellMaster)
+ }
+
/**
* Put a new block of values to the block manager. Returns its (estimated) size in bytes.
*/
- def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true)
- : Long = {
+ def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
+ tellMaster: Boolean = true) : Long = {
if (blockId == null) {
throw new IllegalArgumentException("Block Id is null")
@@ -766,7 +776,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
* store reaches its limit and needs to free up space.
*/
- def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) {
+ def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
logInfo("Dropping block " + blockId + " from memory")
locker.getLock(blockId).synchronized {
val info = blockInfo.get(blockId)
@@ -774,8 +784,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo("Writing block " + blockId + " to disk")
data match {
- case Left(iterator) =>
- diskStore.putValues(blockId, iterator, level, false)
+ case Left(elements) =>
+ diskStore.putValues(blockId, elements, level, false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 1286600cd1..096bf8bdd9 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,6 +1,7 @@
package spark.storage
import java.nio.ByteBuffer
+import scala.collection.mutable.ArrayBuffer
import spark.Logging
@@ -18,8 +19,8 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
* @return a PutResult that contains the size of the data, as well as the values put if
* returnValues is true (if not, the result's data field can be null)
*/
- def putValues(blockId: String, values: Iterator[Any], level: StorageLevel, returnValues: Boolean)
- : PutResult
+ def putValues(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
+ returnValues: Boolean) : PutResult
/**
* Return the size of a block in bytes.
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index fd92a3dc67..8ba64e4b76 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -3,11 +3,15 @@ package spark.storage
import java.nio.ByteBuffer
import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.channels.FileChannel.MapMode
-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import java.util.{Random, Date}
-import spark.Utils
import java.text.SimpleDateFormat
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+
+import spark.Utils
+
/**
* Stores BlockManager blocks on disk.
*/
@@ -45,7 +49,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
override def putValues(
blockId: String,
- values: Iterator[Any],
+ values: ArrayBuffer[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {
@@ -56,7 +60,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
val fileOut = blockManager.wrapForCompression(blockId,
new FastBufferedOutputStream(new FileOutputStream(file)))
val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
- objOut.writeAll(values)
+ objOut.writeAll(values.iterator)
objOut.close()
val length = file.length()
logDebug("Block %s stored as %s file on disk in %d ms".format(
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index e9288fdf43..773970446a 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -46,19 +46,17 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
override def putValues(
blockId: String,
- values: Iterator[Any],
+ values: ArrayBuffer[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {
if (level.deserialized) {
- val elements = new ArrayBuffer[Any]
- elements ++= values
- val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
- tryToPut(blockId, elements, sizeEstimate, true)
- PutResult(sizeEstimate, Left(elements.iterator))
+ val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
+ tryToPut(blockId, values, sizeEstimate, true)
+ PutResult(sizeEstimate, Left(values.iterator))
} else {
- val bytes = blockManager.dataSerialize(blockId, values)
+ val bytes = blockManager.dataSerialize(blockId, values.iterator)
tryToPut(blockId, bytes, bytes.limit, false)
PutResult(bytes.limit(), Right(bytes))
}
@@ -146,7 +144,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
val data = if (deserialized) {
- Left(value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ Left(value.asInstanceOf[ArrayBuffer[Any]])
} else {
Right(value.asInstanceOf[ByteBuffer].duplicate())
}
@@ -199,7 +197,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
for (blockId <- selectedBlocks) {
val entry = entries.get(blockId)
val data = if (entry.deserialized) {
- Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
diff --git a/core/src/test/scala/spark/CacheTrackerSuite.scala b/core/src/test/scala/spark/CacheTrackerSuite.scala
index 426c0d26e9..467605981b 100644
--- a/core/src/test/scala/spark/CacheTrackerSuite.scala
+++ b/core/src/test/scala/spark/CacheTrackerSuite.scala
@@ -22,7 +22,7 @@ class CacheTrackerSuite extends FunSuite {
} catch {
case e: Exception =>
throw new SparkException("Error communicating with actor", e)
- }
+ }
}
test("CacheTrackerActor slave initialization & cache status") {
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 433d2fdc19..76b0884481 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -27,6 +27,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
sc = null
}
System.clearProperty("spark.reducer.maxMbInFlight")
+ System.clearProperty("spark.storage.memoryFraction")
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port")
}
@@ -156,4 +157,13 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(data.count() === 1000)
assert(data.count() === 1000)
}
+
+ test("compute without caching with low memory") {
+ System.setProperty("spark.storage.memoryFraction", "0.0001")
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY)
+ assert(data.count() === 4000000)
+ assert(data.count() === 4000000)
+ assert(data.count() === 4000000)
+ }
}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 31b33eae09..b9c19e61cd 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -268,9 +268,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY)
- store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, true)
assert(store.get("list2") != None, "list2 was not in store")
assert(store.get("list2").get.size == 2)
assert(store.get("list3") != None, "list3 was not in store")
@@ -279,7 +279,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list2") != None, "list2 was not in store")
assert(store.get("list2").get.size == 2)
// At this point list2 was gotten last, so LRU will getSingle rid of list3
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
assert(store.get("list1") != None, "list1 was not in store")
assert(store.get("list1").get.size == 2)
assert(store.get("list2") != None, "list2 was not in store")
@@ -294,9 +294,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val list3 = List(new Array[Byte](200), new Array[Byte](200))
val list4 = List(new Array[Byte](200), new Array[Byte](200))
// First store list1 and list2, both in memory, and list3, on disk only
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER)
- store.put("list3", list3.iterator, StorageLevel.DISK_ONLY)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, true)
+ store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, true)
+ store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, true)
// At this point LRU should not kick in because list3 is only on disk
assert(store.get("list1") != None, "list2 was not in store")
assert(store.get("list1").get.size === 2)
@@ -311,7 +311,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list3") != None, "list1 was not in store")
assert(store.get("list3").get.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
- store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER)
+ store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, true)
assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2") != None, "list3 was not in store")
assert(store.get("list2").get.size === 2)