diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-05-20 16:37:11 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-05-20 16:37:11 -0700 |
commit | 7956dd7ab03e1542d89dd94c043f1e5131684199 (patch) | |
tree | a753324eb6f10972f914ad5fbab29d97b88c8e26 | |
parent | 3c434cbfd0d6821e5bcf572be792b787a514018b (diff) | |
download | spark-7956dd7ab03e1542d89dd94c043f1e5131684199.tar.gz spark-7956dd7ab03e1542d89dd94c043f1e5131684199.tar.bz2 spark-7956dd7ab03e1542d89dd94c043f1e5131684199.zip |
[SPARK-7698] Cache and reuse buffers in ExecutorMemoryAllocator when using heap allocation
When on-heap memory allocation is used, ExecutorMemoryManager should maintain a cache / pool of buffers for re-use by tasks. This will significantly improve the performance of the new Tungsten's sort-shuffle for jobs with many short-lived tasks by eliminating a major source of GC.
This pull request is a minimum-viable-implementation of this idea. In its current form, this patch significantly improves performance on a stress test which launches huge numbers of short-lived shuffle map tasks back-to-back in the same JVM.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #6227 from JoshRosen/SPARK-7698 and squashes the following commits:
fd6cb55 [Josh Rosen] SoftReference -> WeakReference
b154e86 [Josh Rosen] WIP sketch of pooling in ExecutorMemoryManager
-rw-r--r-- | unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java | 57 |
1 files changed, 55 insertions, 2 deletions
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java index 62c29c8cc1..cbbe859462 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java @@ -17,6 +17,12 @@ package org.apache.spark.unsafe.memory; +import java.lang.ref.WeakReference; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import javax.annotation.concurrent.GuardedBy; + /** * Manages memory for an executor. Individual operators / tasks allocate memory through * {@link TaskMemoryManager} objects, which obtain their memory from ExecutorMemoryManager. @@ -33,6 +39,12 @@ public class ExecutorMemoryManager { */ final boolean inHeap; + @GuardedBy("this") + private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize = + new HashMap<Long, LinkedList<WeakReference<MemoryBlock>>>(); + + private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024; + /** * Construct a new ExecutorMemoryManager. * @@ -44,15 +56,56 @@ public class ExecutorMemoryManager { } /** + * Returns true if allocations of the given size should go through the pooling mechanism and + * false otherwise. + */ + private boolean shouldPool(long size) { + // Very small allocations are less likely to benefit from pooling. + // At some point, we should explore supporting pooling for off-heap memory, but for now we'll + // ignore that case in the interest of simplicity. + return size >= POOLING_THRESHOLD_BYTES && allocator instanceof HeapMemoryAllocator; + } + + /** * Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed * to be zeroed out (call `zero()` on the result if this is necessary). */ MemoryBlock allocate(long size) throws OutOfMemoryError { - return allocator.allocate(size); + if (shouldPool(size)) { + synchronized (this) { + final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size); + if (pool != null) { + while (!pool.isEmpty()) { + final WeakReference<MemoryBlock> blockReference = pool.pop(); + final MemoryBlock memory = blockReference.get(); + if (memory != null) { + assert (memory.size() == size); + return memory; + } + } + bufferPoolsBySize.remove(size); + } + } + return allocator.allocate(size); + } else { + return allocator.allocate(size); + } } void free(MemoryBlock memory) { - allocator.free(memory); + final long size = memory.size(); + if (shouldPool(size)) { + synchronized (this) { + LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size); + if (pool == null) { + pool = new LinkedList<WeakReference<MemoryBlock>>(); + bufferPoolsBySize.put(size, pool); + } + pool.add(new WeakReference<MemoryBlock>(memory)); + } + } else { + allocator.free(memory); + } } } |