aboutsummaryrefslogtreecommitdiff
path: root/unsafe/src/test
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-05-20 16:42:49 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-05-20 16:42:49 -0700
commitf2faa7af30662e3bdf15780f8719c71108f8e30b (patch)
tree0033bb03689beaf77a13f2e9062c21997f7a1e2f /unsafe/src/test
parent7956dd7ab03e1542d89dd94c043f1e5131684199 (diff)
downloadspark-f2faa7af30662e3bdf15780f8719c71108f8e30b.tar.gz
spark-f2faa7af30662e3bdf15780f8719c71108f8e30b.tar.bz2
spark-f2faa7af30662e3bdf15780f8719c71108f8e30b.zip
[SPARK-7251] Perform sequential scan when iterating over BytesToBytesMap
This patch modifies `BytesToBytesMap.iterator()` to iterate through records in the order that they appear in the data pages rather than iterating through the hashtable pointer arrays. This results in fewer random memory accesses, significantly improving performance for scan-and-copy operations. This is possible because our data pages are laid out as sequences of `[keyLength][data][valueLength][data]` entries. In order to mark the end of a partially-filled data page, we write `-1` as a special end-of-page length (BytesToByesMap supports empty/zero-length keys and values, which is why we had to use a negative length). This patch incorporates / closes #5836. Author: Josh Rosen <joshrosen@databricks.com> Closes #6159 from JoshRosen/SPARK-7251 and squashes the following commits: 05bd90a [Josh Rosen] Compare capacity, not size, to MAX_CAPACITY 2a20d71 [Josh Rosen] Fix maximum BytesToBytesMap capacity bc4854b [Josh Rosen] Guard against overflow when growing BytesToBytesMap f5feadf [Josh Rosen] Add test for iterating over an empty map 273b842 [Josh Rosen] [SPARK-7251] Perform sequential scan when iterating over entries in BytesToBytesMap
Diffstat (limited to 'unsafe/src/test')
-rw-r--r--unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java165
1 files changed, 149 insertions, 16 deletions
diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 7a5c0622d1..81315f7c94 100644
--- a/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -25,24 +25,40 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import static org.mockito.AdditionalMatchers.geq;
+import static org.mockito.Mockito.*;
import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.memory.*;
import org.apache.spark.unsafe.PlatformDependent;
import static org.apache.spark.unsafe.PlatformDependent.BYTE_ARRAY_OFFSET;
-import org.apache.spark.unsafe.memory.ExecutorMemoryManager;
-import org.apache.spark.unsafe.memory.MemoryAllocator;
-import org.apache.spark.unsafe.memory.MemoryLocation;
-import org.apache.spark.unsafe.memory.TaskMemoryManager;
+import static org.apache.spark.unsafe.PlatformDependent.LONG_ARRAY_OFFSET;
+
public abstract class AbstractBytesToBytesMapSuite {
private final Random rand = new Random(42);
private TaskMemoryManager memoryManager;
+ private TaskMemoryManager sizeLimitedMemoryManager;
@Before
public void setup() {
memoryManager = new TaskMemoryManager(new ExecutorMemoryManager(getMemoryAllocator()));
+ // Mocked memory manager for tests that check the maximum array size, since actually allocating
+ // such large arrays will cause us to run out of memory in our tests.
+ sizeLimitedMemoryManager = spy(memoryManager);
+ when(sizeLimitedMemoryManager.allocate(geq(1L << 20))).thenAnswer(new Answer<MemoryBlock>() {
+ @Override
+ public MemoryBlock answer(InvocationOnMock invocation) throws Throwable {
+ if (((Long) invocation.getArguments()[0] / 8) > Integer.MAX_VALUE) {
+ throw new OutOfMemoryError("Requested array size exceeds VM limit");
+ }
+ return memoryManager.allocate(1L << 20);
+ }
+ });
}
@After
@@ -101,6 +117,7 @@ public abstract class AbstractBytesToBytesMapSuite {
final int keyLengthInBytes = keyLengthInWords * 8;
final byte[] key = getRandomByteArray(keyLengthInWords);
Assert.assertFalse(map.lookup(key, BYTE_ARRAY_OFFSET, keyLengthInBytes).isDefined());
+ Assert.assertFalse(map.iterator().hasNext());
} finally {
map.free();
}
@@ -159,7 +176,7 @@ public abstract class AbstractBytesToBytesMapSuite {
@Test
public void iteratorTest() throws Exception {
- final int size = 128;
+ final int size = 4096;
BytesToBytesMap map = new BytesToBytesMap(memoryManager, size / 2);
try {
for (long i = 0; i < size; i++) {
@@ -167,14 +184,26 @@ public abstract class AbstractBytesToBytesMapSuite {
final BytesToBytesMap.Location loc =
map.lookup(value, PlatformDependent.LONG_ARRAY_OFFSET, 8);
Assert.assertFalse(loc.isDefined());
- loc.putNewKey(
- value,
- PlatformDependent.LONG_ARRAY_OFFSET,
- 8,
- value,
- PlatformDependent.LONG_ARRAY_OFFSET,
- 8
- );
+ // Ensure that we store some zero-length keys
+ if (i % 5 == 0) {
+ loc.putNewKey(
+ null,
+ PlatformDependent.LONG_ARRAY_OFFSET,
+ 0,
+ value,
+ PlatformDependent.LONG_ARRAY_OFFSET,
+ 8
+ );
+ } else {
+ loc.putNewKey(
+ value,
+ PlatformDependent.LONG_ARRAY_OFFSET,
+ 8,
+ value,
+ PlatformDependent.LONG_ARRAY_OFFSET,
+ 8
+ );
+ }
}
final java.util.BitSet valuesSeen = new java.util.BitSet(size);
final Iterator<BytesToBytesMap.Location> iter = map.iterator();
@@ -183,11 +212,16 @@ public abstract class AbstractBytesToBytesMapSuite {
Assert.assertTrue(loc.isDefined());
final MemoryLocation keyAddress = loc.getKeyAddress();
final MemoryLocation valueAddress = loc.getValueAddress();
- final long key = PlatformDependent.UNSAFE.getLong(
- keyAddress.getBaseObject(), keyAddress.getBaseOffset());
final long value = PlatformDependent.UNSAFE.getLong(
valueAddress.getBaseObject(), valueAddress.getBaseOffset());
- Assert.assertEquals(key, value);
+ final long keyLength = loc.getKeyLength();
+ if (keyLength == 0) {
+ Assert.assertTrue("value " + value + " was not divisible by 5", value % 5 == 0);
+ } else {
+ final long key = PlatformDependent.UNSAFE.getLong(
+ keyAddress.getBaseObject(), keyAddress.getBaseOffset());
+ Assert.assertEquals(value, key);
+ }
valuesSeen.set((int) value);
}
Assert.assertEquals(size, valuesSeen.cardinality());
@@ -197,6 +231,74 @@ public abstract class AbstractBytesToBytesMapSuite {
}
@Test
+ public void iteratingOverDataPagesWithWastedSpace() throws Exception {
+ final int NUM_ENTRIES = 1000 * 1000;
+ final int KEY_LENGTH = 16;
+ final int VALUE_LENGTH = 40;
+ final BytesToBytesMap map = new BytesToBytesMap(memoryManager, NUM_ENTRIES);
+ // Each record will take 8 + 8 + 16 + 40 = 72 bytes of space in the data page. Our 64-megabyte
+ // pages won't be evenly-divisible by records of this size, which will cause us to waste some
+ // space at the end of the page. This is necessary in order for us to take the end-of-record
+ // handling branch in iterator().
+ try {
+ for (int i = 0; i < NUM_ENTRIES; i++) {
+ final long[] key = new long[] { i, i }; // 2 * 8 = 16 bytes
+ final long[] value = new long[] { i, i, i, i, i }; // 5 * 8 = 40 bytes
+ final BytesToBytesMap.Location loc = map.lookup(
+ key,
+ LONG_ARRAY_OFFSET,
+ KEY_LENGTH
+ );
+ Assert.assertFalse(loc.isDefined());
+ loc.putNewKey(
+ key,
+ LONG_ARRAY_OFFSET,
+ KEY_LENGTH,
+ value,
+ LONG_ARRAY_OFFSET,
+ VALUE_LENGTH
+ );
+ }
+ Assert.assertEquals(2, map.getNumDataPages());
+
+ final java.util.BitSet valuesSeen = new java.util.BitSet(NUM_ENTRIES);
+ final Iterator<BytesToBytesMap.Location> iter = map.iterator();
+ final long key[] = new long[KEY_LENGTH / 8];
+ final long value[] = new long[VALUE_LENGTH / 8];
+ while (iter.hasNext()) {
+ final BytesToBytesMap.Location loc = iter.next();
+ Assert.assertTrue(loc.isDefined());
+ Assert.assertEquals(KEY_LENGTH, loc.getKeyLength());
+ Assert.assertEquals(VALUE_LENGTH, loc.getValueLength());
+ PlatformDependent.copyMemory(
+ loc.getKeyAddress().getBaseObject(),
+ loc.getKeyAddress().getBaseOffset(),
+ key,
+ LONG_ARRAY_OFFSET,
+ KEY_LENGTH
+ );
+ PlatformDependent.copyMemory(
+ loc.getValueAddress().getBaseObject(),
+ loc.getValueAddress().getBaseOffset(),
+ value,
+ LONG_ARRAY_OFFSET,
+ VALUE_LENGTH
+ );
+ for (long j : key) {
+ Assert.assertEquals(key[0], j);
+ }
+ for (long j : value) {
+ Assert.assertEquals(key[0], j);
+ }
+ valuesSeen.set((int) key[0]);
+ }
+ Assert.assertEquals(NUM_ENTRIES, valuesSeen.cardinality());
+ } finally {
+ map.free();
+ }
+ }
+
+ @Test
public void randomizedStressTest() {
final int size = 65536;
// Java arrays' hashCodes() aren't based on the arrays' contents, so we need to wrap arrays
@@ -247,4 +349,35 @@ public abstract class AbstractBytesToBytesMapSuite {
map.free();
}
}
+
+ @Test
+ public void initialCapacityBoundsChecking() {
+ try {
+ new BytesToBytesMap(sizeLimitedMemoryManager, 0);
+ Assert.fail("Expected IllegalArgumentException to be thrown");
+ } catch (IllegalArgumentException e) {
+ // expected exception
+ }
+
+ try {
+ new BytesToBytesMap(sizeLimitedMemoryManager, BytesToBytesMap.MAX_CAPACITY + 1);
+ Assert.fail("Expected IllegalArgumentException to be thrown");
+ } catch (IllegalArgumentException e) {
+ // expected exception
+ }
+
+ // Can allocate _at_ the max capacity
+ BytesToBytesMap map =
+ new BytesToBytesMap(sizeLimitedMemoryManager, BytesToBytesMap.MAX_CAPACITY);
+ map.free();
+ }
+
+ @Test
+ public void resizingLargeMap() {
+ // As long as a map's capacity is below the max, we should be able to resize up to the max
+ BytesToBytesMap map =
+ new BytesToBytesMap(sizeLimitedMemoryManager, BytesToBytesMap.MAX_CAPACITY - 64);
+ map.growAndRehash();
+ map.free();
+ }
}