aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java7
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java19
2 files changed, 25 insertions, 1 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 4d6731ee60..80b03d7e99 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -150,6 +150,11 @@ public final class UnsafeExternalSorter {
return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
}
+ @VisibleForTesting
+ public int getNumberOfAllocatedPages() {
+ return allocatedPages.size();
+ }
+
public long freeMemory() {
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
@@ -257,7 +262,7 @@ public final class UnsafeExternalSorter {
currentPagePosition,
lengthInBytes);
currentPagePosition += lengthInBytes;
-
+ freeSpaceInCurrentPage -= totalSpaceRequired;
sorter.insertRecord(recordAddress, prefix);
}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index ea8755e21e..0e391b7512 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -199,4 +199,23 @@ public class UnsafeExternalSorterSuite {
}
}
+ @Test
+ public void testFillingPage() throws Exception {
+ final UnsafeExternalSorter sorter = new UnsafeExternalSorter(
+ memoryManager,
+ shuffleMemoryManager,
+ blockManager,
+ taskContext,
+ recordComparator,
+ prefixComparator,
+ 1024,
+ new SparkConf());
+
+ byte[] record = new byte[16];
+ while (sorter.getNumberOfAllocatedPages() < 2) {
+ sorter.insertRecord(record, PlatformDependent.BYTE_ARRAY_OFFSET, record.length, 0);
+ }
+ sorter.freeMemory();
+ }
+
}