diff options
author | Davies Liu <davies@databricks.com> | 2015-11-30 11:54:18 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-11-30 11:54:18 -0800 |
commit | 8df584b0200402d8b2ce0a8de24f7a760ced8655 (patch) | |
tree | 7cf957d90fb611d9f27867b719e169d8236783d6 /core | |
parent | 17275fa99c670537c52843df405279a52b5c9594 (diff) | |
download | spark-8df584b0200402d8b2ce0a8de24f7a760ced8655.tar.gz spark-8df584b0200402d8b2ce0a8de24f7a760ced8655.tar.bz2 spark-8df584b0200402d8b2ce0a8de24f7a760ced8655.zip |
[SPARK-11982] [SQL] improve performance of cartesian product
This PR improve the performance of CartesianProduct by caching the result of right plan.
After this patch, the query time of TPC-DS Q65 go down to 4 seconds from 28 minutes (420X faster).
cc nongli
Author: Davies Liu <davies@databricks.com>
Closes #9969 from davies/improve_cartesian.
Diffstat (limited to 'core')
2 files changed, 70 insertions, 0 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 2e40312674..5a97f4f113 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -21,6 +21,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.LinkedList; +import java.util.Queue; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -521,4 +522,66 @@ public final class UnsafeExternalSorter extends MemoryConsumer { return upstream.getKeyPrefix(); } } + + /** + * Returns a iterator, which will return the rows in the order as inserted. + * + * It is the caller's responsibility to call `cleanupResources()` + * after consuming this iterator. + */ + public UnsafeSorterIterator getIterator() throws IOException { + if (spillWriters.isEmpty()) { + assert(inMemSorter != null); + return inMemSorter.getIterator(); + } else { + LinkedList<UnsafeSorterIterator> queue = new LinkedList<>(); + for (UnsafeSorterSpillWriter spillWriter : spillWriters) { + queue.add(spillWriter.getReader(blockManager)); + } + if (inMemSorter != null) { + queue.add(inMemSorter.getIterator()); + } + return new ChainedIterator(queue); + } + } + + /** + * Chain multiple UnsafeSorterIterator together as single one. + */ + class ChainedIterator extends UnsafeSorterIterator { + + private final Queue<UnsafeSorterIterator> iterators; + private UnsafeSorterIterator current; + + public ChainedIterator(Queue<UnsafeSorterIterator> iterators) { + assert iterators.size() > 0; + this.iterators = iterators; + this.current = iterators.remove(); + } + + @Override + public boolean hasNext() { + while (!current.hasNext() && !iterators.isEmpty()) { + current = iterators.remove(); + } + return current.hasNext(); + } + + @Override + public void loadNext() throws IOException { + current.loadNext(); + } + + @Override + public Object getBaseObject() { return current.getBaseObject(); } + + @Override + public long getBaseOffset() { return current.getBaseOffset(); } + + @Override + public int getRecordLength() { return current.getRecordLength(); } + + @Override + public long getKeyPrefix() { return current.getKeyPrefix(); } + } } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index dce1f15a29..c91e88f31b 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -226,4 +226,11 @@ public final class UnsafeInMemorySorter { sorter.sort(array, 0, pos / 2, sortComparator); return new SortedIterator(pos / 2); } + + /** + * Returns an iterator over record pointers in original order (inserted). + */ + public SortedIterator getIterator() { + return new SortedIterator(pos / 2); + } } |