author    Davies Liu <davies@databricks.com>    2015-11-30 11:54:18 -0800
committer Davies Liu <davies.liu@gmail.com>    2015-11-30 11:54:18 -0800
commit    8df584b0200402d8b2ce0a8de24f7a760ced8655 (patch)
tree      7cf957d90fb611d9f27867b719e169d8236783d6
parent    17275fa99c670537c52843df405279a52b5c9594 (diff)
[SPARK-11982] [SQL] improve performance of cartesian product
This PR improves the performance of CartesianProduct by caching the result of the right plan. After this patch, the query time of TPC-DS Q65 goes down from 28 minutes to 4 seconds (420X faster).

cc nongli

Author: Davies Liu <davies@databricks.com>

Closes #9969 from davies/improve_cartesian.
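The idea behind the patch, as a minimal Scala sketch (hypothetical names, not part of the patch; the real code spills the cached rows through UnsafeExternalSorter instead of holding them in an array):

    // Naive cartesian recomputes the right iterator for every left row;
    // the cached variant materializes the right partition once per task and replays it.
    def cachedCartesian[L, R](leftIter: Iterator[L], rightIter: Iterator[R]): Iterator[(L, R)] = {
      val cachedRight = rightIter.toArray               // right side is read exactly once
      for (l <- leftIter; r <- cachedRight.iterator) yield (l, r)
    }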
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java  | 63
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java  |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala        | 76
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala        |  2
4 files changed, 139 insertions, 9 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 2e40312674..5a97f4f113 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -21,6 +21,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
+import java.util.Queue;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -521,4 +522,66 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
return upstream.getKeyPrefix();
}
}
+
+ /**
+ * Returns an iterator that returns the rows in the order they were inserted.
+ *
+ * It is the caller's responsibility to call `cleanupResources()`
+ * after consuming this iterator.
+ */
+ public UnsafeSorterIterator getIterator() throws IOException {
+ if (spillWriters.isEmpty()) {
+ assert(inMemSorter != null);
+ return inMemSorter.getIterator();
+ } else {
+ LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
+ for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
+ queue.add(spillWriter.getReader(blockManager));
+ }
+ if (inMemSorter != null) {
+ queue.add(inMemSorter.getIterator());
+ }
+ return new ChainedIterator(queue);
+ }
+ }
+
+ /**
+ * Chains multiple UnsafeSorterIterators together as a single one.
+ */
+ class ChainedIterator extends UnsafeSorterIterator {
+
+ private final Queue<UnsafeSorterIterator> iterators;
+ private UnsafeSorterIterator current;
+
+ public ChainedIterator(Queue<UnsafeSorterIterator> iterators) {
+ assert iterators.size() > 0;
+ this.iterators = iterators;
+ this.current = iterators.remove();
+ }
+
+ @Override
+ public boolean hasNext() {
+ while (!current.hasNext() && !iterators.isEmpty()) {
+ current = iterators.remove();
+ }
+ return current.hasNext();
+ }
+
+ @Override
+ public void loadNext() throws IOException {
+ current.loadNext();
+ }
+
+ @Override
+ public Object getBaseObject() { return current.getBaseObject(); }
+
+ @Override
+ public long getBaseOffset() { return current.getBaseOffset(); }
+
+ @Override
+ public int getRecordLength() { return current.getRecordLength(); }
+
+ @Override
+ public long getKeyPrefix() { return current.getKeyPrefix(); }
+ }
}
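A rough usage sketch of the new getIterator, showing the contract called out in its Javadoc (illustrative only; the sorter is assumed to already contain inserted records):

    // Replay records in insertion order, then release the sorter's memory and spill files.
    def replayInsertedRecords(sorter: UnsafeExternalSorter): Unit = {
      val iter = sorter.getIterator        // chains spill readers plus the in-memory records
      try {
        while (iter.hasNext) {
          iter.loadNext()
          // current record: iter.getBaseObject / iter.getBaseOffset / iter.getRecordLength
        }
      } finally {
        sorter.cleanupResources()          // caller's responsibility, per the Javadoc above
      }
    }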
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index dce1f15a29..c91e88f31b 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -226,4 +226,11 @@ public final class UnsafeInMemorySorter {
sorter.sort(array, 0, pos / 2, sortComparator);
return new SortedIterator(pos / 2);
}
+
+ /**
+ * Returns an iterator over the record pointers, in the original insertion order.
+ */
+ public SortedIterator getIterator() {
+ return new SortedIterator(pos / 2);
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index f467519b80..fa2bc76721 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -17,16 +17,75 @@
package org.apache.spark.sql.execution.joins
-import org.apache.spark.rdd.RDD
+import org.apache.spark._
+import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
+
+
+/**
+ * An optimized CartesianRDD for UnsafeRow, which caches the rows from the second child RDD.
+ * This is much faster than building the right partition for every row in the left RDD, and it
+ * also materializes the right RDD (in case the right RDD is nondeterministic).
+ */
+private[spark]
+class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numFieldsOfRight: Int)
+ extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {
+
+ override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
+ // We will not sort the rows, so prefixComparator and recordComparator are null.
+ val sorter = UnsafeExternalSorter.create(
+ context.taskMemoryManager(),
+ SparkEnv.get.blockManager,
+ context,
+ null,
+ null,
+ 1024,
+ SparkEnv.get.memoryManager.pageSizeBytes)
+
+ val partition = split.asInstanceOf[CartesianPartition]
+ for (y <- rdd2.iterator(partition.s2, context)) {
+ sorter.insertRecord(y.getBaseObject, y.getBaseOffset, y.getSizeInBytes, 0)
+ }
+
+ // Create an iterator from the sorter and wrap it as an Iterator[UnsafeRow]
+ def createIter(): Iterator[UnsafeRow] = {
+ val iter = sorter.getIterator
+ val unsafeRow = new UnsafeRow
+ new Iterator[UnsafeRow] {
+ override def hasNext: Boolean = {
+ iter.hasNext
+ }
+ override def next(): UnsafeRow = {
+ iter.loadNext()
+ unsafeRow.pointTo(iter.getBaseObject, iter.getBaseOffset, numFieldsOfRight,
+ iter.getRecordLength)
+ unsafeRow
+ }
+ }
+ }
+
+ val resultIter =
+ for (x <- rdd1.iterator(partition.s1, context);
+ y <- createIter()) yield (x, y)
+ CompletionIterator[(UnsafeRow, UnsafeRow), Iterator[(UnsafeRow, UnsafeRow)]](
+ resultIter, sorter.cleanupResources)
+ }
+}
case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output ++ right.output
+ override def canProcessSafeRows: Boolean = false
+ override def canProcessUnsafeRows: Boolean = true
+ override def outputsUnsafeRows: Boolean = true
+
override private[sql] lazy val metrics = Map(
"numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
"numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
@@ -39,18 +98,19 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode
val leftResults = left.execute().map { row =>
numLeftRows += 1
- row.copy()
+ row.asInstanceOf[UnsafeRow]
}
val rightResults = right.execute().map { row =>
numRightRows += 1
- row.copy()
+ row.asInstanceOf[UnsafeRow]
}
- leftResults.cartesian(rightResults).mapPartitionsInternal { iter =>
- val joinedRow = new JoinedRow
+ val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size)
+ pair.mapPartitionsInternal { iter =>
+ val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema)
iter.map { r =>
numOutputRows += 1
- joinedRow(r._1, r._2)
+ joiner.join(r._1, r._2)
}
}
}
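For context on the joiner swap at the end of the hunk: GenerateUnsafeRowJoiner generates code that concatenates two UnsafeRows into a single UnsafeRow, replacing the old JoinedRow wrapper. A hedged sketch of the call pattern, with made-up single-column schemas:

    import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    val leftSchema  = StructType(Seq(StructField("a", IntegerType)))
    val rightSchema = StructType(Seq(StructField("b", IntegerType)))
    val joiner = GenerateUnsafeRowJoiner.create(leftSchema, rightSchema)

    // Produces one UnsafeRow whose fields are the left row's fields followed by the right's.
    def joinPair(l: UnsafeRow, r: UnsafeRow): UnsafeRow = joiner.join(l, r)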
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index ebfa1eaf3e..4f2cad19bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -317,7 +317,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
testSparkPlanMetrics(df, 1, Map(
1L -> ("CartesianProduct", Map(
"number of left rows" -> 12L, // left needs to be scanned twice
- "number of right rows" -> 12L, // right is read 6 times
+ "number of right rows" -> 4L, // right is read twice
"number of output rows" -> 12L)))
)
}
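A hedged reading of the updated expectations (the fixtures are defined elsewhere in SQLMetricsSuite; a 6-row left relation in 2 partitions and a 2-row right relation are assumptions consistent with the comments above): before the patch the right iterator was rebuilt for every left row, so the 2 right rows were read 6 times; now the right side is cached once per task.

    // Assumed fixtures: left = 6 rows in 2 partitions, right = 2 rows in 2 partitions.
    val numLeftRows   = 6 * 2  // 12: each left partition is recomputed once per right partition
    val numRightRows  = 2 * 2  //  4: right is read once per task, instead of once per left row
    val numOutputRows = 6 * 2  // 12: the full cartesian product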