diff options
Diffstat (limited to 'core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java')
-rw-r--r-- | core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 105 |
1 files changed, 67 insertions, 38 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 9aacb084f6..6807710f9f 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -56,9 +56,10 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter; * Bytes 4 to 8: len(k) * Bytes 8 to 8 + len(k): key data * Bytes 8 + len(k) to 8 + len(k) + len(v): value data + * Bytes 8 + len(k) + len(v) to 8 + len(k) + len(v) + 8: pointer to next pair * * This means that the first four bytes store the entire record (key + value) length. This format - * is consistent with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter}, + * is compatible with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter}, * so we can pass records from this map directly into the sorter to sort records in place. */ public final class BytesToBytesMap extends MemoryConsumer { @@ -132,7 +133,12 @@ public final class BytesToBytesMap extends MemoryConsumer { /** * Number of keys defined in the map. */ - private int numElements; + private int numKeys; + + /** + * Number of values defined in the map. A key could have multiple values. + */ + private int numValues; /** * The map will be expanded once the number of keys exceeds this threshold. @@ -223,7 +229,12 @@ public final class BytesToBytesMap extends MemoryConsumer { /** * Returns the number of keys defined in the map. */ - public int numElements() { return numElements; } + public int numKeys() { return numKeys; } + + /** + * Returns the number of values defined in the map. A key could have multiple values. + */ + public int numValues() { return numValues; } public final class MapIterator implements Iterator<Location> { @@ -311,7 +322,8 @@ public final class BytesToBytesMap extends MemoryConsumer { if (currentPage != null) { int totalLength = Platform.getInt(pageBaseObject, offsetInPage); loc.with(currentPage, offsetInPage); - offsetInPage += 4 + totalLength; + // [total size] [key size] [key] [value] [pointer to next] + offsetInPage += 4 + totalLength + 8; recordsInPage --; return loc; } else { @@ -361,7 +373,7 @@ public final class BytesToBytesMap extends MemoryConsumer { while (numRecords > 0) { int length = Platform.getInt(base, offset); writer.write(base, offset + 4, length, 0); - offset += 4 + length; + offset += 4 + length + 8; numRecords--; } writer.close(); @@ -395,7 +407,7 @@ public final class BytesToBytesMap extends MemoryConsumer { * `lookup()`, the behavior of the returned iterator is undefined. */ public MapIterator iterator() { - return new MapIterator(numElements, loc, false); + return new MapIterator(numValues, loc, false); } /** @@ -409,7 +421,7 @@ public final class BytesToBytesMap extends MemoryConsumer { * `lookup()`, the behavior of the returned iterator is undefined. */ public MapIterator destructiveIterator() { - return new MapIterator(numElements, loc, true); + return new MapIterator(numValues, loc, true); } /** @@ -560,6 +572,20 @@ public final class BytesToBytesMap extends MemoryConsumer { } /** + * Find the next pair that has the same key as current one. + */ + public boolean nextValue() { + assert isDefined; + long nextAddr = Platform.getLong(baseObject, valueOffset + valueLength); + if (nextAddr == 0) { + return false; + } else { + updateAddressesAndSizes(nextAddr); + return true; + } + } + + /** * Returns the memory page that contains the current record. * This is only valid if this is returned by {@link BytesToBytesMap#iterator()}. */ @@ -625,10 +651,9 @@ public final class BytesToBytesMap extends MemoryConsumer { } /** - * Store a new key and value. This method may only be called once for a given key; if you want - * to update the value associated with a key, then you can directly manipulate the bytes stored - * at the value address. The return value indicates whether the put succeeded or whether it - * failed because additional memory could not be acquired. + * Append a new value for the key. This method could be called multiple times for a given key. + * The return value indicates whether the put succeeded or whether it failed because additional + * memory could not be acquired. * <p> * It is only valid to call this method immediately after calling `lookup()` using the same key. * </p> @@ -637,7 +662,7 @@ public final class BytesToBytesMap extends MemoryConsumer { * </p> * <p> * After calling this method, calls to `get[Key|Value]Address()` and `get[Key|Value]Length` - * will return information on the data stored by this `putNewKey` call. + * will return information on the data stored by this `append` call. * </p> * <p> * As an example usage, here's the proper way to store a new key: @@ -645,7 +670,7 @@ public final class BytesToBytesMap extends MemoryConsumer { * <pre> * Location loc = map.lookup(keyBase, keyOffset, keyLength); * if (!loc.isDefined()) { - * if (!loc.putNewKey(keyBase, keyOffset, keyLength, ...)) { + * if (!loc.append(keyBase, keyOffset, keyLength, ...)) { * // handle failure to grow map (by spilling, for example) * } * } @@ -657,26 +682,23 @@ public final class BytesToBytesMap extends MemoryConsumer { * @return true if the put() was successful and false if the put() failed because memory could * not be acquired. */ - public boolean putNewKey(Object keyBase, long keyOffset, int keyLength, - Object valueBase, long valueOffset, int valueLength) { - assert (!isDefined) : "Can only set value once for a key"; - assert (keyLength % 8 == 0); - assert (valueLength % 8 == 0); - assert(longArray != null); + public boolean append(Object kbase, long koff, int klen, Object vbase, long voff, int vlen) { + assert (klen % 8 == 0); + assert (vlen % 8 == 0); + assert (longArray != null); - - if (numElements == MAX_CAPACITY + if (numKeys == MAX_CAPACITY // The map could be reused from last spill (because of no enough memory to grow), // then we don't try to grow again if hit the `growthThreshold`. - || !canGrowArray && numElements > growthThreshold) { + || !canGrowArray && numKeys > growthThreshold) { return false; } // Here, we'll copy the data into our data pages. Because we only store a relative offset from // the key address instead of storing the absolute address of the value, the key and value // must be stored in the same memory page. - // (8 byte key length) (key) (value) - final long recordLength = 8 + keyLength + valueLength; + // (8 byte key length) (key) (value) (8 byte pointer to next value) + final long recordLength = 8 + klen + vlen + 8; if (currentPage == null || currentPage.size() - pageCursor < recordLength) { if (!acquireNewPage(recordLength + 4L)) { return false; @@ -687,30 +709,36 @@ public final class BytesToBytesMap extends MemoryConsumer { final Object base = currentPage.getBaseObject(); long offset = currentPage.getBaseOffset() + pageCursor; final long recordOffset = offset; - Platform.putInt(base, offset, keyLength + valueLength + 4); - Platform.putInt(base, offset + 4, keyLength); + Platform.putInt(base, offset, klen + vlen + 4); + Platform.putInt(base, offset + 4, klen); offset += 8; - Platform.copyMemory(keyBase, keyOffset, base, offset, keyLength); - offset += keyLength; - Platform.copyMemory(valueBase, valueOffset, base, offset, valueLength); + Platform.copyMemory(kbase, koff, base, offset, klen); + offset += klen; + Platform.copyMemory(vbase, voff, base, offset, vlen); + offset += vlen; + // put this value at the beginning of the list + Platform.putLong(base, offset, isDefined ? longArray.get(pos * 2) : 0); // --- Update bookkeeping data structures ---------------------------------------------------- offset = currentPage.getBaseOffset(); Platform.putInt(base, offset, Platform.getInt(base, offset) + 1); pageCursor += recordLength; - numElements++; final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset( currentPage, recordOffset); longArray.set(pos * 2, storedKeyAddress); - longArray.set(pos * 2 + 1, keyHashcode); updateAddressesAndSizes(storedKeyAddress); - isDefined = true; + numValues++; + if (!isDefined) { + numKeys++; + longArray.set(pos * 2 + 1, keyHashcode); + isDefined = true; - if (numElements > growthThreshold && longArray.size() < MAX_CAPACITY) { - try { - growAndRehash(); - } catch (OutOfMemoryError oom) { - canGrowArray = false; + if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) { + try { + growAndRehash(); + } catch (OutOfMemoryError oom) { + canGrowArray = false; + } } } return true; @@ -866,7 +894,8 @@ public final class BytesToBytesMap extends MemoryConsumer { * Reset this map to initialized state. */ public void reset() { - numElements = 0; + numKeys = 0; + numValues = 0; longArray.zeroOut(); while (dataPages.size() > 0) { |