path: root/core/src/main/java
author Josh Rosen <joshrosen@databricks.com> 2015-11-06 18:17:34 -0800
committer Josh Rosen <joshrosen@databricks.com> 2015-11-06 18:17:34 -0800
commit 30b706b7b36482921ec04145a0121ca147984fa8 (patch)
tree 79c8309fa8255ac238d2e9dc5c4e4f0349e9da22 /core/src/main/java
parent 105732dcc6b651b9779f4a5773a759c5b4fbd21d (diff)
[SPARK-11389][CORE] Add support for off-heap memory to MemoryManager
In order to lay the groundwork for proper off-heap memory support in SQL / Tungsten, we need to extend our MemoryManager to perform bookkeeping for off-heap memory.

## User-facing changes

This PR introduces a new configuration, `spark.memory.offHeapSize` (name subject to change), which specifies the absolute amount of off-heap memory that Spark and Spark SQL can use. If Tungsten is configured to use off-heap execution memory for allocating data pages, then all data page allocations must fit within this size limit.

## Internals changes

This PR contains a lot of internal refactoring of the MemoryManager. The key change at the heart of this patch is the introduction of a `MemoryPool` class (name subject to change) to manage the bookkeeping for a particular category of memory (storage, on-heap execution, and off-heap execution). These MemoryPools are not fixed-size; they can be dynamically grown and shrunk according to the MemoryManager's policies. In StaticMemoryManager, these pools have fixed sizes, proportional to the legacy `[storage|shuffle].memoryFraction` settings. In the new UnifiedMemoryManager, the sizes of these pools are dynamically adjusted according to its policies.

There are two subclasses of `MemoryPool`: `StorageMemoryPool` manages storage memory and `ExecutionMemoryPool` manages execution memory. The MemoryManager creates two execution pools, one for on-heap memory and one for off-heap memory. Instances of `ExecutionMemoryPool` manage the logic for fair sharing of their pooled memory across running tasks (in other words, the ShuffleMemoryManager-like logic has been moved out of MemoryManager and pushed into these ExecutionMemoryPool instances).

I think that this design is substantially easier to understand and reason about than the previous design, where most of these responsibilities were handled by MemoryManager and its subclasses. To see this, take a look at how simple the logic in `UnifiedMemoryManager` has become: it is now very easy to see when memory is dynamically shifted between storage and execution.

## TODOs

- [x] Fix a handful of test failures in the MemoryManagerSuites.
- [x] Fix remaining TODO comments in code.
- [ ] Document new configuration.
- [x] Fix commented-out tests / asserts:
  - [x] UnifiedMemoryManagerSuite.
- [x] Write tests that exercise the new off-heap memory management policies.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #9344 from JoshRosen/offheap-memory-accounting.
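To make the pool-based bookkeeping described above concrete, here is a minimal, hypothetical Java sketch: one execution pool per `MemoryMode` with per-task accounting, and a manager that routes each request by mode. The names `SimpleExecutionPool` and `SimpleMemoryManager` are illustrative stand-ins, not Spark's actual `MemoryPool` / `ExecutionMemoryPool` classes, and the fair-sharing-across-tasks policy is deliberately omitted.

```java
// Hypothetical sketch only: simplified stand-ins for the pool-based bookkeeping
// described in the commit message, not Spark's actual API.
import java.util.HashMap;
import java.util.Map;

enum MemoryMode { ON_HEAP, OFF_HEAP }  // mirrors the enum added in this patch

final class SimpleExecutionPool {
  private final long poolSize;                                  // capacity of this pool, in bytes
  private final Map<Long, Long> usedPerTask = new HashMap<>();  // taskAttemptId -> bytes held

  SimpleExecutionPool(long poolSize) { this.poolSize = poolSize; }

  synchronized long memoryUsed() {
    long sum = 0;
    for (long used : usedPerTask.values()) sum += used;
    return sum;
  }

  // Grant up to numBytes, bounded by what is still free in this pool.
  // (The real ExecutionMemoryPool also enforces fair sharing across running tasks.)
  synchronized long acquire(long numBytes, long taskAttemptId) {
    long free = Math.max(poolSize - memoryUsed(), 0);
    long granted = Math.min(numBytes, free);
    usedPerTask.merge(taskAttemptId, granted, Long::sum);
    return granted;
  }

  synchronized void release(long numBytes, long taskAttemptId) {
    usedPerTask.merge(taskAttemptId, -numBytes, Long::sum);
  }
}

// The manager keeps one execution pool per MemoryMode and routes each request by mode,
// mirroring the on-heap / off-heap split that this patch introduces.
final class SimpleMemoryManager {
  private final SimpleExecutionPool onHeapExecution;
  private final SimpleExecutionPool offHeapExecution;

  SimpleMemoryManager(long onHeapBytes, long offHeapBytes) {
    this.onHeapExecution = new SimpleExecutionPool(onHeapBytes);
    this.offHeapExecution = new SimpleExecutionPool(offHeapBytes);
  }

  long acquireExecutionMemory(long numBytes, long taskAttemptId, MemoryMode mode) {
    SimpleExecutionPool pool = (mode == MemoryMode.ON_HEAP) ? onHeapExecution : offHeapExecution;
    return pool.acquire(numBytes, taskAttemptId);
  }

  void releaseExecutionMemory(long numBytes, long taskAttemptId, MemoryMode mode) {
    SimpleExecutionPool pool = (mode == MemoryMode.ON_HEAP) ? onHeapExecution : offHeapExecution;
    pool.release(numBytes, taskAttemptId);
  }
}
```

The storage pool is omitted from this sketch; the point is only that routing by `MemoryMode` keeps on-heap and off-heap execution accounting independent, which is the shape the diff below gives to `TaskMemoryManager`.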
Diffstat (limited to 'core/src/main/java')
-rw-r--r--  core/src/main/java/org/apache/spark/memory/MemoryConsumer.java      7
-rw-r--r--  core/src/main/java/org/apache/spark/memory/MemoryMode.java         26
-rw-r--r--  core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java  72
3 files changed, 78 insertions, 27 deletions
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 8fbdb72832..36138cc9a2 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -17,15 +17,15 @@
package org.apache.spark.memory;
-
import java.io.IOException;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
-
/**
* A memory consumer of TaskMemoryManager which supports spilling.
+ *
+ * Note: this only supports allocation / spilling of Tungsten memory.
*/
public abstract class MemoryConsumer {
@@ -36,7 +36,6 @@ public abstract class MemoryConsumer {
protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize) {
this.taskMemoryManager = taskMemoryManager;
this.pageSize = pageSize;
- this.used = 0;
}
protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
@@ -67,6 +66,8 @@ public abstract class MemoryConsumer {
*
* Note: In order to avoid possible deadlock, spill() should not call acquireMemory().
*
+ * Note: today, this only frees Tungsten-managed pages.
+ *
* @param size the amount of memory to be released
* @param trigger the MemoryConsumer that trigger this spilling
* @return the amount of released memory in bytes
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryMode.java b/core/src/main/java/org/apache/spark/memory/MemoryMode.java
new file mode 100644
index 0000000000..3a5e72d8aa
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/memory/MemoryMode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory;
+
+import org.apache.spark.annotation.Private;
+
+@Private
+public enum MemoryMode {
+ ON_HEAP,
+ OFF_HEAP
+}
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 6440f9c0f3..5f743b2885 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -103,10 +103,10 @@ public class TaskMemoryManager {
* without doing any masking or lookups. Since this branching should be well-predicted by the JIT,
* this extra layer of indirection / abstraction hopefully shouldn't be too expensive.
*/
- private final boolean inHeap;
+ final MemoryMode tungstenMemoryMode;
/**
- * The size of memory granted to each consumer.
+ * Tracks spillable memory consumers.
*/
@GuardedBy("this")
private final HashSet<MemoryConsumer> consumers;
@@ -115,7 +115,7 @@ public class TaskMemoryManager {
* Construct a new TaskMemoryManager.
*/
public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
- this.inHeap = memoryManager.tungstenMemoryIsAllocatedInHeap();
+ this.tungstenMemoryMode = memoryManager.tungstenMemoryMode();
this.memoryManager = memoryManager;
this.taskAttemptId = taskAttemptId;
this.consumers = new HashSet<>();
@@ -127,12 +127,19 @@ public class TaskMemoryManager {
*
* @return number of bytes successfully granted (<= N).
*/
- public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+ public long acquireExecutionMemory(
+ long required,
+ MemoryMode mode,
+ MemoryConsumer consumer) {
assert(required >= 0);
+ // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
+ // memory here, then it may not make sense to spill since that would only end up freeing
+ // off-heap memory. This is subject to change, though, so it may be risky to make this
+ // optimization now in case we forget to undo it later when making changes.
synchronized (this) {
- long got = memoryManager.acquireExecutionMemory(required, taskAttemptId);
+ long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);
- // try to release memory from other consumers first, then we can reduce the frequency of
+ // Try to release memory from other consumers first, then we can reduce the frequency of
// spilling and avoid having too many spilled files.
if (got < required) {
// Call spill() on other consumers to release memory
@@ -140,10 +147,10 @@ public class TaskMemoryManager {
if (c != consumer && c.getUsed() > 0) {
try {
long released = c.spill(required - got, consumer);
- if (released > 0) {
- logger.info("Task {} released {} from {} for {}", taskAttemptId,
+ if (released > 0 && mode == tungstenMemoryMode) {
+ logger.debug("Task {} released {} from {} for {}", taskAttemptId,
Utils.bytesToString(released), c, consumer);
- got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId);
+ got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
if (got >= required) {
break;
}
@@ -161,10 +168,10 @@ public class TaskMemoryManager {
if (got < required && consumer != null) {
try {
long released = consumer.spill(required - got, consumer);
- if (released > 0) {
- logger.info("Task {} released {} from itself ({})", taskAttemptId,
+ if (released > 0 && mode == tungstenMemoryMode) {
+ logger.debug("Task {} released {} from itself ({})", taskAttemptId,
Utils.bytesToString(released), consumer);
- got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId);
+ got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
}
} catch (IOException e) {
logger.error("error while calling spill() on " + consumer, e);
@@ -184,9 +191,9 @@ public class TaskMemoryManager {
/**
* Release N bytes of execution memory for a MemoryConsumer.
*/
- public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
+ public void releaseExecutionMemory(long size, MemoryMode mode, MemoryConsumer consumer) {
logger.debug("Task {} release {} from {}", taskAttemptId, Utils.bytesToString(size), consumer);
- memoryManager.releaseExecutionMemory(size, taskAttemptId);
+ memoryManager.releaseExecutionMemory(size, taskAttemptId, mode);
}
/**
@@ -195,11 +202,19 @@ public class TaskMemoryManager {
public void showMemoryUsage() {
logger.info("Memory used in task " + taskAttemptId);
synchronized (this) {
+ long memoryAccountedForByConsumers = 0;
for (MemoryConsumer c: consumers) {
- if (c.getUsed() > 0) {
- logger.info("Acquired by " + c + ": " + Utils.bytesToString(c.getUsed()));
+ long totalMemUsage = c.getUsed();
+ memoryAccountedForByConsumers += totalMemUsage;
+ if (totalMemUsage > 0) {
+ logger.info("Acquired by " + c + ": " + Utils.bytesToString(totalMemUsage));
}
}
+ long memoryNotAccountedFor =
+ memoryManager.getExecutionMemoryUsageForTask(taskAttemptId) - memoryAccountedForByConsumers;
+ logger.info(
+ "{} bytes of memory were used by task {} but are not associated with specific consumers",
+ memoryNotAccountedFor, taskAttemptId);
}
}
@@ -214,7 +229,8 @@ public class TaskMemoryManager {
* Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
* intended for allocating large blocks of Tungsten memory that will be shared between operators.
*
- * Returns `null` if there was not enough memory to allocate the page.
+ * Returns `null` if there was not enough memory to allocate the page. May return a page that
+ * contains fewer bytes than requested, so callers should verify the size of returned pages.
*/
public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
@@ -222,7 +238,7 @@ public class TaskMemoryManager {
"Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
}
- long acquired = acquireExecutionMemory(size, consumer);
+ long acquired = acquireExecutionMemory(size, tungstenMemoryMode, consumer);
if (acquired <= 0) {
return null;
}
@@ -231,7 +247,7 @@ public class TaskMemoryManager {
synchronized (this) {
pageNumber = allocatedPages.nextClearBit(0);
if (pageNumber >= PAGE_TABLE_SIZE) {
- releaseExecutionMemory(acquired, consumer);
+ releaseExecutionMemory(acquired, tungstenMemoryMode, consumer);
throw new IllegalStateException(
"Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
}
@@ -262,7 +278,7 @@ public class TaskMemoryManager {
}
long pageSize = page.size();
memoryManager.tungstenMemoryAllocator().free(page);
- releaseExecutionMemory(pageSize, consumer);
+ releaseExecutionMemory(pageSize, tungstenMemoryMode, consumer);
}
/**
@@ -276,7 +292,7 @@ public class TaskMemoryManager {
* @return an encoded page address.
*/
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
- if (!inHeap) {
+ if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
// In off-heap mode, an offset is an absolute address that may require a full 64 bits to
// encode. Due to our page size limitation, though, we can convert this into an offset that's
// relative to the page's base offset; this relative offset will fit in 51 bits.
@@ -305,7 +321,7 @@ public class TaskMemoryManager {
* {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
*/
public Object getPage(long pagePlusOffsetAddress) {
- if (inHeap) {
+ if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
final MemoryBlock page = pageTable[pageNumber];
@@ -323,7 +339,7 @@ public class TaskMemoryManager {
*/
public long getOffsetInPage(long pagePlusOffsetAddress) {
final long offsetInPage = decodeOffset(pagePlusOffsetAddress);
- if (inHeap) {
+ if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
return offsetInPage;
} else {
// In off-heap mode, an offset is an absolute address. In encodePageNumberAndOffset, we
@@ -351,11 +367,19 @@ public class TaskMemoryManager {
}
consumers.clear();
}
+
+ for (MemoryBlock page : pageTable) {
+ if (page != null) {
+ memoryManager.tungstenMemoryAllocator().free(page);
+ }
+ }
+ Arrays.fill(pageTable, null);
+
return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
}
/**
- * Returns the memory consumption, in bytes, for the current task
+ * Returns the memory consumption, in bytes, for the current task.
*/
public long getMemoryConsumptionForThisTask() {
return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);