aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author Reynold Xin <rxin@databricks.com> 2015-08-01 13:20:26 -0700
committer Josh Rosen <joshrosen@databricks.com> 2015-08-01 13:20:26 -0700
commit 3d1535d48822281953de1e8447de86fad728412a (patch)
tree 52f12ad351ecc49c7e2563e0c19e6fa4c622476b /core
parent df733cbeae7a53826e89574af5463fa018329a22 (diff)
download spark-3d1535d48822281953de1e8447de86fad728412a.tar.gz
spark-3d1535d48822281953de1e8447de86fad728412a.tar.bz2
spark-3d1535d48822281953de1e8447de86fad728412a.zip
[SPARK-9520] [SQL] Support in-place sort in UnsafeFixedWidthAggregationMap
This pull request adds a sortedIterator method to UnsafeFixedWidthAggregationMap that sorts its data in place by the grouping key. This is needed so that we can fall back to external sorting for aggregation. Author: Reynold Xin <rxin@databricks.com>. Closes #7849 from rxin/bytes2bytes-sorting and squashes the following commits: 75018c6 [Reynold Xin] Updated documentation; 81a8694 [Reynold Xin] [SPARK-9520][SQL] Support in-place sort in UnsafeFixedWidthAggregationMap.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 41
1 files changed, 31 insertions, 10 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 481375f493..cf222b7272 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -23,6 +23,8 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import javax.annotation.Nullable;
+
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -217,6 +219,7 @@ public final class BytesToBytesMap {
private final Iterator<MemoryBlock> dataPagesIterator;
private final Location loc;
+ private MemoryBlock currentPage;
private int currentRecordNumber = 0;
private Object pageBaseObject;
private long offsetInPage;
@@ -232,7 +235,7 @@ public final class BytesToBytesMap {
}
private void advanceToNextPage() {
- final MemoryBlock currentPage = dataPagesIterator.next();
+ currentPage = dataPagesIterator.next();
pageBaseObject = currentPage.getBaseObject();
offsetInPage = currentPage.getBaseOffset();
}
@@ -249,7 +252,7 @@ public final class BytesToBytesMap {
advanceToNextPage();
totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, offsetInPage);
}
- loc.with(pageBaseObject, offsetInPage);
+ loc.with(currentPage, offsetInPage);
offsetInPage += 8 + totalLength;
currentRecordNumber++;
return loc;
@@ -346,14 +349,19 @@ public final class BytesToBytesMap {
private int keyLength;
private int valueLength;
+ /**
+ * Memory page containing the record. Only set if created by {@link BytesToBytesMap#iterator()}.
+ */
+ @Nullable private MemoryBlock memoryPage;
+
private void updateAddressesAndSizes(long fullKeyAddress) {
updateAddressesAndSizes(
taskMemoryManager.getPage(fullKeyAddress),
taskMemoryManager.getOffsetInPage(fullKeyAddress));
}
- private void updateAddressesAndSizes(final Object page, final long keyOffsetInPage) {
- long position = keyOffsetInPage;
+ private void updateAddressesAndSizes(final Object page, final long offsetInPage) {
+ long position = offsetInPage;
final int totalLength = PlatformDependent.UNSAFE.getInt(page, position);
position += 4;
keyLength = PlatformDependent.UNSAFE.getInt(page, position);
@@ -366,7 +374,7 @@ public final class BytesToBytesMap {
valueMemoryLocation.setObjAndOffset(page, position);
}
- Location with(int pos, int keyHashcode, boolean isDefined) {
+ private Location with(int pos, int keyHashcode, boolean isDefined) {
this.pos = pos;
this.isDefined = isDefined;
this.keyHashcode = keyHashcode;
@@ -377,13 +385,22 @@ public final class BytesToBytesMap {
return this;
}
- Location with(Object page, long keyOffsetInPage) {
+ private Location with(MemoryBlock page, long offsetInPage) {
this.isDefined = true;
- updateAddressesAndSizes(page, keyOffsetInPage);
+ this.memoryPage = page;
+ updateAddressesAndSizes(page.getBaseObject(), offsetInPage);
return this;
}
/**
+ * Returns the memory page that contains the current record.
+ * This is only valid if this is returned by {@link BytesToBytesMap#iterator()}.
+ */
+ public MemoryBlock getMemoryPage() {
+ return this.memoryPage;
+ }
+
+ /**
* Returns true if the key is defined at this position, and false otherwise.
*/
public boolean isDefined() {
@@ -538,7 +555,7 @@ public final class BytesToBytesMap {
long insertCursor = dataPageInsertOffset;
// Compute all of our offsets up-front:
- final long totalLengthOffset = insertCursor;
+ final long recordOffset = insertCursor;
insertCursor += 4;
final long keyLengthOffset = insertCursor;
insertCursor += 4;
@@ -547,7 +564,7 @@ public final class BytesToBytesMap {
final long valueDataOffsetInPage = insertCursor;
insertCursor += valueLengthBytes; // word used to store the value size
- PlatformDependent.UNSAFE.putInt(dataPageBaseObject, totalLengthOffset,
+ PlatformDependent.UNSAFE.putInt(dataPageBaseObject, recordOffset,
keyLengthBytes + valueLengthBytes);
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, keyLengthOffset, keyLengthBytes);
// Copy the key
@@ -569,7 +586,7 @@ public final class BytesToBytesMap {
numElements++;
bitset.set(pos);
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
- dataPage, totalLengthOffset);
+ dataPage, recordOffset);
longArray.set(pos * 2, storedKeyAddress);
longArray.set(pos * 2 + 1, keyHashcode);
updateAddressesAndSizes(storedKeyAddress);
@@ -618,6 +635,10 @@ public final class BytesToBytesMap {
assert(dataPages.isEmpty());
}
+ public TaskMemoryManager getTaskMemoryManager() {
+ return taskMemoryManager;
+ }
+
/** Returns the total amount of memory, in bytes, consumed by this map's managed structures. */
public long getTotalMemoryConsumption() {
long totalDataPagesSize = 0L;