about summary refs log tree commit diff
path: root/core/src/main/java
diff options
context:
space:
mode:
author Josh Rosen <joshrosen@databricks.com> 2015-08-04 14:42:11 -0700
committer Reynold Xin <rxin@databricks.com> 2015-08-04 14:42:11 -0700
commit ab8ee1a3b93286a62949569615086ef5030e9fae (patch)
tree 88aa364451320f2a303bbeeb4857bcba57896c84 /core/src/main/java
parent f4b1ac08a1327e6d0ddc317cdf3997a0f68dec72 (diff)
download spark-ab8ee1a3b93286a62949569615086ef5030e9fae.tar.gz
spark-ab8ee1a3b93286a62949569615086ef5030e9fae.tar.bz2
spark-ab8ee1a3b93286a62949569615086ef5030e9fae.zip
[SPARK-9452] [SQL] Support records larger than page size in UnsafeExternalSorter
This patch extends UnsafeExternalSorter to support records larger than the page size. The basic strategy is the same as in #7762: store large records in their own overflow pages.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #7891 from JoshRosen/large-records-in-sql-sorter and squashes the following commits:

967580b [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
948c344 [Josh Rosen] Add large records tests for KV sorter.
3c17288 [Josh Rosen] Combine memory and disk cleanup into general cleanupResources() method
380f217 [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
27eafa0 [Josh Rosen] Fix page size in PackedRecordPointerSuite
a49baef [Josh Rosen] Address initial round of review comments
3edb931 [Josh Rosen] Remove accidentally-committed debug statements.
2b164e2 [Josh Rosen] Support large records in UnsafeExternalSorter.
Diffstat (limited to 'core/src/main/java')
-rw-r--r-- core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 173
1 files changed, 121 insertions, 52 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index dec7fcfa0d..e6ddd08e5f 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -34,6 +34,7 @@ import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.storage.BlockManager;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
@@ -143,8 +144,7 @@ public final class UnsafeExternalSorter {
taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
- deleteSpillFiles();
- freeMemory();
+ cleanupResources();
return null;
}
});
@@ -249,7 +249,7 @@ public final class UnsafeExternalSorter {
*
* @return the number of bytes freed.
*/
- public long freeMemory() {
+ private long freeMemory() {
updatePeakMemoryUsed();
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
@@ -275,44 +275,32 @@ public final class UnsafeExternalSorter {
/**
* Deletes any spill files created by this sorter.
*/
- public void deleteSpillFiles() {
+ private void deleteSpillFiles() {
for (UnsafeSorterSpillWriter spill : spillWriters) {
File file = spill.getFile();
if (file != null && file.exists()) {
if (!file.delete()) {
logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
- };
+ }
}
}
}
/**
- * Checks whether there is enough space to insert a new record into the sorter.
- *
- * @param requiredSpace the required space in the data page, in bytes, including space for storing
- * the record size.
-
- * @return true if the record can be inserted without requiring more allocations, false otherwise.
+ * Frees this sorter's in-memory data structures and cleans up its spill files.
*/
- private boolean haveSpaceForRecord(int requiredSpace) {
- assert(requiredSpace > 0);
- assert(inMemSorter != null);
- return (inMemSorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage));
+ public void cleanupResources() {
+ deleteSpillFiles();
+ freeMemory();
}
/**
- * Allocates more memory in order to insert an additional record. This will request additional
- * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
- * obtained.
- *
- * @param requiredSpace the required space in the data page, in bytes, including space for storing
- * the record size.
+ * Checks whether there is enough space to insert an additional record in to the sort pointer
+ * array and grows the array if additional space is required. If the required space cannot be
+ * obtained, then the in-memory data will be spilled to disk.
*/
- private void allocateSpaceForRecord(int requiredSpace) throws IOException {
+ private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
- // TODO: merge these steps to first calculate total memory requirements for this insert,
- // then try to acquire; no point in acquiring sort buffer only to spill due to no space in the
- // data page.
if (!inMemSorter.hasSpaceForAnotherRecord()) {
logger.debug("Attempting to expand sort pointer array");
final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
@@ -326,7 +314,20 @@ public final class UnsafeExternalSorter {
shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
}
}
+ }
+ /**
+ * Allocates more memory in order to insert an additional record. This will request additional
+ * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
+ * obtained.
+ *
+ * @param requiredSpace the required space in the data page, in bytes, including space for storing
+ * the record size. This must be less than or equal to the page size (records
+ * that exceed the page size are handled via a different code path which uses
+ * special overflow pages).
+ */
+ private void acquireNewPageIfNecessary(int requiredSpace) throws IOException {
+ assert (requiredSpace <= pageSizeBytes);
if (requiredSpace > freeSpaceInCurrentPage) {
logger.trace("Required space {} is less than free space in current page ({})", requiredSpace,
freeSpaceInCurrentPage);
@@ -339,9 +340,7 @@ public final class UnsafeExternalSorter {
} else {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquired < pageSizeBytes) {
- if (memoryAcquired > 0) {
- shuffleMemoryManager.release(memoryAcquired);
- }
+ shuffleMemoryManager.release(memoryAcquired);
spill();
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquiredAfterSpilling != pageSizeBytes) {
@@ -365,26 +364,59 @@ public final class UnsafeExternalSorter {
long recordBaseOffset,
int lengthInBytes,
long prefix) throws IOException {
+
+ growPointerArrayIfNecessary();
// Need 4 bytes to store the record length.
final int totalSpaceRequired = lengthInBytes + 4;
- if (!haveSpaceForRecord(totalSpaceRequired)) {
- allocateSpaceForRecord(totalSpaceRequired);
+
+ // --- Figure out where to insert the new record ----------------------------------------------
+
+ final MemoryBlock dataPage;
+ long dataPagePosition;
+ boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
+ if (useOverflowPage) {
+ long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
+ // The record is larger than the page size, so allocate a special overflow page just to hold
+ // that record.
+ final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+ if (memoryGranted != overflowPageSize) {
+ shuffleMemoryManager.release(memoryGranted);
+ spill();
+ final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+ if (memoryGrantedAfterSpill != overflowPageSize) {
+ shuffleMemoryManager.release(memoryGrantedAfterSpill);
+ throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
+ }
+ }
+ MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
+ allocatedPages.add(overflowPage);
+ dataPage = overflowPage;
+ dataPagePosition = overflowPage.getBaseOffset();
+ } else {
+ // The record is small enough to fit in a regular data page, but the current page might not
+ // have enough space to hold it (or no pages have been allocated yet).
+ acquireNewPageIfNecessary(totalSpaceRequired);
+ dataPage = currentPage;
+ dataPagePosition = currentPagePosition;
+ // Update bookkeeping information
+ freeSpaceInCurrentPage -= totalSpaceRequired;
+ currentPagePosition += totalSpaceRequired;
}
- assert(inMemSorter != null);
+ final Object dataPageBaseObject = dataPage.getBaseObject();
+
+ // --- Insert the record ----------------------------------------------------------------------
final long recordAddress =
- taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
- final Object dataPageBaseObject = currentPage.getBaseObject();
- PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, lengthInBytes);
- currentPagePosition += 4;
+ taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
+ PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, lengthInBytes);
+ dataPagePosition += 4;
PlatformDependent.copyMemory(
recordBaseObject,
recordBaseOffset,
dataPageBaseObject,
- currentPagePosition,
+ dataPagePosition,
lengthInBytes);
- currentPagePosition += lengthInBytes;
- freeSpaceInCurrentPage -= totalSpaceRequired;
+ assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, prefix);
}
@@ -399,33 +431,70 @@ public final class UnsafeExternalSorter {
public void insertKVRecord(
Object keyBaseObj, long keyOffset, int keyLen,
Object valueBaseObj, long valueOffset, int valueLen, long prefix) throws IOException {
+
+ growPointerArrayIfNecessary();
final int totalSpaceRequired = keyLen + valueLen + 4 + 4;
- if (!haveSpaceForRecord(totalSpaceRequired)) {
- allocateSpaceForRecord(totalSpaceRequired);
+
+ // --- Figure out where to insert the new record ----------------------------------------------
+
+ final MemoryBlock dataPage;
+ long dataPagePosition;
+ boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
+ if (useOverflowPage) {
+ long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
+ // The record is larger than the page size, so allocate a special overflow page just to hold
+ // that record.
+ final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+ if (memoryGranted != overflowPageSize) {
+ shuffleMemoryManager.release(memoryGranted);
+ spill();
+ final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+ if (memoryGrantedAfterSpill != overflowPageSize) {
+ shuffleMemoryManager.release(memoryGrantedAfterSpill);
+ throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
+ }
+ }
+ MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
+ allocatedPages.add(overflowPage);
+ dataPage = overflowPage;
+ dataPagePosition = overflowPage.getBaseOffset();
+ } else {
+ // The record is small enough to fit in a regular data page, but the current page might not
+ // have enough space to hold it (or no pages have been allocated yet).
+ acquireNewPageIfNecessary(totalSpaceRequired);
+ dataPage = currentPage;
+ dataPagePosition = currentPagePosition;
+ // Update bookkeeping information
+ freeSpaceInCurrentPage -= totalSpaceRequired;
+ currentPagePosition += totalSpaceRequired;
}
- assert(inMemSorter != null);
+ final Object dataPageBaseObject = dataPage.getBaseObject();
+
+ // --- Insert the record ----------------------------------------------------------------------
final long recordAddress =
- taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
- final Object dataPageBaseObject = currentPage.getBaseObject();
- PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen + valueLen + 4);
- currentPagePosition += 4;
+ taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
+ PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, keyLen + valueLen + 4);
+ dataPagePosition += 4;
- PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen);
- currentPagePosition += 4;
+ PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, keyLen);
+ dataPagePosition += 4;
PlatformDependent.copyMemory(
- keyBaseObj, keyOffset, dataPageBaseObject, currentPagePosition, keyLen);
- currentPagePosition += keyLen;
+ keyBaseObj, keyOffset, dataPageBaseObject, dataPagePosition, keyLen);
+ dataPagePosition += keyLen;
PlatformDependent.copyMemory(
- valueBaseObj, valueOffset, dataPageBaseObject, currentPagePosition, valueLen);
- currentPagePosition += valueLen;
+ valueBaseObj, valueOffset, dataPageBaseObject, dataPagePosition, valueLen);
- freeSpaceInCurrentPage -= totalSpaceRequired;
+ assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, prefix);
}
+ /**
+ * Returns a sorted iterator. It is the caller's responsibility to call `cleanupResources()`
+ * after consuming this iterator.
+ */
public UnsafeSorterIterator getSortedIterator() throws IOException {
assert(inMemSorter != null);
final UnsafeInMemorySorter.SortedIterator inMemoryIterator = inMemSorter.getSortedIterator();