diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-08-02 12:33:23 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-08-02 12:33:23 -0700 |
commit | b8fe6723999e96a794181c3de1c0ab2dc8ff46be (patch) | |
tree | 8052105c94e7e56de85ea711efa3c5c4607ef4ba | |
parent | b980eabd86f5f3045db802b506e853eb9d7b15d5 (diff) | |
parent | 1a07bb9ba42df39260db1c3a222433fb756fe036 (diff) | |
download | spark-b8fe6723999e96a794181c3de1c0ab2dc8ff46be.tar.gz spark-b8fe6723999e96a794181c3de1c0ab2dc8ff46be.tar.bz2 spark-b8fe6723999e96a794181c3de1c0ab2dc8ff46be.zip |
Merge pull request #162 from shivaram/dev
Use maxMemory to better estimate memory available for BlockManager cache
-rw-r--r-- | core/src/main/scala/spark/CacheTracker.scala | 13 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManager.scala | 2 |
2 files changed, 10 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index 19870408d3..22110832f8 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -222,11 +222,16 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl // TODO: also register a listener for when it unloads logInfo("Computing partition " + split) try { - val values = new ArrayBuffer[Any] - values ++= rdd.compute(split) - blockManager.put(key, values.iterator, storageLevel, false) + // BlockManager will iterate over results from compute to create RDD + blockManager.put(key, rdd.compute(split), storageLevel, false) //future.apply() // Wait for the reply from the cache tracker - return values.iterator.asInstanceOf[Iterator[T]] + blockManager.get(key) match { + case Some(values) => + return values.asInstanceOf[Iterator[T]] + case None => + logWarning("loading partition failed after computing it " + key) + return null + } } finally { loading.synchronized { loading.remove(key) diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 5067601198..cde74e5805 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -580,6 +580,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m object BlockManager { def getMaxMemoryFromSystemProperties(): Long = { val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble - (Runtime.getRuntime.totalMemory * memoryFraction).toLong + (Runtime.getRuntime.maxMemory * memoryFraction).toLong } } |