diff options
author | Davies Liu <davies@databricks.com> | 2016-01-05 18:46:52 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-01-05 18:46:52 -0800 |
commit | 70fe6ce52f26904aa53bd20409db69b52bccf315 (patch) | |
tree | c4669d8395ba2572c1aab3c3bd3197aadb8126d3 /core/src/main/java | |
parent | 0d42292f6a2dbe626e8f6a50e6c61dd79533f235 (diff) | |
download | spark-70fe6ce52f26904aa53bd20409db69b52bccf315.tar.gz spark-70fe6ce52f26904aa53bd20409db69b52bccf315.tar.bz2 spark-70fe6ce52f26904aa53bd20409db69b52bccf315.zip |
[SPARK-12659] fix NPE in UnsafeExternalSorter (used by cartesian product)
Cartesian product use UnsafeExternalSorter without comparator to do spilling, it will NPE if spilling happens.
This bug also hitted by #10605
cc JoshRosen
Author: Davies Liu <davies@databricks.com>
Closes #10606 from davies/fix_spilling.
Diffstat (limited to 'core/src/main/java')
2 files changed, 13 insertions, 11 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 79d74b23ce..77d0b70bb8 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -400,6 +400,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { * after consuming this iterator. */ public UnsafeSorterIterator getSortedIterator() throws IOException { + assert(recordComparator != null); if (spillWriters.isEmpty()) { assert(inMemSorter != null); readingIterator = new SpillableIterator(inMemSorter.getSortedIterator()); @@ -531,18 +532,20 @@ public final class UnsafeExternalSorter extends MemoryConsumer { * * It is the caller's responsibility to call `cleanupResources()` * after consuming this iterator. + * + * TODO: support forced spilling */ public UnsafeSorterIterator getIterator() throws IOException { if (spillWriters.isEmpty()) { assert(inMemSorter != null); - return inMemSorter.getIterator(); + return inMemSorter.getSortedIterator(); } else { LinkedList<UnsafeSorterIterator> queue = new LinkedList<>(); for (UnsafeSorterSpillWriter spillWriter : spillWriters) { queue.add(spillWriter.getReader(blockManager)); } if (inMemSorter != null) { - queue.add(inMemSorter.getIterator()); + queue.add(inMemSorter.getSortedIterator()); } return new ChainedIterator(queue); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index c16cbce9a0..b7ab45675e 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -99,7 +99,11 @@ public final class UnsafeInMemorySorter { this.consumer = consumer; this.memoryManager = memoryManager; this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE); - this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager); + if (recordComparator != null) { + this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager); + } else { + this.sortComparator = null; + } this.array = array; } @@ -223,14 +227,9 @@ public final class UnsafeInMemorySorter { * {@code next()} will return the same mutable object. */ public SortedIterator getSortedIterator() { - sorter.sort(array, 0, pos / 2, sortComparator); - return new SortedIterator(pos / 2); - } - - /** - * Returns an iterator over record pointers in original order (inserted). - */ - public SortedIterator getIterator() { + if (sortComparator != null) { + sorter.sort(array, 0, pos / 2, sortComparator); + } return new SortedIterator(pos / 2); } } |