aboutsummaryrefslogtreecommitdiff
path: root/unsafe
diff options
context:
space:
mode:
Diffstat (limited to 'unsafe')
-rw-r--r--unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java43
-rw-r--r--unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java13
-rw-r--r--unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java22
3 files changed, 48 insertions, 30 deletions
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index d0bde69cc1..198e0684f3 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -75,12 +75,6 @@ public final class BytesToBytesMap {
private long pageCursor = 0;
/**
- * The size of the data pages that hold key and value data. Map entries cannot span multiple
- * pages, so this limits the maximum entry size.
- */
- private static final long PAGE_SIZE_BYTES = 1L << 26; // 64 megabytes
-
- /**
* The maximum number of keys that BytesToBytesMap supports. The hash table has to be
* power-of-2-sized and its backing Java array can contain at most (1 << 30) elements, since
* that's the largest power-of-2 that's less than Integer.MAX_VALUE. We need two long array
@@ -118,6 +112,12 @@ public final class BytesToBytesMap {
private final double loadFactor;
/**
+ * The size of the data pages that hold key and value data. Map entries cannot span multiple
+ * pages, so this limits the maximum entry size.
+ */
+ private final long pageSizeBytes;
+
+ /**
* Number of keys defined in the map.
*/
private int size;
@@ -153,10 +153,12 @@ public final class BytesToBytesMap {
TaskMemoryManager memoryManager,
int initialCapacity,
double loadFactor,
+ long pageSizeBytes,
boolean enablePerfMetrics) {
this.memoryManager = memoryManager;
this.loadFactor = loadFactor;
this.loc = new Location();
+ this.pageSizeBytes = pageSizeBytes;
this.enablePerfMetrics = enablePerfMetrics;
if (initialCapacity <= 0) {
throw new IllegalArgumentException("Initial capacity must be greater than 0");
@@ -165,18 +167,26 @@ public final class BytesToBytesMap {
throw new IllegalArgumentException(
"Initial capacity " + initialCapacity + " exceeds maximum capacity of " + MAX_CAPACITY);
}
+ if (pageSizeBytes > TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES) {
+ throw new IllegalArgumentException("Page size " + pageSizeBytes + " cannot exceed " +
+ TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES);
+ }
allocate(initialCapacity);
}
- public BytesToBytesMap(TaskMemoryManager memoryManager, int initialCapacity) {
- this(memoryManager, initialCapacity, 0.70, false);
+ public BytesToBytesMap(
+ TaskMemoryManager memoryManager,
+ int initialCapacity,
+ long pageSizeBytes) {
+ this(memoryManager, initialCapacity, 0.70, pageSizeBytes, false);
}
public BytesToBytesMap(
TaskMemoryManager memoryManager,
int initialCapacity,
+ long pageSizeBytes,
boolean enablePerfMetrics) {
- this(memoryManager, initialCapacity, 0.70, enablePerfMetrics);
+ this(memoryManager, initialCapacity, 0.70, pageSizeBytes, enablePerfMetrics);
}
/**
@@ -443,20 +453,20 @@ public final class BytesToBytesMap {
// must be stored in the same memory page.
// (8 byte key length) (key) (8 byte value length) (value)
final long requiredSize = 8 + keyLengthBytes + 8 + valueLengthBytes;
- assert (requiredSize <= PAGE_SIZE_BYTES - 8); // Reserve 8 bytes for the end-of-page marker.
+ assert (requiredSize <= pageSizeBytes - 8); // Reserve 8 bytes for the end-of-page marker.
size++;
bitset.set(pos);
// If there's not enough space in the current page, allocate a new page (8 bytes are reserved
// for the end-of-page marker).
- if (currentDataPage == null || PAGE_SIZE_BYTES - 8 - pageCursor < requiredSize) {
+ if (currentDataPage == null || pageSizeBytes - 8 - pageCursor < requiredSize) {
if (currentDataPage != null) {
// There wasn't enough space in the current page, so write an end-of-page marker:
final Object pageBaseObject = currentDataPage.getBaseObject();
final long lengthOffsetInPage = currentDataPage.getBaseOffset() + pageCursor;
PlatformDependent.UNSAFE.putLong(pageBaseObject, lengthOffsetInPage, END_OF_PAGE_MARKER);
}
- MemoryBlock newPage = memoryManager.allocatePage(PAGE_SIZE_BYTES);
+ MemoryBlock newPage = memoryManager.allocatePage(pageSizeBytes);
dataPages.add(newPage);
pageCursor = 0;
currentDataPage = newPage;
@@ -538,10 +548,11 @@ public final class BytesToBytesMap {
/** Returns the total amount of memory, in bytes, consumed by this map's managed structures. */
public long getTotalMemoryConsumption() {
- return (
- dataPages.size() * PAGE_SIZE_BYTES +
- bitset.memoryBlock().size() +
- longArray.memoryBlock().size());
+ long totalDataPagesSize = 0L;
+ for (MemoryBlock dataPage : dataPages) {
+ totalDataPagesSize += dataPage.size();
+ }
+ return totalDataPagesSize + bitset.memoryBlock().size() + longArray.memoryBlock().size();
}
/**
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
index 10881969db..dd70df3b1f 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
@@ -58,8 +58,13 @@ public class TaskMemoryManager {
/** The number of entries in the page table. */
private static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;
- /** Maximum supported data page size */
- private static final long MAXIMUM_PAGE_SIZE = (1L << OFFSET_BITS);
+ /**
+ * Maximum supported data page size (in bytes). In principle, the maximum addressable page size is
+ * (1L << OFFSET_BITS) bytes, which is 2+ petabytes. However, the on-heap allocator's maximum page
+ * size is limited by the maximum amount of data that can be stored in a long[] array, which is
+ * (2^32 - 1) * 8 bytes (or 16 gigabytes). Therefore, we cap this at 16 gigabytes.
+ */
+ public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;
/** Bit mask for the lower 51 bits of a long. */
private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
@@ -110,9 +115,9 @@ public class TaskMemoryManager {
* intended for allocating large blocks of memory that will be shared between operators.
*/
public MemoryBlock allocatePage(long size) {
- if (size > MAXIMUM_PAGE_SIZE) {
+ if (size > MAXIMUM_PAGE_SIZE_BYTES) {
throw new IllegalArgumentException(
- "Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE + " bytes");
+ "Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
}
final int pageNumber;
diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index dae47e4bab..0be94ad371 100644
--- a/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -43,6 +43,7 @@ public abstract class AbstractBytesToBytesMapSuite {
private TaskMemoryManager memoryManager;
private TaskMemoryManager sizeLimitedMemoryManager;
+ private final long PAGE_SIZE_BYTES = 1L << 26; // 64 megabytes
@Before
public void setup() {
@@ -110,7 +111,7 @@ public abstract class AbstractBytesToBytesMapSuite {
@Test
public void emptyMap() {
- BytesToBytesMap map = new BytesToBytesMap(memoryManager, 64);
+ BytesToBytesMap map = new BytesToBytesMap(memoryManager, 64, PAGE_SIZE_BYTES);
try {
Assert.assertEquals(0, map.size());
final int keyLengthInWords = 10;
@@ -125,7 +126,7 @@ public abstract class AbstractBytesToBytesMapSuite {
@Test
public void setAndRetrieveAKey() {
- BytesToBytesMap map = new BytesToBytesMap(memoryManager, 64);
+ BytesToBytesMap map = new BytesToBytesMap(memoryManager, 64, PAGE_SIZE_BYTES);
final int recordLengthWords = 10;
final int recordLengthBytes = recordLengthWords * 8;
final byte[] keyData = getRandomByteArray(recordLengthWords);
@@ -177,7 +178,7 @@ public abstract class AbstractBytesToBytesMapSuite {
@Test
public void iteratorTest() throws Exception {
final int size = 4096;
- BytesToBytesMap map = new BytesToBytesMap(memoryManager, size / 2);
+ BytesToBytesMap map = new BytesToBytesMap(memoryManager, size / 2, PAGE_SIZE_BYTES);
try {
for (long i = 0; i < size; i++) {
final long[] value = new long[] { i };
@@ -235,7 +236,7 @@ public abstract class AbstractBytesToBytesMapSuite {
final int NUM_ENTRIES = 1000 * 1000;
final int KEY_LENGTH = 16;
final int VALUE_LENGTH = 40;
- final BytesToBytesMap map = new BytesToBytesMap(memoryManager, NUM_ENTRIES);
+ final BytesToBytesMap map = new BytesToBytesMap(memoryManager, NUM_ENTRIES, PAGE_SIZE_BYTES);
// Each record will take 8 + 8 + 16 + 40 = 72 bytes of space in the data page. Our 64-megabyte
// pages won't be evenly-divisible by records of this size, which will cause us to waste some
// space at the end of the page. This is necessary in order for us to take the end-of-record
@@ -304,7 +305,7 @@ public abstract class AbstractBytesToBytesMapSuite {
// Java arrays' hashCodes() aren't based on the arrays' contents, so we need to wrap arrays
// into ByteBuffers in order to use them as keys here.
final Map<ByteBuffer, byte[]> expected = new HashMap<ByteBuffer, byte[]>();
- final BytesToBytesMap map = new BytesToBytesMap(memoryManager, size);
+ final BytesToBytesMap map = new BytesToBytesMap(memoryManager, size, PAGE_SIZE_BYTES);
try {
// Fill the map to 90% full so that we can trigger probing
@@ -353,14 +354,15 @@ public abstract class AbstractBytesToBytesMapSuite {
@Test
public void initialCapacityBoundsChecking() {
try {
- new BytesToBytesMap(sizeLimitedMemoryManager, 0);
+ new BytesToBytesMap(sizeLimitedMemoryManager, 0, PAGE_SIZE_BYTES);
Assert.fail("Expected IllegalArgumentException to be thrown");
} catch (IllegalArgumentException e) {
// expected exception
}
try {
- new BytesToBytesMap(sizeLimitedMemoryManager, BytesToBytesMap.MAX_CAPACITY + 1);
+ new BytesToBytesMap(
+ sizeLimitedMemoryManager, BytesToBytesMap.MAX_CAPACITY + 1, PAGE_SIZE_BYTES);
Assert.fail("Expected IllegalArgumentException to be thrown");
} catch (IllegalArgumentException e) {
// expected exception
@@ -368,15 +370,15 @@ public abstract class AbstractBytesToBytesMapSuite {
// Can allocate _at_ the max capacity
BytesToBytesMap map =
- new BytesToBytesMap(sizeLimitedMemoryManager, BytesToBytesMap.MAX_CAPACITY);
+ new BytesToBytesMap(sizeLimitedMemoryManager, BytesToBytesMap.MAX_CAPACITY, PAGE_SIZE_BYTES);
map.free();
}
@Test
public void resizingLargeMap() {
// As long as a map's capacity is below the max, we should be able to resize up to the max
- BytesToBytesMap map =
- new BytesToBytesMap(sizeLimitedMemoryManager, BytesToBytesMap.MAX_CAPACITY - 64);
+ BytesToBytesMap map = new BytesToBytesMap(
+ sizeLimitedMemoryManager, BytesToBytesMap.MAX_CAPACITY - 64, PAGE_SIZE_BYTES);
map.growAndRehash();
map.free();
}