aboutsummaryrefslogtreecommitdiff
path: root/unsafe
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-05-20 16:37:11 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-05-20 16:37:11 -0700
commit7956dd7ab03e1542d89dd94c043f1e5131684199 (patch)
treea753324eb6f10972f914ad5fbab29d97b88c8e26 /unsafe
parent3c434cbfd0d6821e5bcf572be792b787a514018b (diff)
downloadspark-7956dd7ab03e1542d89dd94c043f1e5131684199.tar.gz
spark-7956dd7ab03e1542d89dd94c043f1e5131684199.tar.bz2
spark-7956dd7ab03e1542d89dd94c043f1e5131684199.zip
[SPARK-7698] Cache and reuse buffers in ExecutorMemoryAllocator when using heap allocation
When on-heap memory allocation is used, ExecutorMemoryManager should maintain a cache / pool of buffers for re-use by tasks. This will significantly improve the performance of the new Tungsten's sort-shuffle for jobs with many short-lived tasks by eliminating a major source of GC. This pull request is a minimum-viable-implementation of this idea. In its current form, this patch significantly improves performance on a stress test which launches huge numbers of short-lived shuffle map tasks back-to-back in the same JVM. Author: Josh Rosen <joshrosen@databricks.com> Closes #6227 from JoshRosen/SPARK-7698 and squashes the following commits: fd6cb55 [Josh Rosen] SoftReference -> WeakReference b154e86 [Josh Rosen] WIP sketch of pooling in ExecutorMemoryManager
Diffstat (limited to 'unsafe')
-rw-r--r--unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java57
1 files changed, 55 insertions, 2 deletions
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java
index 62c29c8cc1..cbbe859462 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java
@@ -17,6 +17,12 @@
package org.apache.spark.unsafe.memory;
+import java.lang.ref.WeakReference;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import javax.annotation.concurrent.GuardedBy;
+
/**
* Manages memory for an executor. Individual operators / tasks allocate memory through
* {@link TaskMemoryManager} objects, which obtain their memory from ExecutorMemoryManager.
@@ -33,6 +39,12 @@ public class ExecutorMemoryManager {
*/
final boolean inHeap;
+ @GuardedBy("this")
+ private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize =
+ new HashMap<Long, LinkedList<WeakReference<MemoryBlock>>>();
+
+ private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
+
/**
* Construct a new ExecutorMemoryManager.
*
@@ -44,15 +56,56 @@ public class ExecutorMemoryManager {
}
/**
+ * Returns true if allocations of the given size should go through the pooling mechanism and
+ * false otherwise.
+ */
+ private boolean shouldPool(long size) {
+ // Very small allocations are less likely to benefit from pooling.
+ // At some point, we should explore supporting pooling for off-heap memory, but for now we'll
+ // ignore that case in the interest of simplicity.
+ return size >= POOLING_THRESHOLD_BYTES && allocator instanceof HeapMemoryAllocator;
+ }
+
+ /**
* Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed
* to be zeroed out (call `zero()` on the result if this is necessary).
*/
MemoryBlock allocate(long size) throws OutOfMemoryError {
- return allocator.allocate(size);
+ if (shouldPool(size)) {
+ synchronized (this) {
+ final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+ if (pool != null) {
+ while (!pool.isEmpty()) {
+ final WeakReference<MemoryBlock> blockReference = pool.pop();
+ final MemoryBlock memory = blockReference.get();
+ if (memory != null) {
+ assert (memory.size() == size);
+ return memory;
+ }
+ }
+ bufferPoolsBySize.remove(size);
+ }
+ }
+ return allocator.allocate(size);
+ } else {
+ return allocator.allocate(size);
+ }
}
void free(MemoryBlock memory) {
- allocator.free(memory);
+ final long size = memory.size();
+ if (shouldPool(size)) {
+ synchronized (this) {
+ LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+ if (pool == null) {
+ pool = new LinkedList<WeakReference<MemoryBlock>>();
+ bufferPoolsBySize.put(size, pool);
+ }
+ pool.add(new WeakReference<MemoryBlock>(memory));
+ }
+ } else {
+ allocator.free(memory);
+ }
}
}