aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorNong <nong@cloudera.com>2015-12-04 10:01:20 -0800
committerDavies Liu <davies.liu@gmail.com>2015-12-04 10:01:20 -0800
commit95296d9b1ad1d9e9396d7dfd0015ef27ce1cf341 (patch)
treed603e32a1b491bec6e89ee48ab87916b78f575d1 /core
parent17e4e021ae7fdf5e4dd05a0473faa529e3e80dbb (diff)
downloadspark-95296d9b1ad1d9e9396d7dfd0015ef27ce1cf341.tar.gz
spark-95296d9b1ad1d9e9396d7dfd0015ef27ce1cf341.tar.bz2
spark-95296d9b1ad1d9e9396d7dfd0015ef27ce1cf341.zip
[SPARK-12089] [SQL] Fix memory corrupt due to freeing a page being referenced
When the spillable sort iterator was spilled, it was mistakenly keeping the last page in memory rather than the current page. This causes the current record to get corrupted. Author: Nong <nong@cloudera.com> Closes #10142 from nongli/spark-12089.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java7
1 files changed, 5 insertions, 2 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 5a97f4f113..79d74b23ce 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -443,6 +443,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
UnsafeInMemorySorter.SortedIterator inMemIterator =
((UnsafeInMemorySorter.SortedIterator) upstream).clone();
+ // Iterate over the records that have not been returned and spill them.
final UnsafeSorterSpillWriter spillWriter =
new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
while (inMemIterator.hasNext()) {
@@ -458,9 +459,11 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
long released = 0L;
synchronized (UnsafeExternalSorter.this) {
- // release the pages except the one that is used
+ // release the pages except the one that is used. There can still be a caller that
+ // is accessing the current record. We free this page in that caller's next loadNext()
+ // call.
for (MemoryBlock page : allocatedPages) {
- if (!loaded || page.getBaseObject() != inMemIterator.getBaseObject()) {
+ if (!loaded || page.getBaseObject() != upstream.getBaseObject()) {
released += page.size();
freePage(page);
} else {