aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorNong Li <nong@databricks.com>2016-02-24 17:16:45 -0800
committerDavies Liu <davies.liu@gmail.com>2016-02-24 17:16:45 -0800
commit5a7af9e7ac85e04aa4a420bc2887207bfa18f792 (patch)
tree26537c528dcec8d989eda1f7640670fdac7feb86 /sql/core
parentcbb0b65ad53f2642fab8aad3ea115375c53b6eed (diff)
downloadspark-5a7af9e7ac85e04aa4a420bc2887207bfa18f792.tar.gz
spark-5a7af9e7ac85e04aa4a420bc2887207bfa18f792.tar.bz2
spark-5a7af9e7ac85e04aa4a420bc2887207bfa18f792.zip
[SPARK-13250] [SQL] Update PhysicallRDD to convert to UnsafeRow if using the vectorized scanner.
Some parts of the engine rely on UnsafeRow which the vectorized parquet scanner does not want to produce. This add a conversion in Physical RDD. In the case where codegen is used (and the scan is the start of the pipeline), there is no requirement to use UnsafeRow. This patch adds update PhysicallRDD to support codegen, which eliminates the need for the UnsafeRow conversion in all cases. The result of these changes for TPCDS-Q19 at the 10gb sf reduces the query time from 9.5 seconds to 6.5 seconds. Author: Nong Li <nong@databricks.com> Closes #11141 from nongli/spark-13250.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala45
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala1
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala54
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala45
5 files changed, 103 insertions, 48 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index cad7e25a32..8649d2d69b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -18,12 +18,14 @@
package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SQLConf, SQLContext}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
import org.apache.spark.sql.types.DataType
@@ -102,7 +104,7 @@ private[sql] case class PhysicalRDD(
override val metadata: Map[String, String] = Map.empty,
isUnsafeRow: Boolean = false,
override val outputPartitioning: Partitioning = UnknownPartitioning(0))
- extends LeafNode {
+ extends LeafNode with CodegenSupport {
private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
@@ -128,6 +130,36 @@ private[sql] case class PhysicalRDD(
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
}
+
+ override def upstreams(): Seq[RDD[InternalRow]] = {
+ rdd :: Nil
+ }
+
+ // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
+ // never requires UnsafeRow as input.
+ override protected def doProduce(ctx: CodegenContext): String = {
+ val input = ctx.freshName("input")
+ // PhysicalRDD always just has one input
+ ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+
+ val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true))
+ val row = ctx.freshName("row")
+ val numOutputRows = metricTerm(ctx, "numOutputRows")
+ ctx.INPUT_ROW = row
+ ctx.currentVars = null
+ val columns = exprs.map(_.gen(ctx))
+ s"""
+ | while ($input.hasNext()) {
+ | InternalRow $row = (InternalRow) $input.next();
+ | $numOutputRows.add(1);
+ | ${columns.map(_.code).mkString("\n").trim}
+ | ${consume(ctx, columns).trim}
+ | if (shouldStop()) {
+ | return;
+ | }
+ | }
+ """.stripMargin
+ }
}
private[sql] object PhysicalRDD {
@@ -140,8 +172,13 @@ private[sql] object PhysicalRDD {
rdd: RDD[InternalRow],
relation: BaseRelation,
metadata: Map[String, String] = Map.empty): PhysicalRDD = {
- // All HadoopFsRelations output UnsafeRows
- val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation]
+ val outputUnsafeRows = if (relation.isInstanceOf[ParquetRelation]) {
+ // The vectorized parquet reader does not produce unsafe rows.
+ !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
+ } else {
+ // All HadoopFsRelations output UnsafeRows
+ relation.isInstanceOf[HadoopFsRelation]
+ }
val bucketSpec = relation match {
case r: HadoopFsRelation => r.getBucketSpec
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 08c52e5f43..afaddcf357 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -42,6 +42,7 @@ trait CodegenSupport extends SparkPlan {
case _: TungstenAggregate => "agg"
case _: BroadcastHashJoin => "bhj"
case _: SortMergeJoin => "smj"
+ case _: PhysicalRDD => "rdd"
case _ => nodeName.toLowerCase
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 7a0f7abaa1..f8a9a95c87 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -187,8 +187,10 @@ class JDBCSuite extends SparkFunSuite
val parentPlan = df.queryExecution.executedPlan
// Check if SparkPlan Filter is removed in a physical plan and
// the plan only has PhysicalRDD to scan JDBCRelation.
- assert(parentPlan.isInstanceOf[PhysicalRDD])
- assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
+ assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
+ val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
+ assert(node.plan.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD])
+ assert(node.plan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
df
}
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 7196b6dc13..4b578a6c10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -304,30 +304,38 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
expectedCount: Int,
requiredColumnNames: Set[String],
expectedUnhandledFilters: Set[Filter]): Unit = {
+
test(s"PushDown Returns $expectedCount: $sqlString") {
- val queryExecution = sql(sqlString).queryExecution
- val rawPlan = queryExecution.executedPlan.collect {
- case p: execution.PhysicalRDD => p
- } match {
- case Seq(p) => p
- case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
- }
- val rawCount = rawPlan.execute().count()
- assert(ColumnsRequired.set === requiredColumnNames)
-
- val table = caseInsensitiveContext.table("oneToTenFiltered")
- val relation = table.queryExecution.logical.collectFirst {
- case LogicalRelation(r, _, _) => r
- }.get
-
- assert(
- relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)
-
- if (rawCount != expectedCount) {
- fail(
- s"Wrong # of results for pushed filter. Got $rawCount, Expected $expectedCount\n" +
- s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
- queryExecution)
+ // These tests check a particular plan, disable whole stage codegen.
+ caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
+ try {
+ val queryExecution = sql(sqlString).queryExecution
+ val rawPlan = queryExecution.executedPlan.collect {
+ case p: execution.PhysicalRDD => p
+ } match {
+ case Seq(p) => p
+ case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
+ }
+ val rawCount = rawPlan.execute().count()
+ assert(ColumnsRequired.set === requiredColumnNames)
+
+ val table = caseInsensitiveContext.table("oneToTenFiltered")
+ val relation = table.queryExecution.logical.collectFirst {
+ case LogicalRelation(r, _, _) => r
+ }.get
+
+ assert(
+ relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)
+
+ if (rawCount != expectedCount) {
+ fail(
+ s"Wrong # of results for pushed filter. Got $rawCount, Expected $expectedCount\n" +
+ s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
+ queryExecution)
+ }
+ } finally {
+ caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED,
+ SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index a89c5f8007..02e8ab1c51 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -117,28 +117,35 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext {
def testPruning(sqlString: String, expectedColumns: String*): Unit = {
test(s"Columns output ${expectedColumns.mkString(",")}: $sqlString") {
- val queryExecution = sql(sqlString).queryExecution
- val rawPlan = queryExecution.executedPlan.collect {
- case p: execution.PhysicalRDD => p
- } match {
- case Seq(p) => p
- case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
- }
- val rawColumns = rawPlan.output.map(_.name)
- val rawOutput = rawPlan.execute().first()
-
- if (rawColumns != expectedColumns) {
- fail(
- s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" +
- s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
- queryExecution)
- }
- if (rawOutput.numFields != expectedColumns.size) {
- fail(s"Wrong output row. Got $rawOutput\n$queryExecution")
+ // These tests check a particular plan, disable whole stage codegen.
+ caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
+ try {
+ val queryExecution = sql(sqlString).queryExecution
+ val rawPlan = queryExecution.executedPlan.collect {
+ case p: execution.PhysicalRDD => p
+ } match {
+ case Seq(p) => p
+ case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
+ }
+ val rawColumns = rawPlan.output.map(_.name)
+ val rawOutput = rawPlan.execute().first()
+
+ if (rawColumns != expectedColumns) {
+ fail(
+ s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" +
+ s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
+ queryExecution)
+ }
+
+ if (rawOutput.numFields != expectedColumns.size) {
+ fail(s"Wrong output row. Got $rawOutput\n$queryExecution")
+ }
+ } finally {
+ caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED,
+ SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get)
}
}
}
-
}