diff options
author | Andrew Or <andrewor14@gmail.com> | 2014-08-02 22:00:46 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-08-02 22:01:01 -0700 |
commit | fb2a2079fa10ea8f338d68945a94238dda9fbd66 (patch) | |
tree | ebf529cc48ba793babf3ff44ca27f8d70e934b72 | |
parent | c137928cbe74446254fdbd656c50c1a1c8930094 (diff) | |
download | spark-fb2a2079fa10ea8f338d68945a94238dda9fbd66.tar.gz spark-fb2a2079fa10ea8f338d68945a94238dda9fbd66.tar.bz2 spark-fb2a2079fa10ea8f338d68945a94238dda9fbd66.zip |
[Minor] Fixes on top of #1679
Minor fixes on top of #1679.
Author: Andrew Or <andrewor14@gmail.com>
Closes #1736 from andrewor14/amend-#1679 and squashes the following commits:
3b46f5e [Andrew Or] Minor fixes
(cherry picked from commit 3dc55fdf450b4237f7c592fce56d1467fd206366)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala | 5 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/StorageUtils.scala | 11 |
2 files changed, 6 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index e939318a02..3f14c40ec6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -46,9 +46,8 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] { override def getValue: Long = { val storageStatusList = blockManager.master.getStorageStatus - val maxMem = storageStatusList.map(_.maxMem).sum - val remainingMem = storageStatusList.map(_.memRemaining).sum - (maxMem - remainingMem) / 1024 / 1024 + val memUsed = storageStatusList.map(_.memUsed).sum + memUsed / 1024 / 1024 } }) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 0a0a448baa..2bd6b749be 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -172,16 +172,13 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { def memRemaining: Long = maxMem - memUsed /** Return the memory used by this block manager. */ - def memUsed: Long = - _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum + def memUsed: Long = _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum /** Return the disk space used by this block manager. */ - def diskUsed: Long = - _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum + def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum /** Return the off-heap space used by this block manager. */ - def offHeapUsed: Long = - _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum + def offHeapUsed: Long = _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum /** Return the memory used by the given RDD in this block manager in O(1) time. */ def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L) @@ -246,7 +243,7 @@ private[spark] object StorageUtils { val rddId = rddInfo.id // Assume all blocks belonging to the same RDD have the same storage level val storageLevel = statuses - .map(_.rddStorageLevel(rddId)).flatMap(s => s).headOption.getOrElse(StorageLevel.NONE) + .flatMap(_.rddStorageLevel(rddId)).headOption.getOrElse(StorageLevel.NONE) val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum val memSize = statuses.map(_.memUsedByRdd(rddId)).sum val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum |