author    Josh Rosen <joshrosen@databricks.com>    2015-10-25 21:19:52 -0700
committer Josh Rosen <joshrosen@databricks.com>    2015-10-25 21:19:52 -0700
commit    85e654c5ec87e666a8845bfd77185c1ea57b268a (patch)
tree      2beadbc8fbb54369325970a4e2c7189506efad89 /unsafe
parent    63accc79625d8a03d0624717af5e1d81b18a6da3 (diff)
[SPARK-10984] Simplify *MemoryManager class structure
This patch refactors the MemoryManager class structure. After #9000, Spark had the following classes:

- MemoryManager
- StaticMemoryManager
- ExecutorMemoryManager
- TaskMemoryManager
- ShuffleMemoryManager

This is fairly confusing. To simplify things, this patch consolidates several of these classes:

- ShuffleMemoryManager and ExecutorMemoryManager were merged into MemoryManager.
- TaskMemoryManager is moved into Spark Core.

**Key changes and tasks**:

- [x] Merge ExecutorMemoryManager into MemoryManager.
  - [x] Move pooling logic into Allocator.
- [x] Move TaskMemoryManager from `spark-unsafe` to `spark-core`.
- [x] Refactor the existing Tungsten TaskMemoryManager interactions so that Tungsten code uses only this class, not both it and ShuffleMemoryManager.
- [x] Refactor non-Tungsten code to use the TaskMemoryManager instead of ShuffleMemoryManager.
- [x] Merge ShuffleMemoryManager into MemoryManager.
  - [x] Move code.
  - [x] ~~Simplify 1/n calculation.~~ **Will defer to a followup, since this needs more work.**
- [x] Port ShuffleMemoryManagerSuite tests.
- [x] Move classes from the `unsafe` package to the `memory` package.
- [ ] Figure out how to handle the hacky use of the memory managers in HashedRelation's broadcast variable construction.
- [x] Test porting and cleanup: several tests relied on mock functionality (such as `TestShuffleMemoryManager.markAsOutOfMemory`) which has been changed or broken during the memory manager consolidation.
  - [x] AbstractBytesToBytesMapSuite
  - [x] UnsafeExternalSorterSuite
  - [x] UnsafeFixedWidthAggregationMapSuite
  - [x] UnsafeKVExternalSorterSuite

**Compatibility notes**:

- This patch introduces breaking changes in `ExternalAppendOnlyMap`, which is marked as `DeveloperApi` (likely for legacy reasons): this class can no longer be used outside of a task.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #9127 from JoshRosen/SPARK-10984.
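For orientation, here is a minimal, self-contained sketch of the ownership model the message describes: one executor-wide manager that hands out execution memory, and one per-task manager that acquires from it and returns everything when the task finishes. All class and method names below are illustrative stand-ins, not Spark's actual API at this commit.

```java
// Toy model of the consolidated structure: one executor-wide pool, per-task bookkeeping.
// Names are illustrative stand-ins, not Spark's actual classes or signatures.
class ToyExecutorMemoryPool {
  private final long maxBytes;
  private long usedBytes = 0;

  ToyExecutorMemoryPool(long maxBytes) { this.maxBytes = maxBytes; }

  // Grant up to `requested` bytes, bounded by what is still free in the pool.
  synchronized long acquire(long requested) {
    long granted = Math.min(requested, maxBytes - usedBytes);
    usedBytes += granted;
    return granted;
  }

  synchronized void release(long bytes) { usedBytes -= bytes; }
}

class ToyTaskMemoryManager {
  private final ToyExecutorMemoryPool pool;
  private long acquired = 0;

  ToyTaskMemoryManager(ToyExecutorMemoryPool pool) { this.pool = pool; }

  long acquireExecutionMemory(long bytes) {
    long granted = pool.acquire(bytes);
    acquired += granted;
    return granted;
  }

  // Called when the task completes; returns everything (including leaks) to the pool.
  long cleanUpAllAllocatedMemory() {
    long freed = acquired;
    pool.release(acquired);
    acquired = 0;
    return freed;
  }
}
```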
Diffstat (limited to 'unsafe')
-rw-r--r--  unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java  | 111
-rw-r--r--  unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java    |  51
-rw-r--r--  unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java            |   5
-rw-r--r--  unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java      | 286
-rw-r--r--  unsafe/src/test/java/org/apache/spark/unsafe/memory/TaskMemoryManagerSuite.java |  64
5 files changed, 53 insertions(+), 464 deletions(-)
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java
deleted file mode 100644
index cbbe859462..0000000000
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.memory;
-
-import java.lang.ref.WeakReference;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import javax.annotation.concurrent.GuardedBy;
-
-/**
- * Manages memory for an executor. Individual operators / tasks allocate memory through
- * {@link TaskMemoryManager} objects, which obtain their memory from ExecutorMemoryManager.
- */
-public class ExecutorMemoryManager {
-
- /**
- * Allocator, exposed for enabling untracked allocations of temporary data structures.
- */
- public final MemoryAllocator allocator;
-
- /**
- * Tracks whether memory will be allocated on the JVM heap or off-heap using sun.misc.Unsafe.
- */
- final boolean inHeap;
-
- @GuardedBy("this")
- private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize =
- new HashMap<Long, LinkedList<WeakReference<MemoryBlock>>>();
-
- private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
-
- /**
- * Construct a new ExecutorMemoryManager.
- *
- * @param allocator the allocator that will be used
- */
- public ExecutorMemoryManager(MemoryAllocator allocator) {
- this.inHeap = allocator instanceof HeapMemoryAllocator;
- this.allocator = allocator;
- }
-
- /**
- * Returns true if allocations of the given size should go through the pooling mechanism and
- * false otherwise.
- */
- private boolean shouldPool(long size) {
- // Very small allocations are less likely to benefit from pooling.
- // At some point, we should explore supporting pooling for off-heap memory, but for now we'll
- // ignore that case in the interest of simplicity.
- return size >= POOLING_THRESHOLD_BYTES && allocator instanceof HeapMemoryAllocator;
- }
-
- /**
- * Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed
- * to be zeroed out (call `zero()` on the result if this is necessary).
- */
- MemoryBlock allocate(long size) throws OutOfMemoryError {
- if (shouldPool(size)) {
- synchronized (this) {
- final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
- if (pool != null) {
- while (!pool.isEmpty()) {
- final WeakReference<MemoryBlock> blockReference = pool.pop();
- final MemoryBlock memory = blockReference.get();
- if (memory != null) {
- assert (memory.size() == size);
- return memory;
- }
- }
- bufferPoolsBySize.remove(size);
- }
- }
- return allocator.allocate(size);
- } else {
- return allocator.allocate(size);
- }
- }
-
- void free(MemoryBlock memory) {
- final long size = memory.size();
- if (shouldPool(size)) {
- synchronized (this) {
- LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
- if (pool == null) {
- pool = new LinkedList<WeakReference<MemoryBlock>>();
- bufferPoolsBySize.put(size, pool);
- }
- pool.add(new WeakReference<MemoryBlock>(memory));
- }
- } else {
- allocator.free(memory);
- }
- }
-
-}
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index 6722301df1..ebe90d9e63 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -17,22 +17,71 @@
package org.apache.spark.unsafe.memory;
+import javax.annotation.concurrent.GuardedBy;
+import java.lang.ref.WeakReference;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
/**
* A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array.
*/
public class HeapMemoryAllocator implements MemoryAllocator {
+ @GuardedBy("this")
+ private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize =
+ new HashMap<>();
+
+ private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
+
+ /**
+ * Returns true if allocations of the given size should go through the pooling mechanism and
+ * false otherwise.
+ */
+ private boolean shouldPool(long size) {
+ // Very small allocations are less likely to benefit from pooling.
+ return size >= POOLING_THRESHOLD_BYTES;
+ }
+
@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
if (size % 8 != 0) {
throw new IllegalArgumentException("Size " + size + " was not a multiple of 8");
}
+ if (shouldPool(size)) {
+ synchronized (this) {
+ final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+ if (pool != null) {
+ while (!pool.isEmpty()) {
+ final WeakReference<MemoryBlock> blockReference = pool.pop();
+ final MemoryBlock memory = blockReference.get();
+ if (memory != null) {
+ assert (memory.size() == size);
+ return memory;
+ }
+ }
+ bufferPoolsBySize.remove(size);
+ }
+ }
+ }
long[] array = new long[(int) (size / 8)];
return MemoryBlock.fromLongArray(array);
}
@Override
public void free(MemoryBlock memory) {
- // Do nothing
+ final long size = memory.size();
+ if (shouldPool(size)) {
+ synchronized (this) {
+ LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+ if (pool == null) {
+ pool = new LinkedList<>();
+ bufferPoolsBySize.put(size, pool);
+ }
+ pool.add(new WeakReference<>(memory));
+ }
+ } else {
+ // Do nothing
+ }
}
}
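The hunk above relocates the size-keyed, weak-reference buffer pool that previously lived in ExecutorMemoryManager (deleted earlier in this diff) into HeapMemoryAllocator. A standalone sketch of that pooling pattern follows, with illustrative names rather than Spark's API, assuming 8-byte-aligned sizes as in the allocator above.

```java
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Sketch of the pooling pattern: freed buffers at or above a size threshold are kept in
// per-size free lists of weak references, so they can be reused without ever preventing
// the garbage collector from reclaiming them under memory pressure.
class PooledLongArrayAllocator {
  private static final long POOLING_THRESHOLD_BYTES = 1024 * 1024;

  private final Map<Long, LinkedList<WeakReference<long[]>>> poolsBySize = new HashMap<>();

  synchronized long[] allocate(long sizeInBytes) {
    if (sizeInBytes >= POOLING_THRESHOLD_BYTES) {
      LinkedList<WeakReference<long[]>> pool = poolsBySize.get(sizeInBytes);
      while (pool != null && !pool.isEmpty()) {
        long[] reused = pool.pop().get();
        if (reused != null) {
          return reused;  // reuse a previously freed buffer of exactly this size
        }
        // otherwise the referent was garbage-collected; keep scanning the free list
      }
    }
    return new long[(int) (sizeInBytes / 8)];  // fall back to a fresh allocation
  }

  synchronized void free(long[] buffer) {
    long sizeInBytes = buffer.length * 8L;
    if (sizeInBytes >= POOLING_THRESHOLD_BYTES) {
      // Large buffers go back into the pool; small ones are simply left to the GC.
      poolsBySize.computeIfAbsent(sizeInBytes, k -> new LinkedList<>())
          .add(new WeakReference<>(buffer));
    }
  }
}
```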
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
index dd75820834..e3e7947115 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
@@ -30,9 +30,10 @@ public class MemoryBlock extends MemoryLocation {
/**
* Optional page number; used when this MemoryBlock represents a page allocated by a
- * MemoryManager. This is package-private and is modified by MemoryManager.
+ * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
+ * which lives in a different package.
*/
- int pageNumber = -1;
+ public int pageNumber = -1;
public MemoryBlock(@Nullable Object obj, long offset, long length) {
super(obj, offset);
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
deleted file mode 100644
index 97b2c93f0d..0000000000
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.memory;
-
-import java.util.*;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manages the memory allocated by an individual task.
- * <p>
- * Most of the complexity in this class deals with encoding of off-heap addresses into 64-bit longs.
- * In off-heap mode, memory can be directly addressed with 64-bit longs. In on-heap mode, memory is
- * addressed by the combination of a base Object reference and a 64-bit offset within that object.
- * This is a problem when we want to store pointers to data structures inside of other structures,
- * such as record pointers inside hashmaps or sorting buffers. Even if we decided to use 128 bits
- * to address memory, we can't just store the address of the base object since it's not guaranteed
- * to remain stable as the heap gets reorganized due to GC.
- * <p>
- * Instead, we use the following approach to encode record pointers in 64-bit longs: for off-heap
- * mode, just store the raw address, and for on-heap mode use the upper 13 bits of the address to
- * store a "page number" and the lower 51 bits to store an offset within this page. These page
- * numbers are used to index into a "page table" array inside of the MemoryManager in order to
- * retrieve the base object.
- * <p>
- * This allows us to address 8192 pages. In on-heap mode, the maximum page size is limited by the
- * maximum size of a long[] array, allowing us to address 8192 * 2^32 * 8 bytes, which is
- * approximately 35 terabytes of memory.
- */
-public class TaskMemoryManager {
-
- private final Logger logger = LoggerFactory.getLogger(TaskMemoryManager.class);
-
- /** The number of bits used to address the page table. */
- private static final int PAGE_NUMBER_BITS = 13;
-
- /** The number of bits used to encode offsets in data pages. */
- @VisibleForTesting
- static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS; // 51
-
- /** The number of entries in the page table. */
- private static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;
-
- /**
- * Maximum supported data page size (in bytes). In principle, the maximum addressable page size is
- * (1L &lt;&lt; OFFSET_BITS) bytes, which is 2+ petabytes. However, the on-heap allocator's maximum page
- * size is limited by the maximum amount of data that can be stored in a long[] array, which is
- * (2^32 - 1) * 8 bytes (or 16 gigabytes). Therefore, we cap this at 16 gigabytes.
- */
- public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;
-
- /** Bit mask for the lower 51 bits of a long. */
- private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
-
- /** Bit mask for the upper 13 bits of a long */
- private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;
-
- /**
- * Similar to an operating system's page table, this array maps page numbers into base object
- * pointers, allowing us to translate between the hashtable's internal 64-bit address
- * representation and the baseObject+offset representation which we use to support both in- and
- * off-heap addresses. When using an off-heap allocator, every entry in this map will be `null`.
- * When using an in-heap allocator, the entries in this map will point to pages' base objects.
- * Entries are added to this map as new data pages are allocated.
- */
- private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];
-
- /**
- * Bitmap for tracking free pages.
- */
- private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);
-
- /**
- * Tracks memory allocated with {@link TaskMemoryManager#allocate(long)}, used to detect / clean
- * up leaked memory.
- */
- private final HashSet<MemoryBlock> allocatedNonPageMemory = new HashSet<MemoryBlock>();
-
- private final ExecutorMemoryManager executorMemoryManager;
-
- /**
- * Tracks whether we're in-heap or off-heap. For off-heap, we short-circuit most of these methods
- * without doing any masking or lookups. Since this branching should be well-predicted by the JIT,
- * this extra layer of indirection / abstraction hopefully shouldn't be too expensive.
- */
- private final boolean inHeap;
-
- /**
- * Construct a new MemoryManager.
- */
- public TaskMemoryManager(ExecutorMemoryManager executorMemoryManager) {
- this.inHeap = executorMemoryManager.inHeap;
- this.executorMemoryManager = executorMemoryManager;
- }
-
- /**
- * Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
- * intended for allocating large blocks of memory that will be shared between operators.
- */
- public MemoryBlock allocatePage(long size) {
- if (size > MAXIMUM_PAGE_SIZE_BYTES) {
- throw new IllegalArgumentException(
- "Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
- }
-
- final int pageNumber;
- synchronized (this) {
- pageNumber = allocatedPages.nextClearBit(0);
- if (pageNumber >= PAGE_TABLE_SIZE) {
- throw new IllegalStateException(
- "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
- }
- allocatedPages.set(pageNumber);
- }
- final MemoryBlock page = executorMemoryManager.allocate(size);
- page.pageNumber = pageNumber;
- pageTable[pageNumber] = page;
- if (logger.isTraceEnabled()) {
- logger.trace("Allocate page number {} ({} bytes)", pageNumber, size);
- }
- return page;
- }
-
- /**
- * Free a block of memory allocated via {@link TaskMemoryManager#allocatePage(long)}.
- */
- public void freePage(MemoryBlock page) {
- assert (page.pageNumber != -1) :
- "Called freePage() on memory that wasn't allocated with allocatePage()";
- assert(allocatedPages.get(page.pageNumber));
- pageTable[page.pageNumber] = null;
- synchronized (this) {
- allocatedPages.clear(page.pageNumber);
- }
- if (logger.isTraceEnabled()) {
- logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
- }
- // Cannot access a page once it's freed.
- executorMemoryManager.free(page);
- }
-
- /**
- * Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed
- * to be zeroed out (call `zero()` on the result if this is necessary). This method is intended
- * to be used for allocating operators' internal data structures. For data pages that you want to
- * exchange between operators, consider using {@link TaskMemoryManager#allocatePage(long)}, since
- * that will enable intra-memory pointers (see
- * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)} and this class's
- * top-level Javadoc for more details).
- */
- public MemoryBlock allocate(long size) throws OutOfMemoryError {
- assert(size > 0) : "Size must be positive, but got " + size;
- final MemoryBlock memory = executorMemoryManager.allocate(size);
- synchronized(allocatedNonPageMemory) {
- allocatedNonPageMemory.add(memory);
- }
- return memory;
- }
-
- /**
- * Free memory allocated by {@link TaskMemoryManager#allocate(long)}.
- */
- public void free(MemoryBlock memory) {
- assert (memory.pageNumber == -1) : "Should call freePage() for pages, not free()";
- executorMemoryManager.free(memory);
- synchronized(allocatedNonPageMemory) {
- final boolean wasAlreadyRemoved = !allocatedNonPageMemory.remove(memory);
- assert (!wasAlreadyRemoved) : "Called free() on memory that was already freed!";
- }
- }
-
- /**
- * Given a memory page and offset within that page, encode this address into a 64-bit long.
- * This address will remain valid as long as the corresponding page has not been freed.
- *
- * @param page a data page allocated by {@link TaskMemoryManager#allocate(long)}.
- * @param offsetInPage an offset in this page which incorporates the base offset. In other words,
- * this should be the value that you would pass as the base offset into an
- * UNSAFE call (e.g. page.baseOffset() + something).
- * @return an encoded page address.
- */
- public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
- if (!inHeap) {
- // In off-heap mode, an offset is an absolute address that may require a full 64 bits to
- // encode. Due to our page size limitation, though, we can convert this into an offset that's
- // relative to the page's base offset; this relative offset will fit in 51 bits.
- offsetInPage -= page.getBaseOffset();
- }
- return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
- }
-
- @VisibleForTesting
- public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
- assert (pageNumber != -1) : "encodePageNumberAndOffset called with invalid page";
- return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
- }
-
- @VisibleForTesting
- public static int decodePageNumber(long pagePlusOffsetAddress) {
- return (int) ((pagePlusOffsetAddress & MASK_LONG_UPPER_13_BITS) >>> OFFSET_BITS);
- }
-
- private static long decodeOffset(long pagePlusOffsetAddress) {
- return (pagePlusOffsetAddress & MASK_LONG_LOWER_51_BITS);
- }
-
- /**
- * Get the page associated with an address encoded by
- * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
- */
- public Object getPage(long pagePlusOffsetAddress) {
- if (inHeap) {
- final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
- assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
- final MemoryBlock page = pageTable[pageNumber];
- assert (page != null);
- assert (page.getBaseObject() != null);
- return page.getBaseObject();
- } else {
- return null;
- }
- }
-
- /**
- * Get the offset associated with an address encoded by
- * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
- */
- public long getOffsetInPage(long pagePlusOffsetAddress) {
- final long offsetInPage = decodeOffset(pagePlusOffsetAddress);
- if (inHeap) {
- return offsetInPage;
- } else {
- // In off-heap mode, an offset is an absolute address. In encodePageNumberAndOffset, we
- // converted the absolute address into a relative address. Here, we invert that operation:
- final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
- assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
- final MemoryBlock page = pageTable[pageNumber];
- assert (page != null);
- return page.getBaseOffset() + offsetInPage;
- }
- }
-
- /**
- * Clean up all allocated memory and pages. Returns the number of bytes freed. A non-zero return
- * value can be used to detect memory leaks.
- */
- public long cleanUpAllAllocatedMemory() {
- long freedBytes = 0;
- for (MemoryBlock page : pageTable) {
- if (page != null) {
- freedBytes += page.size();
- freePage(page);
- }
- }
-
- synchronized (allocatedNonPageMemory) {
- final Iterator<MemoryBlock> iter = allocatedNonPageMemory.iterator();
- while (iter.hasNext()) {
- final MemoryBlock memory = iter.next();
- freedBytes += memory.size();
- // We don't call free() here because that calls Set.remove, which would lead to a
- // ConcurrentModificationException here.
- executorMemoryManager.free(memory);
- iter.remove();
- }
- }
- return freedBytes;
- }
-}
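The Javadoc of the removed class above describes how record pointers are packed into a single 64-bit long: a 13-bit page number in the upper bits and a 51-bit in-page offset in the lower bits. Below is a self-contained sketch of just that encoding, with constants mirroring the ones in the deleted file; the class name and main method are added for illustration only.

```java
// Demonstrates the pointer encoding described in the removed class's Javadoc:
// the upper 13 bits of a long hold a page number, the lower 51 bits hold an offset.
public class PageAddressEncodingDemo {
  static final int PAGE_NUMBER_BITS = 13;
  static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS;            // 51
  static final long MASK_LOWER_51_BITS = (1L << OFFSET_BITS) - 1;  // 0x7FFFFFFFFFFFFL

  static long encode(int pageNumber, long offsetInPage) {
    return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LOWER_51_BITS);
  }

  static int decodePageNumber(long address) {
    return (int) (address >>> OFFSET_BITS);  // unsigned shift isolates the upper 13 bits
  }

  static long decodeOffset(long address) {
    return address & MASK_LOWER_51_BITS;
  }

  public static void main(String[] args) {
    long address = encode(42, 1L << 20);
    System.out.println("page=" + decodePageNumber(address)
        + ", offset=" + decodeOffset(address));  // prints page=42, offset=1048576
  }
}
```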
diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/memory/TaskMemoryManagerSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/memory/TaskMemoryManagerSuite.java
deleted file mode 100644
index 06fb081183..0000000000
--- a/unsafe/src/test/java/org/apache/spark/unsafe/memory/TaskMemoryManagerSuite.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.memory;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TaskMemoryManagerSuite {
-
- @Test
- public void leakedNonPageMemoryIsDetected() {
- final TaskMemoryManager manager =
- new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP));
- manager.allocate(1024); // leak memory
- Assert.assertEquals(1024, manager.cleanUpAllAllocatedMemory());
- }
-
- @Test
- public void leakedPageMemoryIsDetected() {
- final TaskMemoryManager manager =
- new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP));
- manager.allocatePage(4096); // leak memory
- Assert.assertEquals(4096, manager.cleanUpAllAllocatedMemory());
- }
-
- @Test
- public void encodePageNumberAndOffsetOffHeap() {
- final TaskMemoryManager manager =
- new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.UNSAFE));
- final MemoryBlock dataPage = manager.allocatePage(256);
- // In off-heap mode, an offset is an absolute address that may require more than 51 bits to
- // encode. This test exercises that corner-case:
- final long offset = ((1L << TaskMemoryManager.OFFSET_BITS) + 10);
- final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, offset);
- Assert.assertEquals(null, manager.getPage(encodedAddress));
- Assert.assertEquals(offset, manager.getOffsetInPage(encodedAddress));
- }
-
- @Test
- public void encodePageNumberAndOffsetOnHeap() {
- final TaskMemoryManager manager =
- new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP));
- final MemoryBlock dataPage = manager.allocatePage(256);
- final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, 64);
- Assert.assertEquals(dataPage.getBaseObject(), manager.getPage(encodedAddress));
- Assert.assertEquals(64, manager.getOffsetInPage(encodedAddress));
- }
-
-}