From eec74ba8bde7f9446cc38e687bda103e85669d35 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 5 Nov 2015 19:02:18 -0800 Subject: [SPARK-7542][SQL] Support off-heap index/sort buffer This brings the support of off-heap memory for array inside BytesToBytesMap and InMemorySorter, then we could allocate all the memory from off-heap for execution. Closes #8068 Author: Davies Liu Closes #9477 from davies/unsafe_timsort. --- .../org/apache/spark/memory/MemoryConsumer.java | 36 ++++++------ .../org/apache/spark/memory/TaskMemoryManager.java | 6 +- .../spark/shuffle/sort/ShuffleExternalSorter.java | 26 +++------ .../spark/shuffle/sort/ShuffleInMemorySorter.java | 67 +++++++++++----------- .../spark/shuffle/sort/ShuffleSortDataFormat.java | 38 +++++++----- .../apache/spark/unsafe/map/BytesToBytesMap.java | 18 +++--- .../unsafe/sort/UnsafeExternalSorter.java | 28 ++++----- .../unsafe/sort/UnsafeInMemorySorter.java | 66 ++++++++++++--------- .../unsafe/sort/UnsafeSortDataFormat.java | 47 +++++++++------ .../spark/memory/TaskMemoryManagerSuite.java | 23 -------- .../apache/spark/memory/TestMemoryConsumer.java | 45 +++++++++++++++ .../shuffle/sort/ShuffleInMemorySorterSuite.java | 16 ++++-- .../unsafe/sort/UnsafeExternalSorterSuite.java | 1 - .../unsafe/sort/UnsafeInMemorySorterSuite.java | 12 ++-- 14 files changed, 242 insertions(+), 187 deletions(-) create mode 100644 core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java (limited to 'core') diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java index 008799cc77..8fbdb72832 100644 --- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java +++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java @@ -20,6 +20,7 @@ package org.apache.spark.memory; import java.io.IOException; +import org.apache.spark.unsafe.array.LongArray; import org.apache.spark.unsafe.memory.MemoryBlock; @@ -28,9 +29,9 @@ import org.apache.spark.unsafe.memory.MemoryBlock; */ public abstract class MemoryConsumer { - private final TaskMemoryManager taskMemoryManager; + protected final TaskMemoryManager taskMemoryManager; private final long pageSize; - private long used; + protected long used; protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize) { this.taskMemoryManager = taskMemoryManager; @@ -74,26 +75,29 @@ public abstract class MemoryConsumer { public abstract long spill(long size, MemoryConsumer trigger) throws IOException; /** - * Acquire `size` bytes memory. - * - * If there is not enough memory, throws OutOfMemoryError. + * Allocates a LongArray of `size`. */ - protected void acquireMemory(long size) { - long got = taskMemoryManager.acquireExecutionMemory(size, this); - if (got < size) { - taskMemoryManager.releaseExecutionMemory(got, this); + public LongArray allocateArray(long size) { + long required = size * 8L; + MemoryBlock page = taskMemoryManager.allocatePage(required, this); + if (page == null || page.size() < required) { + long got = 0; + if (page != null) { + got = page.size(); + taskMemoryManager.freePage(page, this); + } taskMemoryManager.showMemoryUsage(); - throw new OutOfMemoryError("Could not acquire " + size + " bytes of memory, got " + got); + throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got); } - used += got; + used += required; + return new LongArray(page); } /** - * Release `size` bytes memory. + * Frees a LongArray. */ - protected void releaseMemory(long size) { - used -= size; - taskMemoryManager.releaseExecutionMemory(size, this); + public void freeArray(LongArray array) { + freePage(array.memoryBlock()); } /** @@ -109,7 +113,7 @@ public abstract class MemoryConsumer { long got = 0; if (page != null) { got = page.size(); - freePage(page); + taskMemoryManager.freePage(page, this); } taskMemoryManager.showMemoryUsage(); throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got); diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index 4230575446..6440f9c0f3 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -137,7 +137,7 @@ public class TaskMemoryManager { if (got < required) { // Call spill() on other consumers to release memory for (MemoryConsumer c: consumers) { - if (c != null && c != consumer && c.getUsed() > 0) { + if (c != consumer && c.getUsed() > 0) { try { long released = c.spill(required - got, consumer); if (released > 0) { @@ -173,7 +173,9 @@ public class TaskMemoryManager { } } - consumers.add(consumer); + if (consumer != null) { + consumers.add(consumer); + } logger.debug("Task {} acquire {} for {}", taskAttemptId, Utils.bytesToString(got), consumer); return got; } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java index 400d852001..9affff8014 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java @@ -39,6 +39,7 @@ import org.apache.spark.storage.BlockManager; import org.apache.spark.storage.DiskBlockObjectWriter; import org.apache.spark.storage.TempShuffleBlockId; import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.array.LongArray; import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.Utils; @@ -114,8 +115,7 @@ final class ShuffleExternalSorter extends MemoryConsumer { this.numElementsForSpillThreshold = conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MAX_VALUE); this.writeMetrics = writeMetrics; - acquireMemory(initialSize * 8L); - this.inMemSorter = new ShuffleInMemorySorter(initialSize); + this.inMemSorter = new ShuffleInMemorySorter(this, initialSize); this.peakMemoryUsedBytes = getMemoryUsage(); } @@ -301,9 +301,8 @@ final class ShuffleExternalSorter extends MemoryConsumer { public void cleanupResources() { freeMemory(); if (inMemSorter != null) { - long sorterMemoryUsage = inMemSorter.getMemoryUsage(); + inMemSorter.free(); inMemSorter = null; - releaseMemory(sorterMemoryUsage); } for (SpillInfo spill : spills) { if (spill.file.exists() && !spill.file.delete()) { @@ -321,9 +320,10 @@ final class ShuffleExternalSorter extends MemoryConsumer { assert(inMemSorter != null); if (!inMemSorter.hasSpaceForAnotherRecord()) { long used = inMemSorter.getMemoryUsage(); - long needed = used + inMemSorter.getMemoryToExpand(); + LongArray array; try { - acquireMemory(needed); // could trigger spilling + // could trigger spilling + array = allocateArray(used / 8 * 2); } catch (OutOfMemoryError e) { // should have trigger spilling assert(inMemSorter.hasSpaceForAnotherRecord()); @@ -331,16 +331,9 @@ final class ShuffleExternalSorter extends MemoryConsumer { } // check if spilling is triggered or not if (inMemSorter.hasSpaceForAnotherRecord()) { - releaseMemory(needed); + freeArray(array); } else { - try { - inMemSorter.expandPointerArray(); - releaseMemory(used); - } catch (OutOfMemoryError oom) { - // Just in case that JVM had run out of memory - releaseMemory(needed); - spill(); - } + inMemSorter.expandPointerArray(array); } } } @@ -404,9 +397,8 @@ final class ShuffleExternalSorter extends MemoryConsumer { // Do not count the final file towards the spill count. writeSortedFile(true); freeMemory(); - long sorterMemoryUsage = inMemSorter.getMemoryUsage(); + inMemSorter.free(); inMemSorter = null; - releaseMemory(sorterMemoryUsage); } return spills.toArray(new SpillInfo[spills.size()]); } catch (IOException e) { diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java index e630575d1a..58ad88e1ed 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java @@ -19,11 +19,14 @@ package org.apache.spark.shuffle.sort; import java.util.Comparator; +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.array.LongArray; import org.apache.spark.util.collection.Sorter; final class ShuffleInMemorySorter { - private final Sorter sorter; + private final Sorter sorter; private static final class SortComparator implements Comparator { @Override public int compare(PackedRecordPointer left, PackedRecordPointer right) { @@ -32,24 +35,34 @@ final class ShuffleInMemorySorter { } private static final SortComparator SORT_COMPARATOR = new SortComparator(); + private final MemoryConsumer consumer; + /** * An array of record pointers and partition ids that have been encoded by * {@link PackedRecordPointer}. The sort operates on this array instead of directly manipulating * records. */ - private long[] array; + private LongArray array; /** * The position in the pointer array where new records can be inserted. */ private int pos = 0; - public ShuffleInMemorySorter(int initialSize) { + public ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize) { + this.consumer = consumer; assert (initialSize > 0); - this.array = new long[initialSize]; + this.array = consumer.allocateArray(initialSize); this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE); } + public void free() { + if (array != null) { + consumer.freeArray(array); + array = null; + } + } + public int numRecords() { return pos; } @@ -58,30 +71,25 @@ final class ShuffleInMemorySorter { pos = 0; } - private int newLength() { - // Guard against overflow: - return array.length <= Integer.MAX_VALUE / 2 ? (array.length * 2) : Integer.MAX_VALUE; - } - - /** - * Returns the memory needed to expand - */ - public long getMemoryToExpand() { - return ((long) (newLength() - array.length)) * 8; - } - - public void expandPointerArray() { - final long[] oldArray = array; - array = new long[newLength()]; - System.arraycopy(oldArray, 0, array, 0, oldArray.length); + public void expandPointerArray(LongArray newArray) { + assert(newArray.size() > array.size()); + Platform.copyMemory( + array.getBaseObject(), + array.getBaseOffset(), + newArray.getBaseObject(), + newArray.getBaseOffset(), + array.size() * 8L + ); + consumer.freeArray(array); + array = newArray; } public boolean hasSpaceForAnotherRecord() { - return pos < array.length; + return pos < array.size(); } public long getMemoryUsage() { - return array.length * 8L; + return array.size() * 8L; } /** @@ -96,14 +104,9 @@ final class ShuffleInMemorySorter { */ public void insertRecord(long recordPointer, int partitionId) { if (!hasSpaceForAnotherRecord()) { - if (array.length == Integer.MAX_VALUE) { - throw new IllegalStateException("Sort pointer array has reached maximum size"); - } else { - expandPointerArray(); - } + expandPointerArray(consumer.allocateArray(array.size() * 2)); } - array[pos] = - PackedRecordPointer.packPointer(recordPointer, partitionId); + array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId)); pos++; } @@ -112,12 +115,12 @@ final class ShuffleInMemorySorter { */ public static final class ShuffleSorterIterator { - private final long[] pointerArray; + private final LongArray pointerArray; private final int numRecords; final PackedRecordPointer packedRecordPointer = new PackedRecordPointer(); private int position = 0; - public ShuffleSorterIterator(int numRecords, long[] pointerArray) { + public ShuffleSorterIterator(int numRecords, LongArray pointerArray) { this.numRecords = numRecords; this.pointerArray = pointerArray; } @@ -127,7 +130,7 @@ final class ShuffleInMemorySorter { } public void loadNext() { - packedRecordPointer.set(pointerArray[position]); + packedRecordPointer.set(pointerArray.get(position)); position++; } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java index 8a1e5aec6f..8f4e322997 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java @@ -17,16 +17,19 @@ package org.apache.spark.shuffle.sort; +import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.array.LongArray; +import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.collection.SortDataFormat; -final class ShuffleSortDataFormat extends SortDataFormat { +final class ShuffleSortDataFormat extends SortDataFormat { public static final ShuffleSortDataFormat INSTANCE = new ShuffleSortDataFormat(); private ShuffleSortDataFormat() { } @Override - public PackedRecordPointer getKey(long[] data, int pos) { + public PackedRecordPointer getKey(LongArray data, int pos) { // Since we re-use keys, this method shouldn't be called. throw new UnsupportedOperationException(); } @@ -37,31 +40,38 @@ final class ShuffleSortDataFormat extends SortDataFormat= 0); - // The capacity needs to be divisible by 64 so that our bit set can be sized properly capacity = Math.max((int) Math.min(MAX_CAPACITY, ByteArrayMethods.nextPowerOf2(capacity)), 64); assert (capacity <= MAX_CAPACITY); - acquireMemory(capacity * 16); - longArray = new LongArray(MemoryBlock.fromLongArray(new long[capacity * 2])); + longArray = allocateArray(capacity * 2); + longArray.zeroOut(); this.growthThreshold = (int) (capacity * loadFactor); this.mask = capacity - 1; @@ -743,9 +741,8 @@ public final class BytesToBytesMap extends MemoryConsumer { public void free() { updatePeakMemoryUsed(); if (longArray != null) { - long used = longArray.memoryBlock().size(); + freeArray(longArray); longArray = null; - releaseMemory(used); } Iterator dataPagesIterator = dataPages.iterator(); while (dataPagesIterator.hasNext()) { @@ -834,9 +831,9 @@ public final class BytesToBytesMap extends MemoryConsumer { /** * Returns the underline long[] of longArray. */ - public long[] getArray() { + public LongArray getArray() { assert(longArray != null); - return (long[]) longArray.memoryBlock().getBaseObject(); + return longArray; } /** @@ -844,7 +841,8 @@ public final class BytesToBytesMap extends MemoryConsumer { */ public void reset() { numElements = 0; - Arrays.fill(getArray(), 0); + longArray.zeroOut(); + while (dataPages.size() > 0) { MemoryBlock dataPage = dataPages.removeLast(); freePage(dataPage); @@ -887,7 +885,7 @@ public final class BytesToBytesMap extends MemoryConsumer { longArray.set(newPos * 2, keyPointer); longArray.set(newPos * 2 + 1, hashcode); } - releaseMemory(oldLongArray.memoryBlock().size()); + freeArray(oldLongArray); if (enablePerfMetrics) { timeSpentResizingNs += System.nanoTime() - resizeStartTime; diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index cba043bc48..9a7b2ad06c 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -32,6 +32,7 @@ import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.storage.BlockManager; import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.array.LongArray; import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.TaskCompletionListener; import org.apache.spark.util.Utils; @@ -123,9 +124,8 @@ public final class UnsafeExternalSorter extends MemoryConsumer { this.writeMetrics = new ShuffleWriteMetrics(); if (existingInMemorySorter == null) { - this.inMemSorter = - new UnsafeInMemorySorter(taskMemoryManager, recordComparator, prefixComparator, initialSize); - acquireMemory(inMemSorter.getMemoryUsage()); + this.inMemSorter = new UnsafeInMemorySorter( + this, taskMemoryManager, recordComparator, prefixComparator, initialSize); } else { this.inMemSorter = existingInMemorySorter; } @@ -277,9 +277,8 @@ public final class UnsafeExternalSorter extends MemoryConsumer { deleteSpillFiles(); freeMemory(); if (inMemSorter != null) { - long used = inMemSorter.getMemoryUsage(); + inMemSorter.free(); inMemSorter = null; - releaseMemory(used); } } } @@ -293,9 +292,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer { assert(inMemSorter != null); if (!inMemSorter.hasSpaceForAnotherRecord()) { long used = inMemSorter.getMemoryUsage(); - long needed = used + inMemSorter.getMemoryToExpand(); + LongArray array; try { - acquireMemory(needed); // could trigger spilling + // could trigger spilling + array = allocateArray(used / 8 * 2); } catch (OutOfMemoryError e) { // should have trigger spilling assert(inMemSorter.hasSpaceForAnotherRecord()); @@ -303,16 +303,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer { } // check if spilling is triggered or not if (inMemSorter.hasSpaceForAnotherRecord()) { - releaseMemory(needed); + freeArray(array); } else { - try { - inMemSorter.expandPointerArray(); - releaseMemory(used); - } catch (OutOfMemoryError oom) { - // Just in case that JVM had run out of memory - releaseMemory(needed); - spill(); - } + inMemSorter.expandPointerArray(array); } } } @@ -498,9 +491,8 @@ public final class UnsafeExternalSorter extends MemoryConsumer { nextUpstream = null; assert(inMemSorter != null); - long used = inMemSorter.getMemoryUsage(); + inMemSorter.free(); inMemSorter = null; - releaseMemory(used); } numRecords--; upstream.loadNext(); diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index d57213b9b8..a218ad4623 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -19,8 +19,10 @@ package org.apache.spark.util.collection.unsafe.sort; import java.util.Comparator; +import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.array.LongArray; import org.apache.spark.util.collection.Sorter; /** @@ -62,15 +64,16 @@ public final class UnsafeInMemorySorter { } } + private final MemoryConsumer consumer; private final TaskMemoryManager memoryManager; - private final Sorter sorter; + private final Sorter sorter; private final Comparator sortComparator; /** * Within this buffer, position {@code 2 * i} holds a pointer pointer to the record at * index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix. */ - private long[] array; + private LongArray array; /** * The position in the sort buffer where new records can be inserted. @@ -78,22 +81,33 @@ public final class UnsafeInMemorySorter { private int pos = 0; public UnsafeInMemorySorter( + final MemoryConsumer consumer, final TaskMemoryManager memoryManager, final RecordComparator recordComparator, final PrefixComparator prefixComparator, int initialSize) { - this(memoryManager, recordComparator, prefixComparator, new long[initialSize * 2]); + this(consumer, memoryManager, recordComparator, prefixComparator, + consumer.allocateArray(initialSize * 2)); } public UnsafeInMemorySorter( + final MemoryConsumer consumer, final TaskMemoryManager memoryManager, final RecordComparator recordComparator, final PrefixComparator prefixComparator, - long[] array) { - this.array = array; + LongArray array) { + this.consumer = consumer; this.memoryManager = memoryManager; this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE); this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager); + this.array = array; + } + + /** + * Free the memory used by pointer array. + */ + public void free() { + consumer.freeArray(array); } public void reset() { @@ -107,26 +121,26 @@ public final class UnsafeInMemorySorter { return pos / 2; } - private int newLength() { - return array.length < Integer.MAX_VALUE / 2 ? (array.length * 2) : Integer.MAX_VALUE; - } - - public long getMemoryToExpand() { - return (long) (newLength() - array.length) * 8L; - } - public long getMemoryUsage() { - return array.length * 8L; + return array.size() * 8L; } public boolean hasSpaceForAnotherRecord() { - return pos + 2 <= array.length; + return pos + 2 <= array.size(); } - public void expandPointerArray() { - final long[] oldArray = array; - array = new long[newLength()]; - System.arraycopy(oldArray, 0, array, 0, oldArray.length); + public void expandPointerArray(LongArray newArray) { + if (newArray.size() < array.size()) { + throw new OutOfMemoryError("Not enough memory to grow pointer array"); + } + Platform.copyMemory( + array.getBaseObject(), + array.getBaseOffset(), + newArray.getBaseObject(), + newArray.getBaseOffset(), + array.size() * 8L); + consumer.freeArray(array); + array = newArray; } /** @@ -138,11 +152,11 @@ public final class UnsafeInMemorySorter { */ public void insertRecord(long recordPointer, long keyPrefix) { if (!hasSpaceForAnotherRecord()) { - expandPointerArray(); + expandPointerArray(consumer.allocateArray(array.size() * 2)); } - array[pos] = recordPointer; + array.set(pos, recordPointer); pos++; - array[pos] = keyPrefix; + array.set(pos, keyPrefix); pos++; } @@ -150,7 +164,7 @@ public final class UnsafeInMemorySorter { private final TaskMemoryManager memoryManager; private final int sortBufferInsertPosition; - private final long[] sortBuffer; + private final LongArray sortBuffer; private int position = 0; private Object baseObject; private long baseOffset; @@ -160,7 +174,7 @@ public final class UnsafeInMemorySorter { private SortedIterator( TaskMemoryManager memoryManager, int sortBufferInsertPosition, - long[] sortBuffer) { + LongArray sortBuffer) { this.memoryManager = memoryManager; this.sortBufferInsertPosition = sortBufferInsertPosition; this.sortBuffer = sortBuffer; @@ -188,11 +202,11 @@ public final class UnsafeInMemorySorter { @Override public void loadNext() { // This pointer points to a 4-byte record length, followed by the record's bytes - final long recordPointer = sortBuffer[position]; + final long recordPointer = sortBuffer.get(position); baseObject = memoryManager.getPage(recordPointer); baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4; // Skip over record length recordLength = Platform.getInt(baseObject, baseOffset - 4); - keyPrefix = sortBuffer[position + 1]; + keyPrefix = sortBuffer.get(position + 1); position += 2; } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java index d09c728a7a..d3137f5f31 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java @@ -17,6 +17,9 @@ package org.apache.spark.util.collection.unsafe.sort; +import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.array.LongArray; +import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.collection.SortDataFormat; /** @@ -26,14 +29,14 @@ import org.apache.spark.util.collection.SortDataFormat; * Within each long[] buffer, position {@code 2 * i} holds a pointer pointer to the record at * index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix. */ -final class UnsafeSortDataFormat extends SortDataFormat { +final class UnsafeSortDataFormat extends SortDataFormat { public static final UnsafeSortDataFormat INSTANCE = new UnsafeSortDataFormat(); private UnsafeSortDataFormat() { } @Override - public RecordPointerAndKeyPrefix getKey(long[] data, int pos) { + public RecordPointerAndKeyPrefix getKey(LongArray data, int pos) { // Since we re-use keys, this method shouldn't be called. throw new UnsupportedOperationException(); } @@ -44,37 +47,43 @@ final class UnsafeSortDataFormat extends SortDataFormat