aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-10-08 21:44:59 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-10-08 21:44:59 -0700
commit67fbecbf32fced87d3accd2618fef2af9f44fae2 (patch)
treea0b14456a5c7fad7b98a3c0508884f4e9698f6b6 /sql
parent09841290055770a619a2e72fbaef1a5e694916ae (diff)
downloadspark-67fbecbf32fced87d3accd2618fef2af9f44fae2.tar.gz
spark-67fbecbf32fced87d3accd2618fef2af9f44fae2.tar.bz2
spark-67fbecbf32fced87d3accd2618fef2af9f44fae2.zip
[SPARK-10956] Common MemoryManager interface for storage and execution
This patch introduces a `MemoryManager` that is the central arbiter of how much memory to grant to storage and execution. This patch is primarily concerned only with refactoring while preserving the existing behavior as much as possible. This is the first step away from the existing rigid separation of storage and execution memory, which has several major drawbacks discussed on the [issue](https://issues.apache.org/jira/browse/SPARK-10956). It is the precursor of a series of patches that will attempt to address those drawbacks. Author: Andrew Or <andrew@databricks.com> Author: Josh Rosen <joshrosen@databricks.com> Author: andrewor14 <andrew@databricks.com> Closes #9000 from andrewor14/memory-manager.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala28
1 file changed, 27 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
index 48c3938ff8..ff65d7bdf8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
@@ -17,12 +17,18 @@
package org.apache.spark.sql.execution
+import scala.collection.mutable
+
+import org.apache.spark.memory.MemoryManager
import org.apache.spark.shuffle.ShuffleMemoryManager
+import org.apache.spark.storage.{BlockId, BlockStatus}
+
/**
* A [[ShuffleMemoryManager]] that can be controlled to run out of memory.
*/
-class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue, 4 * 1024 * 1024) {
+class TestShuffleMemoryManager
+ extends ShuffleMemoryManager(new GrantEverythingMemoryManager, 4 * 1024 * 1024) {
private var oom = false
override def tryToAcquire(numBytes: Long): Long = {
@@ -49,3 +55,23 @@ class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue, 4 * 1
oom = true
}
}
+
+private class GrantEverythingMemoryManager extends MemoryManager {
+ override def acquireExecutionMemory(numBytes: Long): Long = numBytes
+ override def acquireStorageMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
+ override def acquireUnrollMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
+ override def releaseExecutionMemory(numBytes: Long): Unit = { }
+ override def releaseStorageMemory(numBytes: Long): Unit = { }
+ override def releaseStorageMemory(): Unit = { }
+ override def releaseUnrollMemory(numBytes: Long): Unit = { }
+ override def maxExecutionMemory: Long = Long.MaxValue
+ override def maxStorageMemory: Long = Long.MaxValue
+ override def executionMemoryUsed: Long = Long.MaxValue
+ override def storageMemoryUsed: Long = Long.MaxValue
+}