diff options
3 files changed, 31 insertions, 4 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 4d6731ee60..80b03d7e99 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -150,6 +150,11 @@ public final class UnsafeExternalSorter { return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE); } + @VisibleForTesting + public int getNumberOfAllocatedPages() { + return allocatedPages.size(); + } + public long freeMemory() { long memoryFreed = 0; for (MemoryBlock block : allocatedPages) { @@ -257,7 +262,7 @@ public final class UnsafeExternalSorter { currentPagePosition, lengthInBytes); currentPagePosition += lengthInBytes; - + freeSpaceInCurrentPage -= totalSpaceRequired; sorter.insertRecord(recordAddress, prefix); } diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index ea8755e21e..0e391b7512 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -199,4 +199,23 @@ public class UnsafeExternalSorterSuite { } } + @Test + public void testFillingPage() throws Exception { + final UnsafeExternalSorter sorter = new UnsafeExternalSorter( + memoryManager, + shuffleMemoryManager, + blockManager, + taskContext, + recordComparator, + prefixComparator, + 1024, + new SparkConf()); + + byte[] record = new byte[16]; + while (sorter.getNumberOfAllocatedPages() < 2) { + sorter.insertRecord(record, PlatformDependent.BYTE_ARRAY_OFFSET, record.length, 0); + } + sorter.freeMemory(); + } + } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java index be4ff400c4..4c3f2c6557 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java @@ -124,7 +124,7 @@ final class UnsafeExternalRowSorter { return new AbstractScalaRowIterator() { private final int numFields = schema.length(); - private final UnsafeRow row = new UnsafeRow(); + private UnsafeRow row = new UnsafeRow(); @Override public boolean hasNext() { @@ -141,10 +141,13 @@ final class UnsafeExternalRowSorter { numFields, sortedIterator.getRecordLength()); if (!hasNext()) { - row.copy(); // so that we don't have dangling pointers to freed page + UnsafeRow copy = row.copy(); // so that we don't have dangling pointers to freed page + row = null; // so that we don't keep references to the base object cleanupResources(); + return copy; + } else { + return row; } - return row; } catch (IOException e) { cleanupResources(); // Scala iterators don't declare any checked exceptions, so we need to use this hack |