aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/java
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-03-28 13:07:32 -0700
committerDavies Liu <davies.liu@gmail.com>2016-03-28 13:07:32 -0700
commitd7b58f1461f71ee3c028360eef0ffedd17d6a076 (patch)
tree58ddca8bb29534ecb77446e6706f33d885e01bd4 /core/src/test/java
parent600c0b69cab4767e8e5a6f4284777d8b9d4bd40e (diff)
downloadspark-d7b58f1461f71ee3c028360eef0ffedd17d6a076.tar.gz
spark-d7b58f1461f71ee3c028360eef0ffedd17d6a076.tar.bz2
spark-d7b58f1461f71ee3c028360eef0ffedd17d6a076.zip
[SPARK-14052] [SQL] build a BytesToBytesMap directly in HashedRelation
## What changes were proposed in this pull request? Currently, for keys that cannot fit within a long, we build a hash map for UnsafeHashedRelation, which is then converted to a BytesToBytesMap after serialization and deserialization. We should build a BytesToBytesMap directly for better memory efficiency. To do that, BytesToBytesMap must support multiple (K,V) pairs with the same K. Location.putNewKey() is renamed to Location.append(), which can append multiple values for the same key (same Location). `Location.nextValue()` is added to find the next value for the same key. ## How was this patch tested? Existing tests. Added a benchmark for broadcast hash join with duplicated keys. Author: Davies Liu <davies@databricks.com> Closes #11870 from davies/map2.
Diffstat (limited to 'core/src/test/java')
-rw-r--r--core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java64
1 files changed, 51 insertions, 13 deletions
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 449fb45c30..84b82f5a47 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -182,7 +182,7 @@ public abstract class AbstractBytesToBytesMapSuite {
public void emptyMap() {
BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager, 64, PAGE_SIZE_BYTES);
try {
- Assert.assertEquals(0, map.numElements());
+ Assert.assertEquals(0, map.numKeys());
final int keyLengthInWords = 10;
final int keyLengthInBytes = keyLengthInWords * 8;
final byte[] key = getRandomByteArray(keyLengthInWords);
@@ -204,7 +204,7 @@ public abstract class AbstractBytesToBytesMapSuite {
final BytesToBytesMap.Location loc =
map.lookup(keyData, Platform.BYTE_ARRAY_OFFSET, recordLengthBytes);
Assert.assertFalse(loc.isDefined());
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
keyData,
Platform.BYTE_ARRAY_OFFSET,
recordLengthBytes,
@@ -232,7 +232,7 @@ public abstract class AbstractBytesToBytesMapSuite {
getByteArray(loc.getValueBase(), loc.getValueOffset(), recordLengthBytes));
try {
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
keyData,
Platform.BYTE_ARRAY_OFFSET,
recordLengthBytes,
@@ -260,7 +260,7 @@ public abstract class AbstractBytesToBytesMapSuite {
Assert.assertFalse(loc.isDefined());
// Ensure that we store some zero-length keys
if (i % 5 == 0) {
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
null,
Platform.LONG_ARRAY_OFFSET,
0,
@@ -269,7 +269,7 @@ public abstract class AbstractBytesToBytesMapSuite {
8
));
} else {
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
value,
Platform.LONG_ARRAY_OFFSET,
8,
@@ -349,7 +349,7 @@ public abstract class AbstractBytesToBytesMapSuite {
KEY_LENGTH
);
Assert.assertFalse(loc.isDefined());
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
key,
Platform.LONG_ARRAY_OFFSET,
KEY_LENGTH,
@@ -417,7 +417,7 @@ public abstract class AbstractBytesToBytesMapSuite {
key.length
);
Assert.assertFalse(loc.isDefined());
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
key,
Platform.BYTE_ARRAY_OFFSET,
key.length,
@@ -471,7 +471,7 @@ public abstract class AbstractBytesToBytesMapSuite {
key.length
);
Assert.assertFalse(loc.isDefined());
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
key,
Platform.BYTE_ARRAY_OFFSET,
key.length,
@@ -514,7 +514,7 @@ public abstract class AbstractBytesToBytesMapSuite {
final BytesToBytesMap.Location loc =
map.lookup(emptyArray, Platform.LONG_ARRAY_OFFSET, 0);
Assert.assertFalse(loc.isDefined());
- Assert.assertFalse(loc.putNewKey(
+ Assert.assertFalse(loc.append(
emptyArray, Platform.LONG_ARRAY_OFFSET, 0, emptyArray, Platform.LONG_ARRAY_OFFSET, 0));
} finally {
map.free();
@@ -535,7 +535,7 @@ public abstract class AbstractBytesToBytesMapSuite {
final long[] arr = new long[]{i};
final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
success =
- loc.putNewKey(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+ loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
if (!success) {
break;
}
@@ -556,7 +556,7 @@ public abstract class AbstractBytesToBytesMapSuite {
for (i = 0; i < 1024; i++) {
final long[] arr = new long[]{i};
final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
- loc.putNewKey(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+ loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
}
BytesToBytesMap.MapIterator iter = map.iterator();
for (i = 0; i < 100; i++) {
@@ -587,6 +587,44 @@ public abstract class AbstractBytesToBytesMapSuite {
}
@Test
+  public void multipleValuesForSameKey() {  // one key may hold several appended values
+    BytesToBytesMap map =
+      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024, false);
+    try {
+      int i;
+      for (i = 0; i < 1024; i++) {  // pass 1: append one value under each of 1024 keys
+        final long[] arr = new long[]{i};
+        Assert.assertTrue(map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8)
+          .append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8));
+      }
+      Assert.assertEquals(1024, map.numKeys());  // JUnit checks run even without -ea
+      Assert.assertEquals(1024, map.numValues());
+      for (i = 0; i < 1024; i++) {  // pass 2: append a second value under the same keys
+        final long[] arr = new long[]{i};
+        Assert.assertTrue(map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8)
+          .append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8));
+      }
+      Assert.assertEquals(1024, map.numKeys());  // key count must not grow on pass 2
+      Assert.assertEquals(2048, map.numValues());
+      for (i = 0; i < 1024; i++) {
+        final long[] arr = new long[]{i};
+        final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
+        Assert.assertTrue(loc.isDefined());
+        Assert.assertTrue(loc.nextValue());  // expects exactly one more value for this key
+        Assert.assertFalse(loc.nextValue());  // ...and none after it
+      }
+      BytesToBytesMap.MapIterator iter = map.iterator();
+      for (i = 0; i < 2048; i++) {  // the iterator is expected to yield all 2048 values
+        Assert.assertTrue(iter.hasNext());
+        final BytesToBytesMap.Location loc = iter.next();
+        Assert.assertTrue(loc.isDefined());
+      }
+    } finally {
+      map.free();  // always release the map, even when a check above fails
+    }
+  }
+
+ @Test
public void initialCapacityBoundsChecking() {
try {
new BytesToBytesMap(taskMemoryManager, 0, PAGE_SIZE_BYTES);
@@ -608,7 +646,7 @@ public abstract class AbstractBytesToBytesMapSuite {
@Test
public void testPeakMemoryUsed() {
- final long recordLengthBytes = 24;
+ final long recordLengthBytes = 32;
final long pageSizeBytes = 256 + 8; // 8 bytes for end-of-page marker
final long numRecordsPerPage = (pageSizeBytes - 8) / recordLengthBytes;
final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager, 1024, pageSizeBytes);
@@ -622,7 +660,7 @@ public abstract class AbstractBytesToBytesMapSuite {
try {
for (long i = 0; i < numRecordsPerPage * 10; i++) {
final long[] value = new long[]{i};
- map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8).putNewKey(
+ map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8).append(
value,
Platform.LONG_ARRAY_OFFSET,
8,