aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-08-02 12:33:23 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-08-02 12:33:23 -0700
commitb8fe6723999e96a794181c3de1c0ab2dc8ff46be (patch)
tree8052105c94e7e56de85ea711efa3c5c4607ef4ba
parentb980eabd86f5f3045db802b506e853eb9d7b15d5 (diff)
parent1a07bb9ba42df39260db1c3a222433fb756fe036 (diff)
downloadspark-b8fe6723999e96a794181c3de1c0ab2dc8ff46be.tar.gz
spark-b8fe6723999e96a794181c3de1c0ab2dc8ff46be.tar.bz2
spark-b8fe6723999e96a794181c3de1c0ab2dc8ff46be.zip
Merge pull request #162 from shivaram/dev
Use maxMemory to better estimate memory available for BlockManager cache
-rw-r--r--core/src/main/scala/spark/CacheTracker.scala13
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala2
2 files changed, 10 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 19870408d3..22110832f8 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -222,11 +222,16 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
// TODO: also register a listener for when it unloads
logInfo("Computing partition " + split)
try {
- val values = new ArrayBuffer[Any]
- values ++= rdd.compute(split)
- blockManager.put(key, values.iterator, storageLevel, false)
+ // BlockManager will iterate over results from compute to create RDD
+ blockManager.put(key, rdd.compute(split), storageLevel, false)
//future.apply() // Wait for the reply from the cache tracker
- return values.iterator.asInstanceOf[Iterator[T]]
+ blockManager.get(key) match {
+ case Some(values) =>
+ return values.asInstanceOf[Iterator[T]]
+ case None =>
+ logWarning("loading partition failed after computing it " + key)
+ return null
+ }
} finally {
loading.synchronized {
loading.remove(key)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 5067601198..cde74e5805 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -580,6 +580,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
object BlockManager {
def getMaxMemoryFromSystemProperties(): Long = {
val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
- (Runtime.getRuntime.totalMemory * memoryFraction).toLong
+ (Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
}