author     Davies Liu <davies@databricks.com>    2016-03-28 13:07:32 -0700
committer  Davies Liu <davies.liu@gmail.com>     2016-03-28 13:07:32 -0700
commit     d7b58f1461f71ee3c028360eef0ffedd17d6a076 (patch)
tree       58ddca8bb29534ecb77446e6706f33d885e01bd4 /core
parent     600c0b69cab4767e8e5a6f4284777d8b9d4bd40e (diff)
[SPARK-14052] [SQL] build a BytesToBytesMap directly in HashedRelation
## What changes were proposed in this pull request?

Currently, for keys that cannot fit within a long, we build a hash map for UnsafeHashedRelation, which is converted to a BytesToBytesMap after serialization and deserialization. We should build a BytesToBytesMap directly for better memory efficiency.

To do that, BytesToBytesMap needs to support multiple (K, V) pairs with the same K: `Location.putNewKey()` is renamed to `Location.append()`, which can append multiple values for the same key (same Location), and `Location.nextValue()` is added to find the next value for the same key.

## How was this patch tested?

Existing tests. Added a benchmark for broadcast hash join with duplicated keys.

Author: Davies Liu <davies@databricks.com>

Closes #11870 from davies/map2.
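To illustrate the API change, here is a minimal usage sketch (not part of the patch); the class, method, and buffer names are placeholders, and `map` is assumed to be an already-constructed BytesToBytesMap with 8-byte-aligned long[] keys and values:

```java
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.map.BytesToBytesMap;

public class AppendAndScanSketch {
  // Appends one more value under `key`, then walks every value stored for that key.
  static void appendAndScan(BytesToBytesMap map, long[] key, long[] value) {
    BytesToBytesMap.Location loc = map.lookup(key, Platform.LONG_ARRAY_OFFSET, 8);
    // Unlike the old putNewKey(), append() may be called again for an already-defined
    // key; each call adds one more value to that key's chain.
    if (!loc.append(key, Platform.LONG_ARRAY_OFFSET, 8,
                    value, Platform.LONG_ARRAY_OFFSET, 8)) {
      // handle failure to grow the map (by spilling, for example)
    }

    // Read back all values for the key by following the per-key chain.
    BytesToBytesMap.Location found = map.lookup(key, Platform.LONG_ARRAY_OFFSET, 8);
    if (found.isDefined()) {
      do {
        long valueOffset = found.getValueOffset();  // current value lives at
        int valueLength = found.getValueLength();   // [getValueBase(), valueOffset, valueLength)
      } while (found.nextValue());                  // false once the chain is exhausted
    }
  }
}
```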
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java              | 113
-rw-r--r--  core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java  |  64
2 files changed, 124 insertions(+), 53 deletions(-)
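For reference, a quick size check (illustrative, not part of the patch) of the new on-page record layout, which is why `recordLengthBytes` changes from 24 to 32 in `testPeakMemoryUsed` below:

```java
// Each pair is now stored as:
//   [4B total length][4B key length][key][value][8B pointer to next pair with same key]
// For the 8-byte keys and 8-byte values used in the tests:
int klen = 8, vlen = 8;
long recordLength = 8 + klen + vlen + 8;  // headers + key + value + next pointer
assert recordLength == 32;                // previously 8 + klen + vlen = 24, without the pointer
```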
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 9aacb084f6..32958be7a7 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -56,9 +56,10 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter;
* Bytes 4 to 8: len(k)
* Bytes 8 to 8 + len(k): key data
* Bytes 8 + len(k) to 8 + len(k) + len(v): value data
+ * Bytes 8 + len(k) + len(v) to 8 + len(k) + len(v) + 8: pointer to next pair
*
* This means that the first four bytes store the entire record (key + value) length. This format
- * is consistent with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
+ * is compatible with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
* so we can pass records from this map directly into the sorter to sort records in place.
*/
public final class BytesToBytesMap extends MemoryConsumer {
@@ -132,7 +133,12 @@ public final class BytesToBytesMap extends MemoryConsumer {
/**
* Number of keys defined in the map.
*/
- private int numElements;
+ private int numKeys;
+
+ /**
+ * Number of values defined in the map. A key could have multiple values.
+ */
+ private int numValues;
/**
* The map will be expanded once the number of keys exceeds this threshold.
@@ -223,7 +229,12 @@ public final class BytesToBytesMap extends MemoryConsumer {
/**
* Returns the number of keys defined in the map.
*/
- public int numElements() { return numElements; }
+ public int numKeys() { return numKeys; }
+
+ /**
+ * Returns the number of values defined in the map. A key could have multiple values.
+ */
+ public int numValues() { return numValues; }
public final class MapIterator implements Iterator<Location> {
@@ -311,7 +322,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
if (currentPage != null) {
int totalLength = Platform.getInt(pageBaseObject, offsetInPage);
loc.with(currentPage, offsetInPage);
- offsetInPage += 4 + totalLength;
+ // [total size] [key size] [key] [value] [pointer to next]
+ offsetInPage += 4 + totalLength + 8;
recordsInPage --;
return loc;
} else {
@@ -361,7 +373,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
while (numRecords > 0) {
int length = Platform.getInt(base, offset);
writer.write(base, offset + 4, length, 0);
- offset += 4 + length;
+ offset += 4 + length + 8;
numRecords--;
}
writer.close();
@@ -395,7 +407,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
* `lookup()`, the behavior of the returned iterator is undefined.
*/
public MapIterator iterator() {
- return new MapIterator(numElements, loc, false);
+ return new MapIterator(numValues, loc, false);
}
/**
@@ -409,7 +421,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
* `lookup()`, the behavior of the returned iterator is undefined.
*/
public MapIterator destructiveIterator() {
- return new MapIterator(numElements, loc, true);
+ return new MapIterator(numValues, loc, true);
}
/**
@@ -560,6 +572,20 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
/**
+ * Find the next pair that has the same key as current one.
+ */
+ public boolean nextValue() {
+ assert isDefined;
+ long nextAddr = Platform.getLong(baseObject, valueOffset + valueLength);
+ if (nextAddr == 0) {
+ return false;
+ } else {
+ updateAddressesAndSizes(nextAddr);
+ return true;
+ }
+ }
+
+ /**
* Returns the memory page that contains the current record.
* This is only valid if this is returned by {@link BytesToBytesMap#iterator()}.
*/
@@ -625,10 +651,9 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
/**
- * Store a new key and value. This method may only be called once for a given key; if you want
- * to update the value associated with a key, then you can directly manipulate the bytes stored
- * at the value address. The return value indicates whether the put succeeded or whether it
- * failed because additional memory could not be acquired.
+ * Append a new value for the key. This method could be called multiple times for a given key.
+ * The return value indicates whether the put succeeded or whether it failed because additional
+ * memory could not be acquired.
* <p>
* It is only valid to call this method immediately after calling `lookup()` using the same key.
* </p>
@@ -637,7 +662,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
* </p>
* <p>
* After calling this method, calls to `get[Key|Value]Address()` and `get[Key|Value]Length`
- * will return information on the data stored by this `putNewKey` call.
+ * will return information on the data stored by this `append` call.
* </p>
* <p>
* As an example usage, here's the proper way to store a new key:
@@ -645,7 +670,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
* <pre>
* Location loc = map.lookup(keyBase, keyOffset, keyLength);
* if (!loc.isDefined()) {
- * if (!loc.putNewKey(keyBase, keyOffset, keyLength, ...)) {
+ * if (!loc.append(keyBase, keyOffset, keyLength, ...)) {
* // handle failure to grow map (by spilling, for example)
* }
* }
@@ -657,26 +682,23 @@ public final class BytesToBytesMap extends MemoryConsumer {
* @return true if the put() was successful and false if the put() failed because memory could
* not be acquired.
*/
- public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
- Object valueBase, long valueOffset, int valueLength) {
- assert (!isDefined) : "Can only set value once for a key";
- assert (keyLength % 8 == 0);
- assert (valueLength % 8 == 0);
- assert(longArray != null);
-
+ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff, int vlen) {
+ assert (klen % 8 == 0);
+ assert (vlen % 8 == 0);
+ assert (longArray != null);
- if (numElements == MAX_CAPACITY
+ if (numKeys == MAX_CAPACITY
// The map could be reused from last spill (because of no enough memory to grow),
// then we don't try to grow again if hit the `growthThreshold`.
- || !canGrowArray && numElements > growthThreshold) {
+ || !canGrowArray && numKeys > growthThreshold) {
return false;
}
// Here, we'll copy the data into our data pages. Because we only store a relative offset from
// the key address instead of storing the absolute address of the value, the key and value
// must be stored in the same memory page.
- // (8 byte key length) (key) (value)
- final long recordLength = 8 + keyLength + valueLength;
+ // (8 byte key length) (key) (value) (8 byte pointer to next value)
+ final long recordLength = 8 + klen + vlen + 8;
if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
if (!acquireNewPage(recordLength + 4L)) {
return false;
@@ -687,30 +709,40 @@ public final class BytesToBytesMap extends MemoryConsumer {
final Object base = currentPage.getBaseObject();
long offset = currentPage.getBaseOffset() + pageCursor;
final long recordOffset = offset;
- Platform.putInt(base, offset, keyLength + valueLength + 4);
- Platform.putInt(base, offset + 4, keyLength);
+ Platform.putInt(base, offset, klen + vlen + 4);
+ Platform.putInt(base, offset + 4, klen);
offset += 8;
- Platform.copyMemory(keyBase, keyOffset, base, offset, keyLength);
- offset += keyLength;
- Platform.copyMemory(valueBase, valueOffset, base, offset, valueLength);
+ Platform.copyMemory(kbase, koff, base, offset, klen);
+ offset += klen;
+ Platform.copyMemory(vbase, voff, base, offset, vlen);
+ offset += vlen;
+ Platform.putLong(base, offset, 0);
// --- Update bookkeeping data structures ----------------------------------------------------
offset = currentPage.getBaseOffset();
Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
pageCursor += recordLength;
- numElements++;
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
currentPage, recordOffset);
- longArray.set(pos * 2, storedKeyAddress);
- longArray.set(pos * 2 + 1, keyHashcode);
- updateAddressesAndSizes(storedKeyAddress);
- isDefined = true;
+ numValues++;
+ if (isDefined) {
+ // put this pair at the end of chain
+ while (nextValue()) { /* do nothing */ }
+ Platform.putLong(baseObject, valueOffset + valueLength, storedKeyAddress);
+ nextValue(); // point to new added value
+ } else {
+ numKeys++;
+ longArray.set(pos * 2, storedKeyAddress);
+ longArray.set(pos * 2 + 1, keyHashcode);
+ updateAddressesAndSizes(storedKeyAddress);
+ isDefined = true;
- if (numElements > growthThreshold && longArray.size() < MAX_CAPACITY) {
- try {
- growAndRehash();
- } catch (OutOfMemoryError oom) {
- canGrowArray = false;
+ if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
+ try {
+ growAndRehash();
+ } catch (OutOfMemoryError oom) {
+ canGrowArray = false;
+ }
}
}
return true;
@@ -866,7 +898,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
* Reset this map to initialized state.
*/
public void reset() {
- numElements = 0;
+ numKeys = 0;
+ numValues = 0;
longArray.zeroOut();
while (dataPages.size() > 0) {
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 449fb45c30..84b82f5a47 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -182,7 +182,7 @@ public abstract class AbstractBytesToBytesMapSuite {
public void emptyMap() {
BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager, 64, PAGE_SIZE_BYTES);
try {
- Assert.assertEquals(0, map.numElements());
+ Assert.assertEquals(0, map.numKeys());
final int keyLengthInWords = 10;
final int keyLengthInBytes = keyLengthInWords * 8;
final byte[] key = getRandomByteArray(keyLengthInWords);
@@ -204,7 +204,7 @@ public abstract class AbstractBytesToBytesMapSuite {
final BytesToBytesMap.Location loc =
map.lookup(keyData, Platform.BYTE_ARRAY_OFFSET, recordLengthBytes);
Assert.assertFalse(loc.isDefined());
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
keyData,
Platform.BYTE_ARRAY_OFFSET,
recordLengthBytes,
@@ -232,7 +232,7 @@ public abstract class AbstractBytesToBytesMapSuite {
getByteArray(loc.getValueBase(), loc.getValueOffset(), recordLengthBytes));
try {
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
keyData,
Platform.BYTE_ARRAY_OFFSET,
recordLengthBytes,
@@ -260,7 +260,7 @@ public abstract class AbstractBytesToBytesMapSuite {
Assert.assertFalse(loc.isDefined());
// Ensure that we store some zero-length keys
if (i % 5 == 0) {
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
null,
Platform.LONG_ARRAY_OFFSET,
0,
@@ -269,7 +269,7 @@ public abstract class AbstractBytesToBytesMapSuite {
8
));
} else {
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
value,
Platform.LONG_ARRAY_OFFSET,
8,
@@ -349,7 +349,7 @@ public abstract class AbstractBytesToBytesMapSuite {
KEY_LENGTH
);
Assert.assertFalse(loc.isDefined());
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
key,
Platform.LONG_ARRAY_OFFSET,
KEY_LENGTH,
@@ -417,7 +417,7 @@ public abstract class AbstractBytesToBytesMapSuite {
key.length
);
Assert.assertFalse(loc.isDefined());
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
key,
Platform.BYTE_ARRAY_OFFSET,
key.length,
@@ -471,7 +471,7 @@ public abstract class AbstractBytesToBytesMapSuite {
key.length
);
Assert.assertFalse(loc.isDefined());
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
key,
Platform.BYTE_ARRAY_OFFSET,
key.length,
@@ -514,7 +514,7 @@ public abstract class AbstractBytesToBytesMapSuite {
final BytesToBytesMap.Location loc =
map.lookup(emptyArray, Platform.LONG_ARRAY_OFFSET, 0);
Assert.assertFalse(loc.isDefined());
- Assert.assertFalse(loc.putNewKey(
+ Assert.assertFalse(loc.append(
emptyArray, Platform.LONG_ARRAY_OFFSET, 0, emptyArray, Platform.LONG_ARRAY_OFFSET, 0));
} finally {
map.free();
@@ -535,7 +535,7 @@ public abstract class AbstractBytesToBytesMapSuite {
final long[] arr = new long[]{i};
final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
success =
- loc.putNewKey(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+ loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
if (!success) {
break;
}
@@ -556,7 +556,7 @@ public abstract class AbstractBytesToBytesMapSuite {
for (i = 0; i < 1024; i++) {
final long[] arr = new long[]{i};
final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
- loc.putNewKey(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+ loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
}
BytesToBytesMap.MapIterator iter = map.iterator();
for (i = 0; i < 100; i++) {
@@ -587,6 +587,44 @@ public abstract class AbstractBytesToBytesMapSuite {
}
@Test
+ public void multipleValuesForSameKey() {
+ BytesToBytesMap map =
+ new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024, false);
+ try {
+ int i;
+ for (i = 0; i < 1024; i++) {
+ final long[] arr = new long[]{i};
+ map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8)
+ .append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+ }
+ assert map.numKeys() == 1024;
+ assert map.numValues() == 1024;
+ for (i = 0; i < 1024; i++) {
+ final long[] arr = new long[]{i};
+ map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8)
+ .append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+ }
+ assert map.numKeys() == 1024;
+ assert map.numValues() == 2048;
+ for (i = 0; i < 1024; i++) {
+ final long[] arr = new long[]{i};
+ final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
+ assert loc.isDefined();
+ assert loc.nextValue();
+ assert !loc.nextValue();
+ }
+ BytesToBytesMap.MapIterator iter = map.iterator();
+ for (i = 0; i < 2048; i++) {
+ assert iter.hasNext();
+ final BytesToBytesMap.Location loc = iter.next();
+ assert loc.isDefined();
+ }
+ } finally {
+ map.free();
+ }
+ }
+
+ @Test
public void initialCapacityBoundsChecking() {
try {
new BytesToBytesMap(taskMemoryManager, 0, PAGE_SIZE_BYTES);
@@ -608,7 +646,7 @@ public abstract class AbstractBytesToBytesMapSuite {
@Test
public void testPeakMemoryUsed() {
- final long recordLengthBytes = 24;
+ final long recordLengthBytes = 32;
final long pageSizeBytes = 256 + 8; // 8 bytes for end-of-page marker
final long numRecordsPerPage = (pageSizeBytes - 8) / recordLengthBytes;
final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager, 1024, pageSizeBytes);
@@ -622,7 +660,7 @@ public abstract class AbstractBytesToBytesMapSuite {
try {
for (long i = 0; i < numRecordsPerPage * 10; i++) {
final long[] value = new long[]{i};
- map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8).putNewKey(
+ map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8).append(
value,
Platform.LONG_ARRAY_OFFSET,
8,