aboutsummaryrefslogtreecommitdiff
path: root/unsafe/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'unsafe/src/test')
-rw-r--r--unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java165
1 files changed, 149 insertions, 16 deletions
diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 7a5c0622d1..81315f7c94 100644
--- a/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -25,24 +25,40 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import static org.mockito.AdditionalMatchers.geq;
+import static org.mockito.Mockito.*;
import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.memory.*;
import org.apache.spark.unsafe.PlatformDependent;
import static org.apache.spark.unsafe.PlatformDependent.BYTE_ARRAY_OFFSET;
-import org.apache.spark.unsafe.memory.ExecutorMemoryManager;
-import org.apache.spark.unsafe.memory.MemoryAllocator;
-import org.apache.spark.unsafe.memory.MemoryLocation;
-import org.apache.spark.unsafe.memory.TaskMemoryManager;
+import static org.apache.spark.unsafe.PlatformDependent.LONG_ARRAY_OFFSET;
+
public abstract class AbstractBytesToBytesMapSuite {
private final Random rand = new Random(42);
private TaskMemoryManager memoryManager;
+ private TaskMemoryManager sizeLimitedMemoryManager;
@Before
public void setup() {
memoryManager = new TaskMemoryManager(new ExecutorMemoryManager(getMemoryAllocator()));
+ // Mocked memory manager for tests that check the maximum array size, since actually allocating
+ // such large arrays will cause us to run out of memory in our tests.
+ sizeLimitedMemoryManager = spy(memoryManager);
+ when(sizeLimitedMemoryManager.allocate(geq(1L << 20))).thenAnswer(new Answer<MemoryBlock>() {
+ @Override
+ public MemoryBlock answer(InvocationOnMock invocation) throws Throwable {
+ if (((Long) invocation.getArguments()[0] / 8) > Integer.MAX_VALUE) {
+ throw new OutOfMemoryError("Requested array size exceeds VM limit");
+ }
+ return memoryManager.allocate(1L << 20);
+ }
+ });
}
@After
@@ -101,6 +117,7 @@ public abstract class AbstractBytesToBytesMapSuite {
final int keyLengthInBytes = keyLengthInWords * 8;
final byte[] key = getRandomByteArray(keyLengthInWords);
Assert.assertFalse(map.lookup(key, BYTE_ARRAY_OFFSET, keyLengthInBytes).isDefined());
+ Assert.assertFalse(map.iterator().hasNext());
} finally {
map.free();
}
@@ -159,7 +176,7 @@ public abstract class AbstractBytesToBytesMapSuite {
@Test
public void iteratorTest() throws Exception {
- final int size = 128;
+ final int size = 4096;
BytesToBytesMap map = new BytesToBytesMap(memoryManager, size / 2);
try {
for (long i = 0; i < size; i++) {
@@ -167,14 +184,26 @@ public abstract class AbstractBytesToBytesMapSuite {
final BytesToBytesMap.Location loc =
map.lookup(value, PlatformDependent.LONG_ARRAY_OFFSET, 8);
Assert.assertFalse(loc.isDefined());
- loc.putNewKey(
- value,
- PlatformDependent.LONG_ARRAY_OFFSET,
- 8,
- value,
- PlatformDependent.LONG_ARRAY_OFFSET,
- 8
- );
+ // Ensure that we store some zero-length keys
+ if (i % 5 == 0) {
+ loc.putNewKey(
+ null,
+ PlatformDependent.LONG_ARRAY_OFFSET,
+ 0,
+ value,
+ PlatformDependent.LONG_ARRAY_OFFSET,
+ 8
+ );
+ } else {
+ loc.putNewKey(
+ value,
+ PlatformDependent.LONG_ARRAY_OFFSET,
+ 8,
+ value,
+ PlatformDependent.LONG_ARRAY_OFFSET,
+ 8
+ );
+ }
}
final java.util.BitSet valuesSeen = new java.util.BitSet(size);
final Iterator<BytesToBytesMap.Location> iter = map.iterator();
@@ -183,11 +212,16 @@ public abstract class AbstractBytesToBytesMapSuite {
Assert.assertTrue(loc.isDefined());
final MemoryLocation keyAddress = loc.getKeyAddress();
final MemoryLocation valueAddress = loc.getValueAddress();
- final long key = PlatformDependent.UNSAFE.getLong(
- keyAddress.getBaseObject(), keyAddress.getBaseOffset());
final long value = PlatformDependent.UNSAFE.getLong(
valueAddress.getBaseObject(), valueAddress.getBaseOffset());
- Assert.assertEquals(key, value);
+ final long keyLength = loc.getKeyLength();
+ if (keyLength == 0) {
+ Assert.assertTrue("value " + value + " was not divisible by 5", value % 5 == 0);
+ } else {
+ final long key = PlatformDependent.UNSAFE.getLong(
+ keyAddress.getBaseObject(), keyAddress.getBaseOffset());
+ Assert.assertEquals(value, key);
+ }
valuesSeen.set((int) value);
}
Assert.assertEquals(size, valuesSeen.cardinality());
@@ -197,6 +231,74 @@ public abstract class AbstractBytesToBytesMapSuite {
}
@Test
+ public void iteratingOverDataPagesWithWastedSpace() throws Exception {
+ final int NUM_ENTRIES = 1000 * 1000;
+ final int KEY_LENGTH = 16;
+ final int VALUE_LENGTH = 40;
+ final BytesToBytesMap map = new BytesToBytesMap(memoryManager, NUM_ENTRIES);
+ // Each record will take 8 + 8 + 16 + 40 = 72 bytes of space in the data page. Our 64-megabyte
+ // pages won't be evenly-divisible by records of this size, which will cause us to waste some
+ // space at the end of the page. This is necessary in order for us to take the end-of-record
+ // handling branch in iterator().
+ try {
+ for (int i = 0; i < NUM_ENTRIES; i++) {
+ final long[] key = new long[] { i, i }; // 2 * 8 = 16 bytes
+ final long[] value = new long[] { i, i, i, i, i }; // 5 * 8 = 40 bytes
+ final BytesToBytesMap.Location loc = map.lookup(
+ key,
+ LONG_ARRAY_OFFSET,
+ KEY_LENGTH
+ );
+ Assert.assertFalse(loc.isDefined());
+ loc.putNewKey(
+ key,
+ LONG_ARRAY_OFFSET,
+ KEY_LENGTH,
+ value,
+ LONG_ARRAY_OFFSET,
+ VALUE_LENGTH
+ );
+ }
+ Assert.assertEquals(2, map.getNumDataPages());
+
+ final java.util.BitSet valuesSeen = new java.util.BitSet(NUM_ENTRIES);
+ final Iterator<BytesToBytesMap.Location> iter = map.iterator();
+ final long key[] = new long[KEY_LENGTH / 8];
+ final long value[] = new long[VALUE_LENGTH / 8];
+ while (iter.hasNext()) {
+ final BytesToBytesMap.Location loc = iter.next();
+ Assert.assertTrue(loc.isDefined());
+ Assert.assertEquals(KEY_LENGTH, loc.getKeyLength());
+ Assert.assertEquals(VALUE_LENGTH, loc.getValueLength());
+ PlatformDependent.copyMemory(
+ loc.getKeyAddress().getBaseObject(),
+ loc.getKeyAddress().getBaseOffset(),
+ key,
+ LONG_ARRAY_OFFSET,
+ KEY_LENGTH
+ );
+ PlatformDependent.copyMemory(
+ loc.getValueAddress().getBaseObject(),
+ loc.getValueAddress().getBaseOffset(),
+ value,
+ LONG_ARRAY_OFFSET,
+ VALUE_LENGTH
+ );
+ for (long j : key) {
+ Assert.assertEquals(key[0], j);
+ }
+ for (long j : value) {
+ Assert.assertEquals(key[0], j);
+ }
+ valuesSeen.set((int) key[0]);
+ }
+ Assert.assertEquals(NUM_ENTRIES, valuesSeen.cardinality());
+ } finally {
+ map.free();
+ }
+ }
+
+ @Test
public void randomizedStressTest() {
final int size = 65536;
// Java arrays' hashCodes() aren't based on the arrays' contents, so we need to wrap arrays
@@ -247,4 +349,35 @@ public abstract class AbstractBytesToBytesMapSuite {
map.free();
}
}
+
+ @Test
+ public void initialCapacityBoundsChecking() {
+ try {
+ new BytesToBytesMap(sizeLimitedMemoryManager, 0);
+ Assert.fail("Expected IllegalArgumentException to be thrown");
+ } catch (IllegalArgumentException e) {
+ // expected exception
+ }
+
+ try {
+ new BytesToBytesMap(sizeLimitedMemoryManager, BytesToBytesMap.MAX_CAPACITY + 1);
+ Assert.fail("Expected IllegalArgumentException to be thrown");
+ } catch (IllegalArgumentException e) {
+ // expected exception
+ }
+
+ // Can allocate _at_ the max capacity
+ BytesToBytesMap map =
+ new BytesToBytesMap(sizeLimitedMemoryManager, BytesToBytesMap.MAX_CAPACITY);
+ map.free();
+ }
+
+ @Test
+ public void resizingLargeMap() {
+ // As long as a map's capacity is below the max, we should be able to resize up to the max
+ BytesToBytesMap map =
+ new BytesToBytesMap(sizeLimitedMemoryManager, BytesToBytesMap.MAX_CAPACITY - 64);
+ map.growAndRehash();
+ map.free();
+ }
}