aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-01-05 18:46:52 -0800
committerDavies Liu <davies.liu@gmail.com>2016-01-05 18:46:52 -0800
commit70fe6ce52f26904aa53bd20409db69b52bccf315 (patch)
treec4669d8395ba2572c1aab3c3bd3197aadb8126d3 /core/src/main
parent0d42292f6a2dbe626e8f6a50e6c61dd79533f235 (diff)
downloadspark-70fe6ce52f26904aa53bd20409db69b52bccf315.tar.gz
spark-70fe6ce52f26904aa53bd20409db69b52bccf315.tar.bz2
spark-70fe6ce52f26904aa53bd20409db69b52bccf315.zip
[SPARK-12659] fix NPE in UnsafeExternalSorter (used by cartesian product)
Cartesian product use UnsafeExternalSorter without comparator to do spilling, it will NPE if spilling happens. This bug also hitted by #10605 cc JoshRosen Author: Davies Liu <davies@databricks.com> Closes #10606 from davies/fix_spilling.
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java7
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java17
2 files changed, 13 insertions, 11 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 79d74b23ce..77d0b70bb8 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -400,6 +400,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
* after consuming this iterator.
*/
public UnsafeSorterIterator getSortedIterator() throws IOException {
+ assert(recordComparator != null);
if (spillWriters.isEmpty()) {
assert(inMemSorter != null);
readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
@@ -531,18 +532,20 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
*
* It is the caller's responsibility to call `cleanupResources()`
* after consuming this iterator.
+ *
+ * TODO: support forced spilling
*/
public UnsafeSorterIterator getIterator() throws IOException {
if (spillWriters.isEmpty()) {
assert(inMemSorter != null);
- return inMemSorter.getIterator();
+ return inMemSorter.getSortedIterator();
} else {
LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
queue.add(spillWriter.getReader(blockManager));
}
if (inMemSorter != null) {
- queue.add(inMemSorter.getIterator());
+ queue.add(inMemSorter.getSortedIterator());
}
return new ChainedIterator(queue);
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index c16cbce9a0..b7ab45675e 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -99,7 +99,11 @@ public final class UnsafeInMemorySorter {
this.consumer = consumer;
this.memoryManager = memoryManager;
this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
- this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
+ if (recordComparator != null) {
+ this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
+ } else {
+ this.sortComparator = null;
+ }
this.array = array;
}
@@ -223,14 +227,9 @@ public final class UnsafeInMemorySorter {
* {@code next()} will return the same mutable object.
*/
public SortedIterator getSortedIterator() {
- sorter.sort(array, 0, pos / 2, sortComparator);
- return new SortedIterator(pos / 2);
- }
-
- /**
- * Returns an iterator over record pointers in original order (inserted).
- */
- public SortedIterator getIterator() {
+ if (sortComparator != null) {
+ sorter.sort(array, 0, pos / 2, sortComparator);
+ }
return new SortedIterator(pos / 2);
}
}