aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-07-31 23:55:16 -0700
committerReynold Xin <rxin@databricks.com>2015-07-31 23:55:16 -0700
commitd90f2cf7a2a1d1e69f9ab385f35f62d4091b5302 (patch)
tree94dff8456047924b32f7295dca1e7f47702d5e16 /core
parent67ad4e21fc68336b0ad6f9a363fb5ebb51f592bf (diff)
downloadspark-d90f2cf7a2a1d1e69f9ab385f35f62d4091b5302.tar.gz
spark-d90f2cf7a2a1d1e69f9ab385f35f62d4091b5302.tar.bz2
spark-d90f2cf7a2a1d1e69f9ab385f35f62d4091b5302.zip
[SPARK-9517][SQL] BytesToBytesMap should encode data the same way as UnsafeExternalSorter
BytesToBytesMap current encodes key/value data in the following format: ``` 8B key length, key data, 8B value length, value data ``` UnsafeExternalSorter, on the other hand, encodes data this way: ``` 4B record length, data ``` As a result, we cannot pass records encoded by BytesToBytesMap directly into UnsafeExternalSorter for sorting. However, if we rearrange data slightly, we can then pass the key/value records directly into UnsafeExternalSorter: ``` 4B key+value length, 4B key length, key data, value data ``` Author: Reynold Xin <rxin@databricks.com> Closes #7845 from rxin/kvsort-rebase and squashes the following commits: 5716b59 [Reynold Xin] Fixed test. 2e62ccb [Reynold Xin] Updated BytesToBytesMap's data encoding to put the key first. a51b641 [Reynold Xin] Added a KV sorter interface.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java58
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java15
-rw-r--r--core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java6
3 files changed, 49 insertions, 30 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 0f42950e6e..481375f493 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -17,7 +17,6 @@
package org.apache.spark.unsafe.map;
-import java.io.IOException;
import java.lang.Override;
import java.lang.UnsupportedOperationException;
import java.util.Iterator;
@@ -212,7 +211,7 @@ public final class BytesToBytesMap {
*/
public int numElements() { return numElements; }
- private static final class BytesToBytesMapIterator implements Iterator<Location> {
+ public static final class BytesToBytesMapIterator implements Iterator<Location> {
private final int numRecords;
private final Iterator<MemoryBlock> dataPagesIterator;
@@ -222,7 +221,8 @@ public final class BytesToBytesMap {
private Object pageBaseObject;
private long offsetInPage;
- BytesToBytesMapIterator(int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc) {
+ private BytesToBytesMapIterator(
+ int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc) {
this.numRecords = numRecords;
this.dataPagesIterator = dataPagesIterator;
this.loc = loc;
@@ -244,13 +244,13 @@ public final class BytesToBytesMap {
@Override
public Location next() {
- int keyLength = (int) PlatformDependent.UNSAFE.getLong(pageBaseObject, offsetInPage);
- if (keyLength == END_OF_PAGE_MARKER) {
+ int totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, offsetInPage);
+ if (totalLength == END_OF_PAGE_MARKER) {
advanceToNextPage();
- keyLength = (int) PlatformDependent.UNSAFE.getLong(pageBaseObject, offsetInPage);
+ totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, offsetInPage);
}
loc.with(pageBaseObject, offsetInPage);
- offsetInPage += 8 + 8 + keyLength + loc.getValueLength();
+ offsetInPage += 8 + totalLength;
currentRecordNumber++;
return loc;
}
@@ -269,7 +269,7 @@ public final class BytesToBytesMap {
* If any other lookups or operations are performed on this map while iterating over it, including
* `lookup()`, the behavior of the returned iterator is undefined.
*/
- public Iterator<Location> iterator() {
+ public BytesToBytesMapIterator iterator() {
return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc);
}
@@ -352,15 +352,18 @@ public final class BytesToBytesMap {
taskMemoryManager.getOffsetInPage(fullKeyAddress));
}
- private void updateAddressesAndSizes(Object page, long keyOffsetInPage) {
- long position = keyOffsetInPage;
- keyLength = (int) PlatformDependent.UNSAFE.getLong(page, position);
- position += 8; // word used to store the key size
- keyMemoryLocation.setObjAndOffset(page, position);
- position += keyLength;
- valueLength = (int) PlatformDependent.UNSAFE.getLong(page, position);
- position += 8; // word used to store the key size
- valueMemoryLocation.setObjAndOffset(page, position);
+ private void updateAddressesAndSizes(final Object page, final long keyOffsetInPage) {
+ long position = keyOffsetInPage;
+ final int totalLength = PlatformDependent.UNSAFE.getInt(page, position);
+ position += 4;
+ keyLength = PlatformDependent.UNSAFE.getInt(page, position);
+ position += 4;
+ valueLength = totalLength - keyLength;
+
+ keyMemoryLocation.setObjAndOffset(page, position);
+
+ position += keyLength;
+ valueMemoryLocation.setObjAndOffset(page, position);
}
Location with(int pos, int keyHashcode, boolean isDefined) {
@@ -478,7 +481,7 @@ public final class BytesToBytesMap {
// the key address instead of storing the absolute address of the value, the key and value
// must be stored in the same memory page.
// (8 byte key length) (key) (8 byte value length) (value)
- final long requiredSize = 8 + keyLengthBytes + 8 + valueLengthBytes;
+ final long requiredSize = 8 + keyLengthBytes + valueLengthBytes;
// --- Figure out where to insert the new record ---------------------------------------------
@@ -508,7 +511,7 @@ public final class BytesToBytesMap {
// There wasn't enough space in the current page, so write an end-of-page marker:
final Object pageBaseObject = currentDataPage.getBaseObject();
final long lengthOffsetInPage = currentDataPage.getBaseOffset() + pageCursor;
- PlatformDependent.UNSAFE.putLong(pageBaseObject, lengthOffsetInPage, END_OF_PAGE_MARKER);
+ PlatformDependent.UNSAFE.putInt(pageBaseObject, lengthOffsetInPage, END_OF_PAGE_MARKER);
}
final long memoryGranted = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryGranted != pageSizeBytes) {
@@ -535,21 +538,22 @@ public final class BytesToBytesMap {
long insertCursor = dataPageInsertOffset;
// Compute all of our offsets up-front:
- final long keySizeOffsetInPage = insertCursor;
- insertCursor += 8; // word used to store the key size
+ final long totalLengthOffset = insertCursor;
+ insertCursor += 4;
+ final long keyLengthOffset = insertCursor;
+ insertCursor += 4;
final long keyDataOffsetInPage = insertCursor;
insertCursor += keyLengthBytes;
- final long valueSizeOffsetInPage = insertCursor;
- insertCursor += 8; // word used to store the value size
final long valueDataOffsetInPage = insertCursor;
insertCursor += valueLengthBytes; // word used to store the value size
+ PlatformDependent.UNSAFE.putInt(dataPageBaseObject, totalLengthOffset,
+ keyLengthBytes + valueLengthBytes);
+ PlatformDependent.UNSAFE.putInt(dataPageBaseObject, keyLengthOffset, keyLengthBytes);
// Copy the key
- PlatformDependent.UNSAFE.putLong(dataPageBaseObject, keySizeOffsetInPage, keyLengthBytes);
PlatformDependent.copyMemory(
keyBaseObject, keyBaseOffset, dataPageBaseObject, keyDataOffsetInPage, keyLengthBytes);
// Copy the value
- PlatformDependent.UNSAFE.putLong(dataPageBaseObject, valueSizeOffsetInPage, valueLengthBytes);
PlatformDependent.copyMemory(valueBaseObject, valueBaseOffset, dataPageBaseObject,
valueDataOffsetInPage, valueLengthBytes);
@@ -557,7 +561,7 @@ public final class BytesToBytesMap {
if (useOverflowPage) {
// Store the end-of-page marker at the end of the data page
- PlatformDependent.UNSAFE.putLong(dataPageBaseObject, insertCursor, END_OF_PAGE_MARKER);
+ PlatformDependent.UNSAFE.putInt(dataPageBaseObject, insertCursor, END_OF_PAGE_MARKER);
} else {
pageCursor += requiredSize;
}
@@ -565,7 +569,7 @@ public final class BytesToBytesMap {
numElements++;
bitset.set(pos);
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
- dataPage, keySizeOffsetInPage);
+ dataPage, totalLengthOffset);
longArray.set(pos * 2, storedKeyAddress);
longArray.set(pos * 2 + 1, keyHashcode);
updateAddressesAndSizes(storedKeyAddress);
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 866e0b4151..c05f2c332e 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -282,6 +282,21 @@ public final class UnsafeExternalSorter {
sorter.insertRecord(recordAddress, prefix);
}
+ /**
+ * Write a record to the sorter. The record is broken down into two different parts, and
+ *
+ */
+ public void insertRecord(
+ Object recordBaseObject1,
+ long recordBaseOffset1,
+ int lengthInBytes1,
+ Object recordBaseObject2,
+ long recordBaseOffset2,
+ int lengthInBytes2,
+ long prefix) throws IOException {
+
+ }
+
public UnsafeSorterIterator getSortedIterator() throws IOException {
final UnsafeSorterIterator inMemoryIterator = sorter.getSortedIterator();
int numIteratorsToMerge = spillWriters.size() + (inMemoryIterator.hasNext() ? 1 : 0);
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 60f483acbc..70f8ca4d21 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -243,17 +243,17 @@ public abstract class AbstractBytesToBytesMapSuite {
@Test
public void iteratingOverDataPagesWithWastedSpace() throws Exception {
final int NUM_ENTRIES = 1000 * 1000;
- final int KEY_LENGTH = 16;
+ final int KEY_LENGTH = 24;
final int VALUE_LENGTH = 40;
final BytesToBytesMap map = new BytesToBytesMap(
taskMemoryManager, shuffleMemoryManager, NUM_ENTRIES, PAGE_SIZE_BYTES);
- // Each record will take 8 + 8 + 16 + 40 = 72 bytes of space in the data page. Our 64-megabyte
+ // Each record will take 8 + 24 + 40 = 72 bytes of space in the data page. Our 64-megabyte
// pages won't be evenly-divisible by records of this size, which will cause us to waste some
// space at the end of the page. This is necessary in order for us to take the end-of-record
// handling branch in iterator().
try {
for (int i = 0; i < NUM_ENTRIES; i++) {
- final long[] key = new long[] { i, i }; // 2 * 8 = 16 bytes
+ final long[] key = new long[] { i, i, i }; // 3 * 8 = 24 bytes
final long[] value = new long[] { i, i, i, i, i }; // 5 * 8 = 40 bytes
final BytesToBytesMap.Location loc = map.lookup(
key,