aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java')
-rw-r--r--core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java37
1 files changed, 16 insertions, 21 deletions
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index a05a79c88d..a4a571f15a 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -76,9 +76,6 @@ public class TaskMemoryManager {
/** Bit mask for the lower 51 bits of a long. */
private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
- /** Bit mask for the upper 13 bits of a long */
- private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;
-
/**
* Similar to an operating system's page table, this array maps page numbers into base object
* pointers, allowing us to translate between the hashtable's internal 64-bit address
@@ -132,11 +129,10 @@ public class TaskMemoryManager {
*
* @return number of bytes successfully granted (<= N).
*/
- public long acquireExecutionMemory(
- long required,
- MemoryMode mode,
- MemoryConsumer consumer) {
+ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
assert(required >= 0);
+ assert(consumer != null);
+ MemoryMode mode = consumer.getMode();
// If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
// memory here, then it may not make sense to spill since that would only end up freeing
// off-heap memory. This is subject to change, though, so it may be risky to make this
@@ -149,10 +145,10 @@ public class TaskMemoryManager {
if (got < required) {
// Call spill() on other consumers to release memory
for (MemoryConsumer c: consumers) {
- if (c != consumer && c.getUsed() > 0) {
+ if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
try {
long released = c.spill(required - got, consumer);
- if (released > 0 && mode == tungstenMemoryMode) {
+ if (released > 0) {
logger.debug("Task {} released {} from {} for {}", taskAttemptId,
Utils.bytesToString(released), c, consumer);
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
@@ -170,10 +166,10 @@ public class TaskMemoryManager {
}
// call spill() on itself
- if (got < required && consumer != null) {
+ if (got < required) {
try {
long released = consumer.spill(required - got, consumer);
- if (released > 0 && mode == tungstenMemoryMode) {
+ if (released > 0) {
logger.debug("Task {} released {} from itself ({})", taskAttemptId,
Utils.bytesToString(released), consumer);
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
@@ -185,9 +181,7 @@ public class TaskMemoryManager {
}
}
- if (consumer != null) {
- consumers.add(consumer);
- }
+ consumers.add(consumer);
logger.debug("Task {} acquire {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
return got;
}
@@ -196,9 +190,9 @@ public class TaskMemoryManager {
/**
* Release N bytes of execution memory for a MemoryConsumer.
*/
- public void releaseExecutionMemory(long size, MemoryMode mode, MemoryConsumer consumer) {
+ public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
logger.debug("Task {} release {} from {}", taskAttemptId, Utils.bytesToString(size), consumer);
- memoryManager.releaseExecutionMemory(size, taskAttemptId, mode);
+ memoryManager.releaseExecutionMemory(size, taskAttemptId, consumer.getMode());
}
/**
@@ -241,12 +235,14 @@ public class TaskMemoryManager {
* contains fewer bytes than requested, so callers should verify the size of returned pages.
*/
public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
+ assert(consumer != null);
+ assert(consumer.getMode() == tungstenMemoryMode);
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
throw new IllegalArgumentException(
"Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
}
- long acquired = acquireExecutionMemory(size, tungstenMemoryMode, consumer);
+ long acquired = acquireExecutionMemory(size, consumer);
if (acquired <= 0) {
return null;
}
@@ -255,7 +251,7 @@ public class TaskMemoryManager {
synchronized (this) {
pageNumber = allocatedPages.nextClearBit(0);
if (pageNumber >= PAGE_TABLE_SIZE) {
- releaseExecutionMemory(acquired, tungstenMemoryMode, consumer);
+ releaseExecutionMemory(acquired, consumer);
throw new IllegalStateException(
"Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
}
@@ -299,7 +295,7 @@ public class TaskMemoryManager {
}
long pageSize = page.size();
memoryManager.tungstenMemoryAllocator().free(page);
- releaseExecutionMemory(pageSize, tungstenMemoryMode, consumer);
+ releaseExecutionMemory(pageSize, consumer);
}
/**
@@ -396,8 +392,7 @@ public class TaskMemoryManager {
Arrays.fill(pageTable, null);
}
-
- // release the memory that is not used by any consumer.
+ // release the memory that is not used by any consumer (acquired for pages in tungsten mode).
memoryManager.releaseExecutionMemory(acquiredButNotUsed, taskAttemptId, tungstenMemoryMode);
return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);