aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-02-12 17:32:15 -0800
committerReynold Xin <rxin@databricks.com>2016-02-12 17:32:15 -0800
commit2228f074e1bc11ee452925e10f780eaf24faf9e5 (patch)
treee853814c29a58d8502e8c292ce126a248ed9c1f6 /sql
parent62b1c07e7e88fe9c951c59cf022dfd52f160cfeb (diff)
downloadspark-2228f074e1bc11ee452925e10f780eaf24faf9e5.tar.gz
spark-2228f074e1bc11ee452925e10f780eaf24faf9e5.tar.bz2
spark-2228f074e1bc11ee452925e10f780eaf24faf9e5.zip
[SPARK-13293][SQL] generate Expand
Expand suffer from create the UnsafeRow from same input multiple times, with codegen, it only need to copy some of the columns. After this, we can see 3X improvements (from 43 seconds to 13 seconds) on a TPCDS query (Q67) that have eight columns in Rollup. Ideally, we could mask some of the columns based on bitmask, I'd leave that in the future, because currently Aggregation (50 ns) is much slower than that just copy the variables (1-2 ns). Author: Davies Liu <davies@databricks.com> Closes #11177 from davies/gen_expand.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala124
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala17
2 files changed, 140 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index c3683cc4e7..d26a0b7467 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -17,11 +17,15 @@
package org.apache.spark.sql.execution
+import scala.collection.immutable.IndexedSeq
+
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* Apply the all of the GroupExpressions to every input row, hence we will get
@@ -35,7 +39,10 @@ case class Expand(
projections: Seq[Seq[Expression]],
output: Seq[Attribute],
child: SparkPlan)
- extends UnaryNode {
+ extends UnaryNode with CodegenSupport {
+
+ private[sql] override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
// The GroupExpressions can output data with arbitrary partitioning, so set it
// as UNKNOWN partitioning
@@ -48,6 +55,8 @@ case class Expand(
(exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
+ val numOutputRows = longMetric("numOutputRows")
+
child.execute().mapPartitions { iter =>
val groups = projections.map(projection).toArray
new Iterator[InternalRow] {
@@ -71,9 +80,122 @@ case class Expand(
idx = 0
}
+ numOutputRows += 1
result
}
}
}
}
+
+ override def upstream(): RDD[InternalRow] = {
+ child.asInstanceOf[CodegenSupport].upstream()
+ }
+
+ protected override def doProduce(ctx: CodegenContext): String = {
+ child.asInstanceOf[CodegenSupport].produce(ctx, this)
+ }
+
+ override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+ /*
+ * When the projections list looks like:
+ * expr1A, exprB, expr1C
+ * expr2A, exprB, expr2C
+ * ...
+ * expr(N-1)A, exprB, expr(N-1)C
+ *
+ * i.e. column A and C have different values for each output row, but column B stays constant.
+ *
+ * The generated code looks something like (note that B is only computed once in declaration):
+ *
+ * // part 1: declare all the columns
+ * colA = ...
+ * colB = ...
+ * colC = ...
+ *
+ * // part 2: code that computes the columns
+ * for (row = 0; row < N; row++) {
+ * switch (row) {
+ * case 0:
+ * colA = ...
+ * colC = ...
+ * case 1:
+ * colA = ...
+ * colC = ...
+ * ...
+ * case N - 1:
+ * colA = ...
+ * colC = ...
+ * }
+ * // increment metrics and consume output values
+ * }
+ *
+ * We use a for loop here so we only includes one copy of the consume code and avoid code
+ * size explosion.
+ */
+
+ // Set input variables
+ ctx.currentVars = input
+
+ // Tracks whether a column has the same output for all rows.
+ // Size of sameOutput array should equal N.
+ // If sameOutput(i) is true, then the i-th column has the same value for all output rows given
+ // an input row.
+ val sameOutput: Array[Boolean] = output.indices.map { colIndex =>
+ projections.map(p => p(colIndex)).toSet.size == 1
+ }.toArray
+
+ // Part 1: declare variables for each column
+ // If a column has the same value for all output rows, then we also generate its computation
+ // right after declaration. Otherwise its value is computed in the part 2.
+ val outputColumns = output.indices.map { col =>
+ val firstExpr = projections.head(col)
+ if (sameOutput(col)) {
+ // This column is the same across all output rows. Just generate code for it here.
+ BindReferences.bindReference(firstExpr, child.output).gen(ctx)
+ } else {
+ val isNull = ctx.freshName("isNull")
+ val value = ctx.freshName("value")
+ val code = s"""
+ |boolean $isNull = true;
+ |${ctx.javaType(firstExpr.dataType)} $value = ${ctx.defaultValue(firstExpr.dataType)};
+ """.stripMargin
+ ExprCode(code, isNull, value)
+ }
+ }
+
+ // Part 2: switch/case statements
+ val cases = projections.zipWithIndex.map { case (exprs, row) =>
+ var updateCode = ""
+ for (col <- exprs.indices) {
+ if (!sameOutput(col)) {
+ val ev = BindReferences.bindReference(exprs(col), child.output).gen(ctx)
+ updateCode +=
+ s"""
+ |${ev.code}
+ |${outputColumns(col).isNull} = ${ev.isNull};
+ |${outputColumns(col).value} = ${ev.value};
+ """.stripMargin
+ }
+ }
+
+ s"""
+ |case $row:
+ | ${updateCode.trim}
+ | break;
+ """.stripMargin
+ }
+
+ val numOutput = metricTerm(ctx, "numOutputRows")
+ val i = ctx.freshName("i")
+ s"""
+ |${outputColumns.map(_.code).mkString("\n").trim}
+ |for (int $i = 0; $i < ${projections.length}; $i ++) {
+ | switch ($i) {
+ | ${cases.mkString("\n").trim}
+ | }
+ | $numOutput.add(1);
+ | ${consume(ctx, outputColumns)}
+ |}
+ """.stripMargin
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 1c7e69f30f..4a151179bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -157,6 +157,23 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
}
+ ignore("rube") {
+ val N = 5 << 20
+
+ runBenchmark("cube", N) {
+ sqlContext.range(N).selectExpr("id", "id % 1000 as k1", "id & 256 as k2")
+ .cube("k1", "k2").sum("id").collect()
+ }
+
+ /**
+ Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ cube: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ cube codegen=false 3188 / 3392 1.6 608.2 1.0X
+ cube codegen=true 1239 / 1394 4.2 236.3 2.6X
+ */
+ }
+
ignore("hash and BytesToBytesMap") {
val N = 50 << 20