author Davies Liu <davies@databricks.com> 2015-11-30 11:54:18 -0800
committer Davies Liu <davies.liu@gmail.com> 2015-11-30 11:54:18 -0800
commit 8df584b0200402d8b2ce0a8de24f7a760ced8655 (patch)
tree 7cf957d90fb611d9f27867b719e169d8236783d6 /sql
parent 17275fa99c670537c52843df405279a52b5c9594 (diff)
[SPARK-11982] [SQL] improve performance of cartesian product
This PR improves the performance of CartesianProduct by caching the result of the right plan. After this patch, the query time of TPC-DS Q65 goes down from 28 minutes to 4 seconds (420X faster).

cc nongli

Author: Davies Liu <davies@databricks.com>

Closes #9969 from davies/improve_cartesian.
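To make the idea easier to follow, here is a minimal, framework-free Scala sketch of the caching trick (the names naiveCartesian, cachedCartesian and expensiveRight are illustrative and not part of the patch; the real code buffers UnsafeRows in a spillable UnsafeExternalSorter rather than an in-memory array):

object CartesianCacheSketch {
  // Naive version: re-materializes the right side once for every left element,
  // which is what plain CartesianRDD-style iteration ends up doing.
  def naiveCartesian[A, B](left: Iterator[A], right: () => Iterator[B]): Iterator[(A, B)] =
    for (x <- left; y <- right()) yield (x, y)

  // Cached version: materialize the right side once, then reuse it for every left element.
  def cachedCartesian[A, B](left: Iterator[A], right: () => Iterator[B]): Iterator[(A, B)] = {
    val cached = right().toArray  // stand-in for the spillable UnsafeExternalSorter
    for (x <- left; y <- cached.iterator) yield (x, y)
  }

  def main(args: Array[String]): Unit = {
    def expensiveRight(): Iterator[Int] = { println("computing right side"); Iterator(1, 2, 3) }
    // Prints "computing right side" once instead of once per left element.
    cachedCartesian(Iterator("a", "b"), () => expensiveRight()).foreach(println)
  }
}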
Diffstat (limited to 'sql')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala  | 76
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala  |  2
2 files changed, 69 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index f467519b80..fa2bc76721 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -17,16 +17,75 @@
package org.apache.spark.sql.execution.joins
-import org.apache.spark.rdd.RDD
+import org.apache.spark._
+import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
+
+
+/**
+ * An optimized CartesianRDD for UnsafeRow, which caches the rows from the second child RDD.
+ * This is much faster than rebuilding the right partition for every row in the left RDD, and it
+ * also materializes the right RDD (in case the right RDD is nondeterministic).
+ */
+private[spark]
+class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numFieldsOfRight: Int)
+ extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {
+
+ override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
+ // We will not sort the rows, so prefixComparator and recordComparator are null.
+ val sorter = UnsafeExternalSorter.create(
+ context.taskMemoryManager(),
+ SparkEnv.get.blockManager,
+ context,
+ null,
+ null,
+ 1024,
+ SparkEnv.get.memoryManager.pageSizeBytes)
+
+ val partition = split.asInstanceOf[CartesianPartition]
+ for (y <- rdd2.iterator(partition.s2, context)) {
+ sorter.insertRecord(y.getBaseObject, y.getBaseOffset, y.getSizeInBytes, 0)
+ }
+
+ // Create an iterator from the sorter and wrap it as Iterator[UnsafeRow]
+ def createIter(): Iterator[UnsafeRow] = {
+ val iter = sorter.getIterator
+ val unsafeRow = new UnsafeRow
+ new Iterator[UnsafeRow] {
+ override def hasNext: Boolean = {
+ iter.hasNext
+ }
+ override def next(): UnsafeRow = {
+ iter.loadNext()
+ unsafeRow.pointTo(iter.getBaseObject, iter.getBaseOffset, numFieldsOfRight,
+ iter.getRecordLength)
+ unsafeRow
+ }
+ }
+ }
+
+ val resultIter =
+ for (x <- rdd1.iterator(partition.s1, context);
+ y <- createIter()) yield (x, y)
+ CompletionIterator[(UnsafeRow, UnsafeRow), Iterator[(UnsafeRow, UnsafeRow)]](
+ resultIter, sorter.cleanupResources)
+ }
+}
case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output ++ right.output
+ override def canProcessSafeRows: Boolean = false
+ override def canProcessUnsafeRows: Boolean = true
+ override def outputsUnsafeRows: Boolean = true
+
override private[sql] lazy val metrics = Map(
"numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
"numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
@@ -39,18 +98,19 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNod
val leftResults = left.execute().map { row =>
numLeftRows += 1
- row.copy()
+ row.asInstanceOf[UnsafeRow]
}
val rightResults = right.execute().map { row =>
numRightRows += 1
- row.copy()
+ row.asInstanceOf[UnsafeRow]
}
- leftResults.cartesian(rightResults).mapPartitionsInternal { iter =>
- val joinedRow = new JoinedRow
+ val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size)
+ pair.mapPartitionsInternal { iter =>
+ val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema)
iter.map { r =>
numOutputRows += 1
- joinedRow(r._1, r._2)
+ joiner.join(r._1, r._2)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index ebfa1eaf3e..4f2cad19bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -317,7 +317,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
testSparkPlanMetrics(df, 1, Map(
1L -> ("CartesianProduct", Map(
"number of left rows" -> 12L, // left needs to be scanned twice
- "number of right rows" -> 12L, // right is read 6 times
+ "number of right rows" -> 4L, // right is read twice
"number of output rows" -> 12L)))
)
}