aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java')
-rw-r--r--core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java105
1 files changed, 67 insertions, 38 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 9aacb084f6..6807710f9f 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -56,9 +56,10 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter;
* Bytes 4 to 8: len(k)
* Bytes 8 to 8 + len(k): key data
* Bytes 8 + len(k) to 8 + len(k) + len(v): value data
+ * Bytes 8 + len(k) + len(v) to 8 + len(k) + len(v) + 8: pointer to next pair
*
* This means that the first four bytes store the entire record (key + value) length. This format
- * is consistent with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
+ * is compatible with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
* so we can pass records from this map directly into the sorter to sort records in place.
*/
public final class BytesToBytesMap extends MemoryConsumer {
@@ -132,7 +133,12 @@ public final class BytesToBytesMap extends MemoryConsumer {
/**
* Number of keys defined in the map.
*/
- private int numElements;
+ private int numKeys;
+
+ /**
+ * Number of values defined in the map. A key could have multiple values.
+ */
+ private int numValues;
/**
* The map will be expanded once the number of keys exceeds this threshold.
@@ -223,7 +229,12 @@ public final class BytesToBytesMap extends MemoryConsumer {
/**
* Returns the number of keys defined in the map.
*/
- public int numElements() { return numElements; }
+ public int numKeys() { return numKeys; }
+
+ /**
+ * Returns the number of values defined in the map. A key could have multiple values.
+ */
+ public int numValues() { return numValues; }
public final class MapIterator implements Iterator<Location> {
@@ -311,7 +322,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
if (currentPage != null) {
int totalLength = Platform.getInt(pageBaseObject, offsetInPage);
loc.with(currentPage, offsetInPage);
- offsetInPage += 4 + totalLength;
+ // [total size] [key size] [key] [value] [pointer to next]
+ offsetInPage += 4 + totalLength + 8;
recordsInPage --;
return loc;
} else {
@@ -361,7 +373,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
while (numRecords > 0) {
int length = Platform.getInt(base, offset);
writer.write(base, offset + 4, length, 0);
- offset += 4 + length;
+ offset += 4 + length + 8;
numRecords--;
}
writer.close();
@@ -395,7 +407,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
* `lookup()`, the behavior of the returned iterator is undefined.
*/
public MapIterator iterator() {
- return new MapIterator(numElements, loc, false);
+ return new MapIterator(numValues, loc, false);
}
/**
@@ -409,7 +421,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
* `lookup()`, the behavior of the returned iterator is undefined.
*/
public MapIterator destructiveIterator() {
- return new MapIterator(numElements, loc, true);
+ return new MapIterator(numValues, loc, true);
}
/**
@@ -560,6 +572,20 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
/**
+ * Find the next pair that has the same key as current one.
+ */
+ public boolean nextValue() {
+ assert isDefined;
+ long nextAddr = Platform.getLong(baseObject, valueOffset + valueLength);
+ if (nextAddr == 0) {
+ return false;
+ } else {
+ updateAddressesAndSizes(nextAddr);
+ return true;
+ }
+ }
+
+ /**
* Returns the memory page that contains the current record.
* This is only valid if this is returned by {@link BytesToBytesMap#iterator()}.
*/
@@ -625,10 +651,9 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
/**
- * Store a new key and value. This method may only be called once for a given key; if you want
- * to update the value associated with a key, then you can directly manipulate the bytes stored
- * at the value address. The return value indicates whether the put succeeded or whether it
- * failed because additional memory could not be acquired.
+ * Append a new value for the key. This method could be called multiple times for a given key.
+ * The return value indicates whether the put succeeded or whether it failed because additional
+ * memory could not be acquired.
* <p>
* It is only valid to call this method immediately after calling `lookup()` using the same key.
* </p>
@@ -637,7 +662,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
* </p>
* <p>
* After calling this method, calls to `get[Key|Value]Address()` and `get[Key|Value]Length`
- * will return information on the data stored by this `putNewKey` call.
+ * will return information on the data stored by this `append` call.
* </p>
* <p>
* As an example usage, here's the proper way to store a new key:
@@ -645,7 +670,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
* <pre>
* Location loc = map.lookup(keyBase, keyOffset, keyLength);
* if (!loc.isDefined()) {
- * if (!loc.putNewKey(keyBase, keyOffset, keyLength, ...)) {
+ * if (!loc.append(keyBase, keyOffset, keyLength, ...)) {
* // handle failure to grow map (by spilling, for example)
* }
* }
@@ -657,26 +682,23 @@ public final class BytesToBytesMap extends MemoryConsumer {
* @return true if the put() was successful and false if the put() failed because memory could
* not be acquired.
*/
- public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
- Object valueBase, long valueOffset, int valueLength) {
- assert (!isDefined) : "Can only set value once for a key";
- assert (keyLength % 8 == 0);
- assert (valueLength % 8 == 0);
- assert(longArray != null);
+ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff, int vlen) {
+ assert (klen % 8 == 0);
+ assert (vlen % 8 == 0);
+ assert (longArray != null);
-
- if (numElements == MAX_CAPACITY
+ if (numKeys == MAX_CAPACITY
// The map could be reused from last spill (because of no enough memory to grow),
// then we don't try to grow again if hit the `growthThreshold`.
- || !canGrowArray && numElements > growthThreshold) {
+ || !canGrowArray && numKeys > growthThreshold) {
return false;
}
// Here, we'll copy the data into our data pages. Because we only store a relative offset from
// the key address instead of storing the absolute address of the value, the key and value
// must be stored in the same memory page.
- // (8 byte key length) (key) (value)
- final long recordLength = 8 + keyLength + valueLength;
+ // (8 byte key length) (key) (value) (8 byte pointer to next value)
+ final long recordLength = 8 + klen + vlen + 8;
if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
if (!acquireNewPage(recordLength + 4L)) {
return false;
@@ -687,30 +709,36 @@ public final class BytesToBytesMap extends MemoryConsumer {
final Object base = currentPage.getBaseObject();
long offset = currentPage.getBaseOffset() + pageCursor;
final long recordOffset = offset;
- Platform.putInt(base, offset, keyLength + valueLength + 4);
- Platform.putInt(base, offset + 4, keyLength);
+ Platform.putInt(base, offset, klen + vlen + 4);
+ Platform.putInt(base, offset + 4, klen);
offset += 8;
- Platform.copyMemory(keyBase, keyOffset, base, offset, keyLength);
- offset += keyLength;
- Platform.copyMemory(valueBase, valueOffset, base, offset, valueLength);
+ Platform.copyMemory(kbase, koff, base, offset, klen);
+ offset += klen;
+ Platform.copyMemory(vbase, voff, base, offset, vlen);
+ offset += vlen;
+ // put this value at the beginning of the list
+ Platform.putLong(base, offset, isDefined ? longArray.get(pos * 2) : 0);
// --- Update bookkeeping data structures ----------------------------------------------------
offset = currentPage.getBaseOffset();
Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
pageCursor += recordLength;
- numElements++;
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
currentPage, recordOffset);
longArray.set(pos * 2, storedKeyAddress);
- longArray.set(pos * 2 + 1, keyHashcode);
updateAddressesAndSizes(storedKeyAddress);
- isDefined = true;
+ numValues++;
+ if (!isDefined) {
+ numKeys++;
+ longArray.set(pos * 2 + 1, keyHashcode);
+ isDefined = true;
- if (numElements > growthThreshold && longArray.size() < MAX_CAPACITY) {
- try {
- growAndRehash();
- } catch (OutOfMemoryError oom) {
- canGrowArray = false;
+ if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
+ try {
+ growAndRehash();
+ } catch (OutOfMemoryError oom) {
+ canGrowArray = false;
+ }
}
}
return true;
@@ -866,7 +894,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
* Reset this map to initialized state.
*/
public void reset() {
- numElements = 0;
+ numKeys = 0;
+ numValues = 0;
longArray.zeroOut();
while (dataPages.size() > 0) {