diff options
Diffstat (limited to 'unsafe/src/main/java')
4 files changed, 53 insertions, 400 deletions
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java deleted file mode 100644 index cbbe859462..0000000000 --- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.unsafe.memory; - -import java.lang.ref.WeakReference; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Map; -import javax.annotation.concurrent.GuardedBy; - -/** - * Manages memory for an executor. Individual operators / tasks allocate memory through - * {@link TaskMemoryManager} objects, which obtain their memory from ExecutorMemoryManager. - */ -public class ExecutorMemoryManager { - - /** - * Allocator, exposed for enabling untracked allocations of temporary data structures. - */ - public final MemoryAllocator allocator; - - /** - * Tracks whether memory will be allocated on the JVM heap or off-heap using sun.misc.Unsafe. - */ - final boolean inHeap; - - @GuardedBy("this") - private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize = - new HashMap<Long, LinkedList<WeakReference<MemoryBlock>>>(); - - private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024; - - /** - * Construct a new ExecutorMemoryManager. - * - * @param allocator the allocator that will be used - */ - public ExecutorMemoryManager(MemoryAllocator allocator) { - this.inHeap = allocator instanceof HeapMemoryAllocator; - this.allocator = allocator; - } - - /** - * Returns true if allocations of the given size should go through the pooling mechanism and - * false otherwise. - */ - private boolean shouldPool(long size) { - // Very small allocations are less likely to benefit from pooling. - // At some point, we should explore supporting pooling for off-heap memory, but for now we'll - // ignore that case in the interest of simplicity. - return size >= POOLING_THRESHOLD_BYTES && allocator instanceof HeapMemoryAllocator; - } - - /** - * Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed - * to be zeroed out (call `zero()` on the result if this is necessary). - */ - MemoryBlock allocate(long size) throws OutOfMemoryError { - if (shouldPool(size)) { - synchronized (this) { - final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size); - if (pool != null) { - while (!pool.isEmpty()) { - final WeakReference<MemoryBlock> blockReference = pool.pop(); - final MemoryBlock memory = blockReference.get(); - if (memory != null) { - assert (memory.size() == size); - return memory; - } - } - bufferPoolsBySize.remove(size); - } - } - return allocator.allocate(size); - } else { - return allocator.allocate(size); - } - } - - void free(MemoryBlock memory) { - final long size = memory.size(); - if (shouldPool(size)) { - synchronized (this) { - LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size); - if (pool == null) { - pool = new LinkedList<WeakReference<MemoryBlock>>(); - bufferPoolsBySize.put(size, pool); - } - pool.add(new WeakReference<MemoryBlock>(memory)); - } - } else { - allocator.free(memory); - } - } - -} diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java index 6722301df1..ebe90d9e63 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java @@ -17,22 +17,71 @@ package org.apache.spark.unsafe.memory; +import javax.annotation.concurrent.GuardedBy; +import java.lang.ref.WeakReference; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; + /** * A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array. */ public class HeapMemoryAllocator implements MemoryAllocator { + @GuardedBy("this") + private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize = + new HashMap<>(); + + private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024; + + /** + * Returns true if allocations of the given size should go through the pooling mechanism and + * false otherwise. + */ + private boolean shouldPool(long size) { + // Very small allocations are less likely to benefit from pooling. + return size >= POOLING_THRESHOLD_BYTES; + } + @Override public MemoryBlock allocate(long size) throws OutOfMemoryError { if (size % 8 != 0) { throw new IllegalArgumentException("Size " + size + " was not a multiple of 8"); } + if (shouldPool(size)) { + synchronized (this) { + final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size); + if (pool != null) { + while (!pool.isEmpty()) { + final WeakReference<MemoryBlock> blockReference = pool.pop(); + final MemoryBlock memory = blockReference.get(); + if (memory != null) { + assert (memory.size() == size); + return memory; + } + } + bufferPoolsBySize.remove(size); + } + } + } long[] array = new long[(int) (size / 8)]; return MemoryBlock.fromLongArray(array); } @Override public void free(MemoryBlock memory) { - // Do nothing + final long size = memory.size(); + if (shouldPool(size)) { + synchronized (this) { + LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size); + if (pool == null) { + pool = new LinkedList<>(); + bufferPoolsBySize.put(size, pool); + } + pool.add(new WeakReference<>(memory)); + } + } else { + // Do nothing + } } } diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java index dd75820834..e3e7947115 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java @@ -30,9 +30,10 @@ public class MemoryBlock extends MemoryLocation { /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * MemoryManager. This is package-private and is modified by MemoryManager. + * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, + * which lives in a different package. */ - int pageNumber = -1; + public int pageNumber = -1; public MemoryBlock(@Nullable Object obj, long offset, long length) { super(obj, offset); diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java deleted file mode 100644 index 97b2c93f0d..0000000000 --- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.unsafe.memory; - -import java.util.*; - -import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Manages the memory allocated by an individual task. - * <p> - * Most of the complexity in this class deals with encoding of off-heap addresses into 64-bit longs. - * In off-heap mode, memory can be directly addressed with 64-bit longs. In on-heap mode, memory is - * addressed by the combination of a base Object reference and a 64-bit offset within that object. - * This is a problem when we want to store pointers to data structures inside of other structures, - * such as record pointers inside hashmaps or sorting buffers. Even if we decided to use 128 bits - * to address memory, we can't just store the address of the base object since it's not guaranteed - * to remain stable as the heap gets reorganized due to GC. - * <p> - * Instead, we use the following approach to encode record pointers in 64-bit longs: for off-heap - * mode, just store the raw address, and for on-heap mode use the upper 13 bits of the address to - * store a "page number" and the lower 51 bits to store an offset within this page. These page - * numbers are used to index into a "page table" array inside of the MemoryManager in order to - * retrieve the base object. - * <p> - * This allows us to address 8192 pages. In on-heap mode, the maximum page size is limited by the - * maximum size of a long[] array, allowing us to address 8192 * 2^32 * 8 bytes, which is - * approximately 35 terabytes of memory. - */ -public class TaskMemoryManager { - - private final Logger logger = LoggerFactory.getLogger(TaskMemoryManager.class); - - /** The number of bits used to address the page table. */ - private static final int PAGE_NUMBER_BITS = 13; - - /** The number of bits used to encode offsets in data pages. */ - @VisibleForTesting - static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS; // 51 - - /** The number of entries in the page table. */ - private static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS; - - /** - * Maximum supported data page size (in bytes). In principle, the maximum addressable page size is - * (1L << OFFSET_BITS) bytes, which is 2+ petabytes. However, the on-heap allocator's maximum page - * size is limited by the maximum amount of data that can be stored in a long[] array, which is - * (2^32 - 1) * 8 bytes (or 16 gigabytes). Therefore, we cap this at 16 gigabytes. - */ - public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L; - - /** Bit mask for the lower 51 bits of a long. */ - private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL; - - /** Bit mask for the upper 13 bits of a long */ - private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS; - - /** - * Similar to an operating system's page table, this array maps page numbers into base object - * pointers, allowing us to translate between the hashtable's internal 64-bit address - * representation and the baseObject+offset representation which we use to support both in- and - * off-heap addresses. When using an off-heap allocator, every entry in this map will be `null`. - * When using an in-heap allocator, the entries in this map will point to pages' base objects. - * Entries are added to this map as new data pages are allocated. - */ - private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE]; - - /** - * Bitmap for tracking free pages. - */ - private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE); - - /** - * Tracks memory allocated with {@link TaskMemoryManager#allocate(long)}, used to detect / clean - * up leaked memory. - */ - private final HashSet<MemoryBlock> allocatedNonPageMemory = new HashSet<MemoryBlock>(); - - private final ExecutorMemoryManager executorMemoryManager; - - /** - * Tracks whether we're in-heap or off-heap. For off-heap, we short-circuit most of these methods - * without doing any masking or lookups. Since this branching should be well-predicted by the JIT, - * this extra layer of indirection / abstraction hopefully shouldn't be too expensive. - */ - private final boolean inHeap; - - /** - * Construct a new MemoryManager. - */ - public TaskMemoryManager(ExecutorMemoryManager executorMemoryManager) { - this.inHeap = executorMemoryManager.inHeap; - this.executorMemoryManager = executorMemoryManager; - } - - /** - * Allocate a block of memory that will be tracked in the MemoryManager's page table; this is - * intended for allocating large blocks of memory that will be shared between operators. - */ - public MemoryBlock allocatePage(long size) { - if (size > MAXIMUM_PAGE_SIZE_BYTES) { - throw new IllegalArgumentException( - "Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes"); - } - - final int pageNumber; - synchronized (this) { - pageNumber = allocatedPages.nextClearBit(0); - if (pageNumber >= PAGE_TABLE_SIZE) { - throw new IllegalStateException( - "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages"); - } - allocatedPages.set(pageNumber); - } - final MemoryBlock page = executorMemoryManager.allocate(size); - page.pageNumber = pageNumber; - pageTable[pageNumber] = page; - if (logger.isTraceEnabled()) { - logger.trace("Allocate page number {} ({} bytes)", pageNumber, size); - } - return page; - } - - /** - * Free a block of memory allocated via {@link TaskMemoryManager#allocatePage(long)}. - */ - public void freePage(MemoryBlock page) { - assert (page.pageNumber != -1) : - "Called freePage() on memory that wasn't allocated with allocatePage()"; - assert(allocatedPages.get(page.pageNumber)); - pageTable[page.pageNumber] = null; - synchronized (this) { - allocatedPages.clear(page.pageNumber); - } - if (logger.isTraceEnabled()) { - logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size()); - } - // Cannot access a page once it's freed. - executorMemoryManager.free(page); - } - - /** - * Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed - * to be zeroed out (call `zero()` on the result if this is necessary). This method is intended - * to be used for allocating operators' internal data structures. For data pages that you want to - * exchange between operators, consider using {@link TaskMemoryManager#allocatePage(long)}, since - * that will enable intra-memory pointers (see - * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)} and this class's - * top-level Javadoc for more details). - */ - public MemoryBlock allocate(long size) throws OutOfMemoryError { - assert(size > 0) : "Size must be positive, but got " + size; - final MemoryBlock memory = executorMemoryManager.allocate(size); - synchronized(allocatedNonPageMemory) { - allocatedNonPageMemory.add(memory); - } - return memory; - } - - /** - * Free memory allocated by {@link TaskMemoryManager#allocate(long)}. - */ - public void free(MemoryBlock memory) { - assert (memory.pageNumber == -1) : "Should call freePage() for pages, not free()"; - executorMemoryManager.free(memory); - synchronized(allocatedNonPageMemory) { - final boolean wasAlreadyRemoved = !allocatedNonPageMemory.remove(memory); - assert (!wasAlreadyRemoved) : "Called free() on memory that was already freed!"; - } - } - - /** - * Given a memory page and offset within that page, encode this address into a 64-bit long. - * This address will remain valid as long as the corresponding page has not been freed. - * - * @param page a data page allocated by {@link TaskMemoryManager#allocate(long)}. - * @param offsetInPage an offset in this page which incorporates the base offset. In other words, - * this should be the value that you would pass as the base offset into an - * UNSAFE call (e.g. page.baseOffset() + something). - * @return an encoded page address. - */ - public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) { - if (!inHeap) { - // In off-heap mode, an offset is an absolute address that may require a full 64 bits to - // encode. Due to our page size limitation, though, we can convert this into an offset that's - // relative to the page's base offset; this relative offset will fit in 51 bits. - offsetInPage -= page.getBaseOffset(); - } - return encodePageNumberAndOffset(page.pageNumber, offsetInPage); - } - - @VisibleForTesting - public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) { - assert (pageNumber != -1) : "encodePageNumberAndOffset called with invalid page"; - return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS); - } - - @VisibleForTesting - public static int decodePageNumber(long pagePlusOffsetAddress) { - return (int) ((pagePlusOffsetAddress & MASK_LONG_UPPER_13_BITS) >>> OFFSET_BITS); - } - - private static long decodeOffset(long pagePlusOffsetAddress) { - return (pagePlusOffsetAddress & MASK_LONG_LOWER_51_BITS); - } - - /** - * Get the page associated with an address encoded by - * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)} - */ - public Object getPage(long pagePlusOffsetAddress) { - if (inHeap) { - final int pageNumber = decodePageNumber(pagePlusOffsetAddress); - assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE); - final MemoryBlock page = pageTable[pageNumber]; - assert (page != null); - assert (page.getBaseObject() != null); - return page.getBaseObject(); - } else { - return null; - } - } - - /** - * Get the offset associated with an address encoded by - * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)} - */ - public long getOffsetInPage(long pagePlusOffsetAddress) { - final long offsetInPage = decodeOffset(pagePlusOffsetAddress); - if (inHeap) { - return offsetInPage; - } else { - // In off-heap mode, an offset is an absolute address. In encodePageNumberAndOffset, we - // converted the absolute address into a relative address. Here, we invert that operation: - final int pageNumber = decodePageNumber(pagePlusOffsetAddress); - assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE); - final MemoryBlock page = pageTable[pageNumber]; - assert (page != null); - return page.getBaseOffset() + offsetInPage; - } - } - - /** - * Clean up all allocated memory and pages. Returns the number of bytes freed. A non-zero return - * value can be used to detect memory leaks. - */ - public long cleanUpAllAllocatedMemory() { - long freedBytes = 0; - for (MemoryBlock page : pageTable) { - if (page != null) { - freedBytes += page.size(); - freePage(page); - } - } - - synchronized (allocatedNonPageMemory) { - final Iterator<MemoryBlock> iter = allocatedNonPageMemory.iterator(); - while (iter.hasNext()) { - final MemoryBlock memory = iter.next(); - freedBytes += memory.size(); - // We don't call free() here because that calls Set.remove, which would lead to a - // ConcurrentModificationException here. - executorMemoryManager.free(memory); - iter.remove(); - } - } - return freedBytes; - } -} |