aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java')
-rw-r--r--core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java138
1 files changed, 108 insertions, 30 deletions
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 7b31c90dac..4230575446 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -17,13 +17,18 @@
package org.apache.spark.memory;
-import java.util.*;
+import javax.annotation.concurrent.GuardedBy;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashSet;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.util.Utils;
/**
* Manages the memory allocated by an individual task.
@@ -101,29 +106,104 @@ public class TaskMemoryManager {
private final boolean inHeap;
/**
+ * The consumers that have acquired execution memory through this manager.
+ */
+ @GuardedBy("this")
+ private final HashSet<MemoryConsumer> consumers;
+
+ /**
* Construct a new TaskMemoryManager.
*/
public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
this.inHeap = memoryManager.tungstenMemoryIsAllocatedInHeap();
this.memoryManager = memoryManager;
this.taskAttemptId = taskAttemptId;
+ this.consumers = new HashSet<>();
}
/**
- * Acquire N bytes of memory for execution, evicting cached blocks if necessary.
+ * Acquire N bytes of memory for a consumer. If there is not enough memory, it will call
+ * spill() on consumers to release more memory.
+ *
* @return number of bytes successfully granted (<= N).
*/
- public long acquireExecutionMemory(long size) {
- return memoryManager.acquireExecutionMemory(size, taskAttemptId);
+ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+ assert(required >= 0);
+ synchronized (this) {
+ long got = memoryManager.acquireExecutionMemory(required, taskAttemptId);
+
+ // Try to release memory from other consumers first, so that we can reduce the frequency of
+ // spilling and avoid having too many spilled files.
+ if (got < required) {
+ // Call spill() on other consumers to release memory
+ for (MemoryConsumer c: consumers) {
+ if (c != null && c != consumer && c.getUsed() > 0) {
+ try {
+ long released = c.spill(required - got, consumer);
+ if (released > 0) {
+ logger.info("Task {} released {} from {} for {}", taskAttemptId,
+ Utils.bytesToString(released), c, consumer);
+ got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId);
+ if (got >= required) {
+ break;
+ }
+ }
+ } catch (IOException e) {
+ logger.error("error while calling spill() on " + c, e);
+ throw new OutOfMemoryError("error while calling spill() on " + c + " : "
+ + e.getMessage());
+ }
+ }
+ }
+ }
+
+ // call spill() on itself
+ if (got < required && consumer != null) {
+ try {
+ long released = consumer.spill(required - got, consumer);
+ if (released > 0) {
+ logger.info("Task {} released {} from itself ({})", taskAttemptId,
+ Utils.bytesToString(released), consumer);
+ got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId);
+ }
+ } catch (IOException e) {
+ logger.error("error while calling spill() on " + consumer, e);
+ throw new OutOfMemoryError("error while calling spill() on " + consumer + " : "
+ + e.getMessage());
+ }
+ }
+
+ consumers.add(consumer);
+ logger.debug("Task {} acquire {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
+ return got;
+ }
}
/**
- * Release N bytes of execution memory.
+ * Release N bytes of execution memory for a MemoryConsumer.
*/
- public void releaseExecutionMemory(long size) {
+ public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
+ logger.debug("Task {} release {} from {}", taskAttemptId, Utils.bytesToString(size), consumer);
memoryManager.releaseExecutionMemory(size, taskAttemptId);
}
+ /**
+ * Logs the task id, then the usage of each consumer holding memory; null entries are skipped.
+ */
+ public void showMemoryUsage() {
+ logger.info("Memory used in task " + taskAttemptId);
+ synchronized (this) {
+ for (MemoryConsumer c: consumers) {
+ if (c != null && c.getUsed() > 0) {
+ logger.info("Acquired by " + c + ": " + Utils.bytesToString(c.getUsed()));
+ }
+ }
+ }
+ }
+
+ /**
+ * Return the page size in bytes.
+ */
public long pageSizeBytes() {
return memoryManager.pageSizeBytes();
}
@@ -134,42 +214,40 @@ public class TaskMemoryManager {
*
* Returns `null` if there was not enough memory to allocate the page.
*/
- public MemoryBlock allocatePage(long size) {
+ public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
throw new IllegalArgumentException(
"Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
}
+ long acquired = acquireExecutionMemory(size, consumer);
+ if (acquired <= 0) {
+ return null;
+ }
+
final int pageNumber;
synchronized (this) {
pageNumber = allocatedPages.nextClearBit(0);
if (pageNumber >= PAGE_TABLE_SIZE) {
+ releaseExecutionMemory(acquired, consumer);
throw new IllegalStateException(
"Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
}
allocatedPages.set(pageNumber);
}
- final long acquiredExecutionMemory = acquireExecutionMemory(size);
- if (acquiredExecutionMemory != size) {
- releaseExecutionMemory(acquiredExecutionMemory);
- synchronized (this) {
- allocatedPages.clear(pageNumber);
- }
- return null;
- }
- final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(size);
+ final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
- logger.trace("Allocate page number {} ({} bytes)", pageNumber, size);
+ logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
}
return page;
}
/**
- * Free a block of memory allocated via {@link TaskMemoryManager#allocatePage(long)}.
+ * Free a block of memory allocated via {@link TaskMemoryManager#allocatePage}.
*/
- public void freePage(MemoryBlock page) {
+ public void freePage(MemoryBlock page, MemoryConsumer consumer) {
assert (page.pageNumber != -1) :
"Called freePage() on memory that wasn't allocated with allocatePage()";
assert(allocatedPages.get(page.pageNumber));
@@ -182,14 +260,14 @@ public class TaskMemoryManager {
}
long pageSize = page.size();
memoryManager.tungstenMemoryAllocator().free(page);
- releaseExecutionMemory(pageSize);
+ releaseExecutionMemory(pageSize, consumer);
}
/**
* Given a memory page and offset within that page, encode this address into a 64-bit long.
* This address will remain valid as long as the corresponding page has not been freed.
*
- * @param page a data page allocated by {@link TaskMemoryManager#allocatePage(long)}/
+ * @param page a data page allocated by {@link TaskMemoryManager#allocatePage}.
* @param offsetInPage an offset in this page which incorporates the base offset. In other words,
* this should be the value that you would pass as the base offset into an
* UNSAFE call (e.g. page.baseOffset() + something).
@@ -261,17 +339,17 @@ public class TaskMemoryManager {
* value can be used to detect memory leaks.
*/
public long cleanUpAllAllocatedMemory() {
- long freedBytes = 0;
- for (MemoryBlock page : pageTable) {
- if (page != null) {
- freedBytes += page.size();
- freePage(page);
+ synchronized (this) {
+ Arrays.fill(pageTable, null);
+ for (MemoryConsumer c: consumers) {
+ if (c != null && c.getUsed() > 0) {
+ // In case of failed task, it's normal to see leaked memory
+ logger.warn("leak " + Utils.bytesToString(c.getUsed()) + " memory from " + c);
+ }
}
+ consumers.clear();
}
-
- freedBytes += memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
-
- return freedBytes;
+ return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
}
/**