aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-11-30 11:54:18 -0800
committerDavies Liu <davies.liu@gmail.com>2015-11-30 11:54:18 -0800
commit8df584b0200402d8b2ce0a8de24f7a760ced8655 (patch)
tree7cf957d90fb611d9f27867b719e169d8236783d6 /core
parent17275fa99c670537c52843df405279a52b5c9594 (diff)
downloadspark-8df584b0200402d8b2ce0a8de24f7a760ced8655.tar.gz
spark-8df584b0200402d8b2ce0a8de24f7a760ced8655.tar.bz2
spark-8df584b0200402d8b2ce0a8de24f7a760ced8655.zip
[SPARK-11982] [SQL] improve performance of cartesian product
This PR improve the performance of CartesianProduct by caching the result of right plan. After this patch, the query time of TPC-DS Q65 go down to 4 seconds from 28 minutes (420X faster). cc nongli Author: Davies Liu <davies@databricks.com> Closes #9969 from davies/improve_cartesian.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java63
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java7
2 files changed, 70 insertions, 0 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 2e40312674..5a97f4f113 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -21,6 +21,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
+import java.util.Queue;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -521,4 +522,66 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
return upstream.getKeyPrefix();
}
}
+
+ /**
+ * Returns a iterator, which will return the rows in the order as inserted.
+ *
+ * It is the caller's responsibility to call `cleanupResources()`
+ * after consuming this iterator.
+ */
+ public UnsafeSorterIterator getIterator() throws IOException {
+ if (spillWriters.isEmpty()) {
+ assert(inMemSorter != null);
+ return inMemSorter.getIterator();
+ } else {
+ LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
+ for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
+ queue.add(spillWriter.getReader(blockManager));
+ }
+ if (inMemSorter != null) {
+ queue.add(inMemSorter.getIterator());
+ }
+ return new ChainedIterator(queue);
+ }
+ }
+
+ /**
+ * Chain multiple UnsafeSorterIterator together as single one.
+ */
+ class ChainedIterator extends UnsafeSorterIterator {
+
+ private final Queue<UnsafeSorterIterator> iterators;
+ private UnsafeSorterIterator current;
+
+ public ChainedIterator(Queue<UnsafeSorterIterator> iterators) {
+ assert iterators.size() > 0;
+ this.iterators = iterators;
+ this.current = iterators.remove();
+ }
+
+ @Override
+ public boolean hasNext() {
+ while (!current.hasNext() && !iterators.isEmpty()) {
+ current = iterators.remove();
+ }
+ return current.hasNext();
+ }
+
+ @Override
+ public void loadNext() throws IOException {
+ current.loadNext();
+ }
+
+ @Override
+ public Object getBaseObject() { return current.getBaseObject(); }
+
+ @Override
+ public long getBaseOffset() { return current.getBaseOffset(); }
+
+ @Override
+ public int getRecordLength() { return current.getRecordLength(); }
+
+ @Override
+ public long getKeyPrefix() { return current.getKeyPrefix(); }
+ }
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index dce1f15a29..c91e88f31b 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -226,4 +226,11 @@ public final class UnsafeInMemorySorter {
sorter.sort(array, 0, pos / 2, sortComparator);
return new SortedIterator(pos / 2);
}
+
+ /**
+ * Returns an iterator over record pointers in original order (inserted).
+ */
+ public SortedIterator getIterator() {
+ return new SortedIterator(pos / 2);
+ }
}