diff options
author | Reynold Xin <rxin@databricks.com> | 2015-07-31 23:55:16 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-07-31 23:55:16 -0700 |
commit | d90f2cf7a2a1d1e69f9ab385f35f62d4091b5302 (patch) | |
tree | 94dff8456047924b32f7295dca1e7f47702d5e16 /core | |
parent | 67ad4e21fc68336b0ad6f9a363fb5ebb51f592bf (diff) | |
download | spark-d90f2cf7a2a1d1e69f9ab385f35f62d4091b5302.tar.gz spark-d90f2cf7a2a1d1e69f9ab385f35f62d4091b5302.tar.bz2 spark-d90f2cf7a2a1d1e69f9ab385f35f62d4091b5302.zip |
[SPARK-9517][SQL] BytesToBytesMap should encode data the same way as UnsafeExternalSorter
BytesToBytesMap current encodes key/value data in the following format:
```
8B key length, key data, 8B value length, value data
```
UnsafeExternalSorter, on the other hand, encodes data this way:
```
4B record length, data
```
As a result, we cannot pass records encoded by BytesToBytesMap directly into UnsafeExternalSorter for sorting. However, if we rearrange data slightly, we can then pass the key/value records directly into UnsafeExternalSorter:
```
4B key+value length, 4B key length, key data, value data
```
Author: Reynold Xin <rxin@databricks.com>
Closes #7845 from rxin/kvsort-rebase and squashes the following commits:
5716b59 [Reynold Xin] Fixed test.
2e62ccb [Reynold Xin] Updated BytesToBytesMap's data encoding to put the key first.
a51b641 [Reynold Xin] Added a KV sorter interface.
Diffstat (limited to 'core')
3 files changed, 49 insertions, 30 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 0f42950e6e..481375f493 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -17,7 +17,6 @@ package org.apache.spark.unsafe.map; -import java.io.IOException; import java.lang.Override; import java.lang.UnsupportedOperationException; import java.util.Iterator; @@ -212,7 +211,7 @@ public final class BytesToBytesMap { */ public int numElements() { return numElements; } - private static final class BytesToBytesMapIterator implements Iterator<Location> { + public static final class BytesToBytesMapIterator implements Iterator<Location> { private final int numRecords; private final Iterator<MemoryBlock> dataPagesIterator; @@ -222,7 +221,8 @@ public final class BytesToBytesMap { private Object pageBaseObject; private long offsetInPage; - BytesToBytesMapIterator(int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc) { + private BytesToBytesMapIterator( + int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc) { this.numRecords = numRecords; this.dataPagesIterator = dataPagesIterator; this.loc = loc; @@ -244,13 +244,13 @@ public final class BytesToBytesMap { @Override public Location next() { - int keyLength = (int) PlatformDependent.UNSAFE.getLong(pageBaseObject, offsetInPage); - if (keyLength == END_OF_PAGE_MARKER) { + int totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, offsetInPage); + if (totalLength == END_OF_PAGE_MARKER) { advanceToNextPage(); - keyLength = (int) PlatformDependent.UNSAFE.getLong(pageBaseObject, offsetInPage); + totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, offsetInPage); } loc.with(pageBaseObject, offsetInPage); - offsetInPage += 8 + 8 + keyLength + loc.getValueLength(); + offsetInPage += 8 + totalLength; currentRecordNumber++; return loc; } @@ -269,7 +269,7 @@ public final class BytesToBytesMap { * If any other lookups or operations are performed on this map while iterating over it, including * `lookup()`, the behavior of the returned iterator is undefined. */ - public Iterator<Location> iterator() { + public BytesToBytesMapIterator iterator() { return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc); } @@ -352,15 +352,18 @@ public final class BytesToBytesMap { taskMemoryManager.getOffsetInPage(fullKeyAddress)); } - private void updateAddressesAndSizes(Object page, long keyOffsetInPage) { - long position = keyOffsetInPage; - keyLength = (int) PlatformDependent.UNSAFE.getLong(page, position); - position += 8; // word used to store the key size - keyMemoryLocation.setObjAndOffset(page, position); - position += keyLength; - valueLength = (int) PlatformDependent.UNSAFE.getLong(page, position); - position += 8; // word used to store the key size - valueMemoryLocation.setObjAndOffset(page, position); + private void updateAddressesAndSizes(final Object page, final long keyOffsetInPage) { + long position = keyOffsetInPage; + final int totalLength = PlatformDependent.UNSAFE.getInt(page, position); + position += 4; + keyLength = PlatformDependent.UNSAFE.getInt(page, position); + position += 4; + valueLength = totalLength - keyLength; + + keyMemoryLocation.setObjAndOffset(page, position); + + position += keyLength; + valueMemoryLocation.setObjAndOffset(page, position); } Location with(int pos, int keyHashcode, boolean isDefined) { @@ -478,7 +481,7 @@ public final class BytesToBytesMap { // the key address instead of storing the absolute address of the value, the key and value // must be stored in the same memory page. // (8 byte key length) (key) (8 byte value length) (value) - final long requiredSize = 8 + keyLengthBytes + 8 + valueLengthBytes; + final long requiredSize = 8 + keyLengthBytes + valueLengthBytes; // --- Figure out where to insert the new record --------------------------------------------- @@ -508,7 +511,7 @@ public final class BytesToBytesMap { // There wasn't enough space in the current page, so write an end-of-page marker: final Object pageBaseObject = currentDataPage.getBaseObject(); final long lengthOffsetInPage = currentDataPage.getBaseOffset() + pageCursor; - PlatformDependent.UNSAFE.putLong(pageBaseObject, lengthOffsetInPage, END_OF_PAGE_MARKER); + PlatformDependent.UNSAFE.putInt(pageBaseObject, lengthOffsetInPage, END_OF_PAGE_MARKER); } final long memoryGranted = shuffleMemoryManager.tryToAcquire(pageSizeBytes); if (memoryGranted != pageSizeBytes) { @@ -535,21 +538,22 @@ public final class BytesToBytesMap { long insertCursor = dataPageInsertOffset; // Compute all of our offsets up-front: - final long keySizeOffsetInPage = insertCursor; - insertCursor += 8; // word used to store the key size + final long totalLengthOffset = insertCursor; + insertCursor += 4; + final long keyLengthOffset = insertCursor; + insertCursor += 4; final long keyDataOffsetInPage = insertCursor; insertCursor += keyLengthBytes; - final long valueSizeOffsetInPage = insertCursor; - insertCursor += 8; // word used to store the value size final long valueDataOffsetInPage = insertCursor; insertCursor += valueLengthBytes; // word used to store the value size + PlatformDependent.UNSAFE.putInt(dataPageBaseObject, totalLengthOffset, + keyLengthBytes + valueLengthBytes); + PlatformDependent.UNSAFE.putInt(dataPageBaseObject, keyLengthOffset, keyLengthBytes); // Copy the key - PlatformDependent.UNSAFE.putLong(dataPageBaseObject, keySizeOffsetInPage, keyLengthBytes); PlatformDependent.copyMemory( keyBaseObject, keyBaseOffset, dataPageBaseObject, keyDataOffsetInPage, keyLengthBytes); // Copy the value - PlatformDependent.UNSAFE.putLong(dataPageBaseObject, valueSizeOffsetInPage, valueLengthBytes); PlatformDependent.copyMemory(valueBaseObject, valueBaseOffset, dataPageBaseObject, valueDataOffsetInPage, valueLengthBytes); @@ -557,7 +561,7 @@ public final class BytesToBytesMap { if (useOverflowPage) { // Store the end-of-page marker at the end of the data page - PlatformDependent.UNSAFE.putLong(dataPageBaseObject, insertCursor, END_OF_PAGE_MARKER); + PlatformDependent.UNSAFE.putInt(dataPageBaseObject, insertCursor, END_OF_PAGE_MARKER); } else { pageCursor += requiredSize; } @@ -565,7 +569,7 @@ public final class BytesToBytesMap { numElements++; bitset.set(pos); final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset( - dataPage, keySizeOffsetInPage); + dataPage, totalLengthOffset); longArray.set(pos * 2, storedKeyAddress); longArray.set(pos * 2 + 1, keyHashcode); updateAddressesAndSizes(storedKeyAddress); diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 866e0b4151..c05f2c332e 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -282,6 +282,21 @@ public final class UnsafeExternalSorter { sorter.insertRecord(recordAddress, prefix); } + /** + * Write a record to the sorter. The record is broken down into two different parts, and + * + */ + public void insertRecord( + Object recordBaseObject1, + long recordBaseOffset1, + int lengthInBytes1, + Object recordBaseObject2, + long recordBaseOffset2, + int lengthInBytes2, + long prefix) throws IOException { + + } + public UnsafeSorterIterator getSortedIterator() throws IOException { final UnsafeSorterIterator inMemoryIterator = sorter.getSortedIterator(); int numIteratorsToMerge = spillWriters.size() + (inMemoryIterator.hasNext() ? 1 : 0); diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java index 60f483acbc..70f8ca4d21 100644 --- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java +++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java @@ -243,17 +243,17 @@ public abstract class AbstractBytesToBytesMapSuite { @Test public void iteratingOverDataPagesWithWastedSpace() throws Exception { final int NUM_ENTRIES = 1000 * 1000; - final int KEY_LENGTH = 16; + final int KEY_LENGTH = 24; final int VALUE_LENGTH = 40; final BytesToBytesMap map = new BytesToBytesMap( taskMemoryManager, shuffleMemoryManager, NUM_ENTRIES, PAGE_SIZE_BYTES); - // Each record will take 8 + 8 + 16 + 40 = 72 bytes of space in the data page. Our 64-megabyte + // Each record will take 8 + 24 + 40 = 72 bytes of space in the data page. Our 64-megabyte // pages won't be evenly-divisible by records of this size, which will cause us to waste some // space at the end of the page. This is necessary in order for us to take the end-of-record // handling branch in iterator(). try { for (int i = 0; i < NUM_ENTRIES; i++) { - final long[] key = new long[] { i, i }; // 2 * 8 = 16 bytes + final long[] key = new long[] { i, i, i }; // 3 * 8 = 24 bytes final long[] value = new long[] { i, i, i, i, i }; // 5 * 8 = 40 bytes final BytesToBytesMap.Location loc = map.lookup( key, |