diff options
author | Nong <nong@cloudera.com> | 2015-12-04 10:01:20 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-12-04 10:01:20 -0800 |
commit | 95296d9b1ad1d9e9396d7dfd0015ef27ce1cf341 (patch) | |
tree | d603e32a1b491bec6e89ee48ab87916b78f575d1 | |
parent | 17e4e021ae7fdf5e4dd05a0473faa529e3e80dbb (diff) | |
download | spark-95296d9b1ad1d9e9396d7dfd0015ef27ce1cf341.tar.gz spark-95296d9b1ad1d9e9396d7dfd0015ef27ce1cf341.tar.bz2 spark-95296d9b1ad1d9e9396d7dfd0015ef27ce1cf341.zip |
[SPARK-12089] [SQL] Fix memory corrupt due to freeing a page being referenced
When the spillable sort iterator was spilled, it was mistakenly keeping
the last page in memory rather than the current page. This causes the
current record to get corrupted.
Author: Nong <nong@cloudera.com>
Closes #10142 from nongli/spark-12089.
-rw-r--r-- | core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 7 |
1 files changed, 5 insertions, 2 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 5a97f4f113..79d74b23ce 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -443,6 +443,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { UnsafeInMemorySorter.SortedIterator inMemIterator = ((UnsafeInMemorySorter.SortedIterator) upstream).clone(); + // Iterate over the records that have not been returned and spill them. final UnsafeSorterSpillWriter spillWriter = new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords); while (inMemIterator.hasNext()) { @@ -458,9 +459,11 @@ public final class UnsafeExternalSorter extends MemoryConsumer { long released = 0L; synchronized (UnsafeExternalSorter.this) { - // release the pages except the one that is used + // release the pages except the one that is used. There can still be a caller that + // is accessing the current record. We free this page in that caller's next loadNext() + // call. for (MemoryBlock page : allocatedPages) { - if (!loaded || page.getBaseObject() != inMemIterator.getBaseObject()) { + if (!loaded || page.getBaseObject() != upstream.getBaseObject()) { released += page.size(); freePage(page); } else { |