aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java113
-rw-r--r--core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java64
2 files changed, 124 insertions, 53 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 9aacb084f6..32958be7a7 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -56,9 +56,10 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter;
* Bytes 4 to 8: len(k)
* Bytes 8 to 8 + len(k): key data
* Bytes 8 + len(k) to 8 + len(k) + len(v): value data
+ * Bytes 8 + len(k) + len(v) to 8 + len(k) + len(v) + 8: pointer to next pair
*
* This means that the first four bytes store the entire record (key + value) length. This format
- * is consistent with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
+ * is compatible with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
* so we can pass records from this map directly into the sorter to sort records in place.
*/
public final class BytesToBytesMap extends MemoryConsumer {
@@ -132,7 +133,12 @@ public final class BytesToBytesMap extends MemoryConsumer {
/**
* Number of keys defined in the map.
*/
- private int numElements;
+ private int numKeys;
+
+ /**
+ * Number of values defined in the map. A key could have multiple values.
+ */
+ private int numValues;
/**
* The map will be expanded once the number of keys exceeds this threshold.
@@ -223,7 +229,12 @@ public final class BytesToBytesMap extends MemoryConsumer {
/**
* Returns the number of keys defined in the map.
*/
- public int numElements() { return numElements; }
+ public int numKeys() { return numKeys; }
+
+ /**
+ * Returns the number of values defined in the map. A key could have multiple values.
+ */
+ public int numValues() { return numValues; }
public final class MapIterator implements Iterator<Location> {
@@ -311,7 +322,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
if (currentPage != null) {
int totalLength = Platform.getInt(pageBaseObject, offsetInPage);
loc.with(currentPage, offsetInPage);
- offsetInPage += 4 + totalLength;
+ // [total size] [key size] [key] [value] [pointer to next]
+ offsetInPage += 4 + totalLength + 8;
recordsInPage --;
return loc;
} else {
@@ -361,7 +373,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
while (numRecords > 0) {
int length = Platform.getInt(base, offset);
writer.write(base, offset + 4, length, 0);
- offset += 4 + length;
+ offset += 4 + length + 8;
numRecords--;
}
writer.close();
@@ -395,7 +407,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
* `lookup()`, the behavior of the returned iterator is undefined.
*/
public MapIterator iterator() {
- return new MapIterator(numElements, loc, false);
+ return new MapIterator(numValues, loc, false);
}
/**
@@ -409,7 +421,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
* `lookup()`, the behavior of the returned iterator is undefined.
*/
public MapIterator destructiveIterator() {
- return new MapIterator(numElements, loc, true);
+ return new MapIterator(numValues, loc, true);
}
/**
@@ -560,6 +572,20 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
/**
+ * Find the next pair that has the same key as current one.
+ */
+ public boolean nextValue() {
+ assert isDefined;
+ long nextAddr = Platform.getLong(baseObject, valueOffset + valueLength);
+ if (nextAddr == 0) {
+ return false;
+ } else {
+ updateAddressesAndSizes(nextAddr);
+ return true;
+ }
+ }
+
+ /**
* Returns the memory page that contains the current record.
* This is only valid if this is returned by {@link BytesToBytesMap#iterator()}.
*/
@@ -625,10 +651,9 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
/**
- * Store a new key and value. This method may only be called once for a given key; if you want
- * to update the value associated with a key, then you can directly manipulate the bytes stored
- * at the value address. The return value indicates whether the put succeeded or whether it
- * failed because additional memory could not be acquired.
+ * Append a new value for the key. This method could be called multiple times for a given key.
+ * The return value indicates whether the put succeeded or whether it failed because additional
+ * memory could not be acquired.
* <p>
* It is only valid to call this method immediately after calling `lookup()` using the same key.
* </p>
@@ -637,7 +662,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
* </p>
* <p>
* After calling this method, calls to `get[Key|Value]Address()` and `get[Key|Value]Length`
- * will return information on the data stored by this `putNewKey` call.
+ * will return information on the data stored by this `append` call.
* </p>
* <p>
* As an example usage, here's the proper way to store a new key:
@@ -645,7 +670,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
* <pre>
* Location loc = map.lookup(keyBase, keyOffset, keyLength);
* if (!loc.isDefined()) {
- * if (!loc.putNewKey(keyBase, keyOffset, keyLength, ...)) {
+ * if (!loc.append(keyBase, keyOffset, keyLength, ...)) {
* // handle failure to grow map (by spilling, for example)
* }
* }
@@ -657,26 +682,23 @@ public final class BytesToBytesMap extends MemoryConsumer {
* @return true if the put() was successful and false if the put() failed because memory could
* not be acquired.
*/
- public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
- Object valueBase, long valueOffset, int valueLength) {
- assert (!isDefined) : "Can only set value once for a key";
- assert (keyLength % 8 == 0);
- assert (valueLength % 8 == 0);
- assert(longArray != null);
-
+ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff, int vlen) {
+ assert (klen % 8 == 0);
+ assert (vlen % 8 == 0);
+ assert (longArray != null);
- if (numElements == MAX_CAPACITY
+ if (numKeys == MAX_CAPACITY
// The map could be reused from last spill (because of no enough memory to grow),
// then we don't try to grow again if hit the `growthThreshold`.
- || !canGrowArray && numElements > growthThreshold) {
+ || !canGrowArray && numKeys > growthThreshold) {
return false;
}
// Here, we'll copy the data into our data pages. Because we only store a relative offset from
// the key address instead of storing the absolute address of the value, the key and value
// must be stored in the same memory page.
- // (8 byte key length) (key) (value)
- final long recordLength = 8 + keyLength + valueLength;
+ // (8 byte key length) (key) (value) (8 byte pointer to next value)
+ final long recordLength = 8 + klen + vlen + 8;
if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
if (!acquireNewPage(recordLength + 4L)) {
return false;
@@ -687,30 +709,40 @@ public final class BytesToBytesMap extends MemoryConsumer {
final Object base = currentPage.getBaseObject();
long offset = currentPage.getBaseOffset() + pageCursor;
final long recordOffset = offset;
- Platform.putInt(base, offset, keyLength + valueLength + 4);
- Platform.putInt(base, offset + 4, keyLength);
+ Platform.putInt(base, offset, klen + vlen + 4);
+ Platform.putInt(base, offset + 4, klen);
offset += 8;
- Platform.copyMemory(keyBase, keyOffset, base, offset, keyLength);
- offset += keyLength;
- Platform.copyMemory(valueBase, valueOffset, base, offset, valueLength);
+ Platform.copyMemory(kbase, koff, base, offset, klen);
+ offset += klen;
+ Platform.copyMemory(vbase, voff, base, offset, vlen);
+ offset += vlen;
+ Platform.putLong(base, offset, 0);
// --- Update bookkeeping data structures ----------------------------------------------------
offset = currentPage.getBaseOffset();
Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
pageCursor += recordLength;
- numElements++;
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
currentPage, recordOffset);
- longArray.set(pos * 2, storedKeyAddress);
- longArray.set(pos * 2 + 1, keyHashcode);
- updateAddressesAndSizes(storedKeyAddress);
- isDefined = true;
+ numValues++;
+ if (isDefined) {
+ // put this pair at the end of chain
+ while (nextValue()) { /* do nothing */ }
+ Platform.putLong(baseObject, valueOffset + valueLength, storedKeyAddress);
+ nextValue(); // point to new added value
+ } else {
+ numKeys++;
+ longArray.set(pos * 2, storedKeyAddress);
+ longArray.set(pos * 2 + 1, keyHashcode);
+ updateAddressesAndSizes(storedKeyAddress);
+ isDefined = true;
- if (numElements > growthThreshold && longArray.size() < MAX_CAPACITY) {
- try {
- growAndRehash();
- } catch (OutOfMemoryError oom) {
- canGrowArray = false;
+ if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
+ try {
+ growAndRehash();
+ } catch (OutOfMemoryError oom) {
+ canGrowArray = false;
+ }
}
}
return true;
@@ -866,7 +898,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
* Reset this map to initialized state.
*/
public void reset() {
- numElements = 0;
+ numKeys = 0;
+ numValues = 0;
longArray.zeroOut();
while (dataPages.size() > 0) {
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 449fb45c30..84b82f5a47 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -182,7 +182,7 @@ public abstract class AbstractBytesToBytesMapSuite {
public void emptyMap() {
BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager, 64, PAGE_SIZE_BYTES);
try {
- Assert.assertEquals(0, map.numElements());
+ Assert.assertEquals(0, map.numKeys());
final int keyLengthInWords = 10;
final int keyLengthInBytes = keyLengthInWords * 8;
final byte[] key = getRandomByteArray(keyLengthInWords);
@@ -204,7 +204,7 @@ public abstract class AbstractBytesToBytesMapSuite {
final BytesToBytesMap.Location loc =
map.lookup(keyData, Platform.BYTE_ARRAY_OFFSET, recordLengthBytes);
Assert.assertFalse(loc.isDefined());
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
keyData,
Platform.BYTE_ARRAY_OFFSET,
recordLengthBytes,
@@ -232,7 +232,7 @@ public abstract class AbstractBytesToBytesMapSuite {
getByteArray(loc.getValueBase(), loc.getValueOffset(), recordLengthBytes));
try {
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
keyData,
Platform.BYTE_ARRAY_OFFSET,
recordLengthBytes,
@@ -260,7 +260,7 @@ public abstract class AbstractBytesToBytesMapSuite {
Assert.assertFalse(loc.isDefined());
// Ensure that we store some zero-length keys
if (i % 5 == 0) {
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
null,
Platform.LONG_ARRAY_OFFSET,
0,
@@ -269,7 +269,7 @@ public abstract class AbstractBytesToBytesMapSuite {
8
));
} else {
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
value,
Platform.LONG_ARRAY_OFFSET,
8,
@@ -349,7 +349,7 @@ public abstract class AbstractBytesToBytesMapSuite {
KEY_LENGTH
);
Assert.assertFalse(loc.isDefined());
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
key,
Platform.LONG_ARRAY_OFFSET,
KEY_LENGTH,
@@ -417,7 +417,7 @@ public abstract class AbstractBytesToBytesMapSuite {
key.length
);
Assert.assertFalse(loc.isDefined());
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
key,
Platform.BYTE_ARRAY_OFFSET,
key.length,
@@ -471,7 +471,7 @@ public abstract class AbstractBytesToBytesMapSuite {
key.length
);
Assert.assertFalse(loc.isDefined());
- Assert.assertTrue(loc.putNewKey(
+ Assert.assertTrue(loc.append(
key,
Platform.BYTE_ARRAY_OFFSET,
key.length,
@@ -514,7 +514,7 @@ public abstract class AbstractBytesToBytesMapSuite {
final BytesToBytesMap.Location loc =
map.lookup(emptyArray, Platform.LONG_ARRAY_OFFSET, 0);
Assert.assertFalse(loc.isDefined());
- Assert.assertFalse(loc.putNewKey(
+ Assert.assertFalse(loc.append(
emptyArray, Platform.LONG_ARRAY_OFFSET, 0, emptyArray, Platform.LONG_ARRAY_OFFSET, 0));
} finally {
map.free();
@@ -535,7 +535,7 @@ public abstract class AbstractBytesToBytesMapSuite {
final long[] arr = new long[]{i};
final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
success =
- loc.putNewKey(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+ loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
if (!success) {
break;
}
@@ -556,7 +556,7 @@ public abstract class AbstractBytesToBytesMapSuite {
for (i = 0; i < 1024; i++) {
final long[] arr = new long[]{i};
final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
- loc.putNewKey(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+ loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
}
BytesToBytesMap.MapIterator iter = map.iterator();
for (i = 0; i < 100; i++) {
@@ -587,6 +587,44 @@ public abstract class AbstractBytesToBytesMapSuite {
}
@Test
+ public void multipleValuesForSameKey() {
+ BytesToBytesMap map =
+ new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024, false);
+ try {
+ int i;
+ for (i = 0; i < 1024; i++) {
+ final long[] arr = new long[]{i};
+ map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8)
+ .append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+ }
+ assert map.numKeys() == 1024;
+ assert map.numValues() == 1024;
+ for (i = 0; i < 1024; i++) {
+ final long[] arr = new long[]{i};
+ map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8)
+ .append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+ }
+ assert map.numKeys() == 1024;
+ assert map.numValues() == 2048;
+ for (i = 0; i < 1024; i++) {
+ final long[] arr = new long[]{i};
+ final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
+ assert loc.isDefined();
+ assert loc.nextValue();
+ assert !loc.nextValue();
+ }
+ BytesToBytesMap.MapIterator iter = map.iterator();
+ for (i = 0; i < 2048; i++) {
+ assert iter.hasNext();
+ final BytesToBytesMap.Location loc = iter.next();
+ assert loc.isDefined();
+ }
+ } finally {
+ map.free();
+ }
+ }
+
+ @Test
public void initialCapacityBoundsChecking() {
try {
new BytesToBytesMap(taskMemoryManager, 0, PAGE_SIZE_BYTES);
@@ -608,7 +646,7 @@ public abstract class AbstractBytesToBytesMapSuite {
@Test
public void testPeakMemoryUsed() {
- final long recordLengthBytes = 24;
+ final long recordLengthBytes = 32;
final long pageSizeBytes = 256 + 8; // 8 bytes for end-of-page marker
final long numRecordsPerPage = (pageSizeBytes - 8) / recordLengthBytes;
final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager, 1024, pageSizeBytes);
@@ -622,7 +660,7 @@ public abstract class AbstractBytesToBytesMapSuite {
try {
for (long i = 0; i < numRecordsPerPage * 10; i++) {
final long[] value = new long[]{i};
- map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8).putNewKey(
+ map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8).append(
value,
Platform.LONG_ARRAY_OFFSET,
8,