diff options
author | Reynold Xin <rxin@databricks.com> | 2015-08-01 13:20:26 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-08-01 13:20:26 -0700 |
commit | 3d1535d48822281953de1e8447de86fad728412a (patch) | |
tree | 52f12ad351ecc49c7e2563e0c19e6fa4c622476b /core | |
parent | df733cbeae7a53826e89574af5463fa018329a22 (diff) | |
download | spark-3d1535d48822281953de1e8447de86fad728412a.tar.gz spark-3d1535d48822281953de1e8447de86fad728412a.tar.bz2 spark-3d1535d48822281953de1e8447de86fad728412a.zip |
[SPARK-9520] [SQL] Support in-place sort in UnsafeFixedWidthAggregationMap
This pull request adds a sortedIterator method to UnsafeFixedWidthAggregationMap that sorts its data in-place by the grouping key.
This is needed so we can fallback to external sorting for aggregation.
Author: Reynold Xin <rxin@databricks.com>
Closes #7849 from rxin/bytes2bytes-sorting and squashes the following commits:
75018c6 [Reynold Xin] Updated documentation.
81a8694 [Reynold Xin] [SPARK-9520][SQL] Support in-place sort in UnsafeFixedWidthAggregationMap.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 41 |
1 files changed, 31 insertions, 10 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 481375f493..cf222b7272 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -23,6 +23,8 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import javax.annotation.Nullable; + import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -217,6 +219,7 @@ public final class BytesToBytesMap { private final Iterator<MemoryBlock> dataPagesIterator; private final Location loc; + private MemoryBlock currentPage; private int currentRecordNumber = 0; private Object pageBaseObject; private long offsetInPage; @@ -232,7 +235,7 @@ public final class BytesToBytesMap { } private void advanceToNextPage() { - final MemoryBlock currentPage = dataPagesIterator.next(); + currentPage = dataPagesIterator.next(); pageBaseObject = currentPage.getBaseObject(); offsetInPage = currentPage.getBaseOffset(); } @@ -249,7 +252,7 @@ public final class BytesToBytesMap { advanceToNextPage(); totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, offsetInPage); } - loc.with(pageBaseObject, offsetInPage); + loc.with(currentPage, offsetInPage); offsetInPage += 8 + totalLength; currentRecordNumber++; return loc; @@ -346,14 +349,19 @@ public final class BytesToBytesMap { private int keyLength; private int valueLength; + /** + * Memory page containing the record. Only set if created by {@link BytesToBytesMap#iterator()}. + */ + @Nullable private MemoryBlock memoryPage; + private void updateAddressesAndSizes(long fullKeyAddress) { updateAddressesAndSizes( taskMemoryManager.getPage(fullKeyAddress), taskMemoryManager.getOffsetInPage(fullKeyAddress)); } - private void updateAddressesAndSizes(final Object page, final long keyOffsetInPage) { - long position = keyOffsetInPage; + private void updateAddressesAndSizes(final Object page, final long offsetInPage) { + long position = offsetInPage; final int totalLength = PlatformDependent.UNSAFE.getInt(page, position); position += 4; keyLength = PlatformDependent.UNSAFE.getInt(page, position); @@ -366,7 +374,7 @@ public final class BytesToBytesMap { valueMemoryLocation.setObjAndOffset(page, position); } - Location with(int pos, int keyHashcode, boolean isDefined) { + private Location with(int pos, int keyHashcode, boolean isDefined) { this.pos = pos; this.isDefined = isDefined; this.keyHashcode = keyHashcode; @@ -377,13 +385,22 @@ public final class BytesToBytesMap { return this; } - Location with(Object page, long keyOffsetInPage) { + private Location with(MemoryBlock page, long offsetInPage) { this.isDefined = true; - updateAddressesAndSizes(page, keyOffsetInPage); + this.memoryPage = page; + updateAddressesAndSizes(page.getBaseObject(), offsetInPage); return this; } /** + * Returns the memory page that contains the current record. + * This is only valid if this is returned by {@link BytesToBytesMap#iterator()}. + */ + public MemoryBlock getMemoryPage() { + return this.memoryPage; + } + + /** * Returns true if the key is defined at this position, and false otherwise. */ public boolean isDefined() { @@ -538,7 +555,7 @@ public final class BytesToBytesMap { long insertCursor = dataPageInsertOffset; // Compute all of our offsets up-front: - final long totalLengthOffset = insertCursor; + final long recordOffset = insertCursor; insertCursor += 4; final long keyLengthOffset = insertCursor; insertCursor += 4; @@ -547,7 +564,7 @@ public final class BytesToBytesMap { final long valueDataOffsetInPage = insertCursor; insertCursor += valueLengthBytes; // word used to store the value size - PlatformDependent.UNSAFE.putInt(dataPageBaseObject, totalLengthOffset, + PlatformDependent.UNSAFE.putInt(dataPageBaseObject, recordOffset, keyLengthBytes + valueLengthBytes); PlatformDependent.UNSAFE.putInt(dataPageBaseObject, keyLengthOffset, keyLengthBytes); // Copy the key @@ -569,7 +586,7 @@ public final class BytesToBytesMap { numElements++; bitset.set(pos); final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset( - dataPage, totalLengthOffset); + dataPage, recordOffset); longArray.set(pos * 2, storedKeyAddress); longArray.set(pos * 2 + 1, keyHashcode); updateAddressesAndSizes(storedKeyAddress); @@ -618,6 +635,10 @@ public final class BytesToBytesMap { assert(dataPages.isEmpty()); } + public TaskMemoryManager getTaskMemoryManager() { + return taskMemoryManager; + } + /** Returns the total amount of memory, in bytes, consumed by this map's managed structures. */ public long getTotalMemoryConsumption() { long totalDataPagesSize = 0L; |