aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java7
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java19
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java9
3 files changed, 31 insertions, 4 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 4d6731ee60..80b03d7e99 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -150,6 +150,11 @@ public final class UnsafeExternalSorter {
return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
}
+ @VisibleForTesting
+ public int getNumberOfAllocatedPages() {
+ return allocatedPages.size();
+ }
+
public long freeMemory() {
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
@@ -257,7 +262,7 @@ public final class UnsafeExternalSorter {
currentPagePosition,
lengthInBytes);
currentPagePosition += lengthInBytes;
-
+ freeSpaceInCurrentPage -= totalSpaceRequired;
sorter.insertRecord(recordAddress, prefix);
}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index ea8755e21e..0e391b7512 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -199,4 +199,23 @@ public class UnsafeExternalSorterSuite {
}
}
+ @Test
+ public void testFillingPage() throws Exception {
+ final UnsafeExternalSorter sorter = new UnsafeExternalSorter(
+ memoryManager,
+ shuffleMemoryManager,
+ blockManager,
+ taskContext,
+ recordComparator,
+ prefixComparator,
+ 1024,
+ new SparkConf());
+
+ byte[] record = new byte[16];
+ while (sorter.getNumberOfAllocatedPages() < 2) {
+ sorter.insertRecord(record, PlatformDependent.BYTE_ARRAY_OFFSET, record.length, 0);
+ }
+ sorter.freeMemory();
+ }
+
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index be4ff400c4..4c3f2c6557 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -124,7 +124,7 @@ final class UnsafeExternalRowSorter {
return new AbstractScalaRowIterator() {
private final int numFields = schema.length();
- private final UnsafeRow row = new UnsafeRow();
+ private UnsafeRow row = new UnsafeRow();
@Override
public boolean hasNext() {
@@ -141,10 +141,13 @@ final class UnsafeExternalRowSorter {
numFields,
sortedIterator.getRecordLength());
if (!hasNext()) {
- row.copy(); // so that we don't have dangling pointers to freed page
+ UnsafeRow copy = row.copy(); // so that we don't have dangling pointers to freed page
+ row = null; // so that we don't keep references to the base object
cleanupResources();
+ return copy;
+ } else {
+ return row;
}
- return row;
} catch (IOException e) {
cleanupResources();
// Scala iterators don't declare any checked exceptions, so we need to use this hack