author    jerryshao <sshao@hortonworks.com>    2016-07-25 15:17:06 -0700
committer Josh Rosen <joshrosen@databricks.com>    2016-07-25 15:17:06 -0700
commit    f5ea7fe53974a7e8cbfc222b9a6f47669b53ccfd (patch)
tree      2f5d5bb48568f2dfcac15516a746293775542a71 /core
parent    cda4603de340d533c49feac1b244ddfd291f9bcf (diff)
[SPARK-16166][CORE] Also take off-heap memory usage into consideration in log and webui display
## What changes were proposed in this pull request?

Currently, only on-heap storage memory is calculated and displayed in the log and UI:

```
16/06/27 13:41:52 INFO MemoryStore: Block rdd_5_0 stored as values in memory (estimated size 17.8 KB, free 665.9 MB)
```

<img width="1232" alt="untitled" src="https://cloud.githubusercontent.com/assets/850797/16369960/53fb614e-3c6e-11e6-8fa3-7ffe65abcb49.png">

With [SPARK-13992](https://issues.apache.org/jira/browse/SPARK-13992), off-heap memory is supported for data persistence, so this change also takes off-heap storage memory into consideration.

## How was this patch tested?

Unit test and local verification.

Author: jerryshao <sshao@hortonworks.com>

Closes #13920 from jerryshao/SPARK-16166.
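For readers who want to exercise the off-heap path this patch now reports on, the sketch below shows the standard way to enable off-heap memory and persist a block there. The configuration keys and `StorageLevel.OFF_HEAP` are stock Spark APIs; the app name, master, and 1 GB size are arbitrary example values, not part of this patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Enable off-heap memory; the 1 GB size is an arbitrary example value.
val conf = new SparkConf()
  .setAppName("off-heap-storage-example") // hypothetical app name
  .setMaster("local[2]")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "1g")

val sc = new SparkContext(conf)

// Persist an RDD off-heap (supported since SPARK-13992). With this patch,
// the MemoryStore log line and the web UI include the off-heap pool when
// reporting total and free storage memory.
val rdd = sc.parallelize(1 to 100000).persist(StorageLevel.OFF_HEAP)
rdd.count()
```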
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/MemoryManager.scala        | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala  |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala        |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala  |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala    |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala   |  8
7 files changed, 26 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 0210217e41..82442cf561 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -62,13 +62,19 @@ private[spark] abstract class MemoryManager(
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
/**
- * Total available memory for storage, in bytes. This amount can vary over time, depending on
- * the MemoryManager implementation.
+ * Total available on heap memory for storage, in bytes. This amount can vary over time,
+ * depending on the MemoryManager implementation.
* In this model, this is equivalent to the amount of memory not occupied by execution.
*/
def maxOnHeapStorageMemory: Long
/**
+ * Total available off heap memory for storage, in bytes. This amount can vary over time,
+ * depending on the MemoryManager implementation.
+ */
+ def maxOffHeapStorageMemory: Long
+
+ /**
* Set the [[MemoryStore]] used by this manager to evict cached blocks.
* This must be set after construction due to initialization ordering constraints.
*/
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 08155aa298..a6f7db0600 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -55,6 +55,8 @@ private[spark] class StaticMemoryManager(
(maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}
+ override def maxOffHeapStorageMemory: Long = 0L
+
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index c7b36be602..fea2808218 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -67,6 +67,10 @@ private[spark] class UnifiedMemoryManager private[memory] (
maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}
+ override def maxOffHeapStorageMemory: Long = synchronized {
+ maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
+ }
+
/**
* Try to acquire up to `numBytes` of execution memory for the current task and return the
* number of bytes obtained, or 0 if none can be allocated.
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 83a9cbd63d..015e71d126 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -96,7 +96,8 @@ private[spark] class BlockManager(
// However, since we use this only for reporting and logging, what we actually want here is
// the absolute maximum value that `maxMemory` can ever possibly reach. We may need
// to revisit whether reporting this value as the "max" is intuitive to the user.
- private val maxMemory = memoryManager.maxOnHeapStorageMemory
+ private val maxMemory =
+ memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
// Port used by the external shuffle service. In Yarn mode, this may already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
@@ -802,7 +803,7 @@ private[spark] class BlockManager(
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
- // Now that the block is in either the memory, externalBlockStore, or disk store,
+ // Now that the block is in either the memory or disk store,
// tell the master about it.
info.size = size
if (tellMaster) {
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 0349da0d8a..586339a58d 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -101,7 +101,9 @@ private[spark] class MemoryStore(
conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
/** Total amount of memory available for storage, in bytes. */
- private def maxMemory: Long = memoryManager.maxOnHeapStorageMemory
+ private def maxMemory: Long = {
+ memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
+ }
if (maxMemory < unrollMemoryThreshold) {
logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " +
diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
index 6a4f409e8e..5f699df821 100644
--- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
+++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
@@ -56,6 +56,8 @@ class TestMemoryManager(conf: SparkConf)
}
override def maxOnHeapStorageMemory: Long = Long.MaxValue
+ override def maxOffHeapStorageMemory: Long = 0L
+
private var oomOnce = false
private var available = Long.MaxValue
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6821582254..8077a1b941 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -239,8 +239,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Checking whether blocks are in memory and memory size
val memStatus = master.getMemoryStatus.head._2
- assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000")
- assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000")
+ assert(memStatus._1 == 40000L, "total memory " + memStatus._1 + " should equal 40000")
+ assert(memStatus._2 <= 32000L, "remaining memory " + memStatus._2 + " should <= 32000")
assert(store.getSingleAndReleaseLock("a1-to-remove").isDefined, "a1 was not in store")
assert(store.getSingleAndReleaseLock("a2-to-remove").isDefined, "a2 was not in store")
assert(store.getSingleAndReleaseLock("a3-to-remove").isDefined, "a3 was not in store")
@@ -269,8 +269,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
val memStatus = master.getMemoryStatus.head._2
- memStatus._1 should equal (20000L)
- memStatus._2 should equal (20000L)
+ memStatus._1 should equal (40000L)
+ memStatus._2 should equal (40000L)
}
}
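In effect, the reporting change amounts to summing the two storage pools. A minimal, self-contained sketch of the arithmetic behind the doubled totals in the test above, assuming (hypothetically) equal on-heap and off-heap storage pools of 20000 bytes each; the suite's actual pool configuration is not shown in this excerpt:

```scala
// Hypothetical pool sizes for illustration only.
val maxOnHeapStorageMemory = 20000L
val maxOffHeapStorageMemory = 20000L

// BlockManager and MemoryStore now report the sum of both pools as the
// maximum storage memory, rather than the on-heap pool alone.
val maxMemory = maxOnHeapStorageMemory + maxOffHeapStorageMemory
assert(maxMemory == 40000L) // matches the updated expectation of 40000
```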