author     Nong Li <nong@databricks.com>          2016-02-24 17:16:45 -0800
committer  Davies Liu <davies.liu@gmail.com>      2016-02-24 17:16:45 -0800
commit     5a7af9e7ac85e04aa4a420bc2887207bfa18f792 (patch)
tree       26537c528dcec8d989eda1f7640670fdac7feb86 /sql
parent     cbb0b65ad53f2642fab8aad3ea115375c53b6eed (diff)
[SPARK-13250] [SQL] Update PhysicalRDD to convert to UnsafeRow if using the vectorized scanner.
Some parts of the engine rely on UnsafeRow, which the vectorized Parquet scanner does not produce, so this patch adds a conversion in PhysicalRDD. When codegen is used (and the scan is the start of the pipeline), there is no requirement to use UnsafeRow, so this patch also updates PhysicalRDD to support codegen, which eliminates the need for the UnsafeRow conversion in the codegen cases, since generated code never requires UnsafeRow as input. With these changes, TPCDS-Q19 at the 10 GB scale factor drops from 9.5 seconds to 6.5 seconds.

Author: Nong Li <nong@databricks.com>

Closes #11141 from nongli/spark-13250.
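As a rough illustration of the intent (not part of the patch), the sketch below shows how one might observe the effect in a Spark shell from this era. The configuration keys and the Parquet path are assumptions for illustration, not values taken from this commit.

// Hypothetical usage sketch: with whole-stage codegen and the vectorized Parquet
// reader enabled, the scan is compiled into the stage and no UnsafeRow conversion
// is inserted above the PhysicalRDD node. Config keys and the path are assumptions.
import org.apache.spark.sql.SQLContext

def explainScan(sqlContext: SQLContext): Unit = {
  sqlContext.setConf("spark.sql.codegen.wholeStage", "true")             // assumed key
  sqlContext.setConf("spark.sql.parquet.enableVectorizedReader", "true") // assumed key
  val df = sqlContext.read.parquet("/path/to/tpcds/store_sales")         // hypothetical path
  // The physical plan should show a WholeStageCodegen node wrapping the Parquet scan.
  df.selectExpr("count(*)").explain()
}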
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala       | 45
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala |  1
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala              |  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala   | 54
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala     | 45
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala   | 46
6 files changed, 127 insertions, 70 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index cad7e25a32..8649d2d69b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -18,12 +18,14 @@
package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SQLConf, SQLContext}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
import org.apache.spark.sql.types.DataType
@@ -102,7 +104,7 @@ private[sql] case class PhysicalRDD(
override val metadata: Map[String, String] = Map.empty,
isUnsafeRow: Boolean = false,
override val outputPartitioning: Partitioning = UnknownPartitioning(0))
- extends LeafNode {
+ extends LeafNode with CodegenSupport {
private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
@@ -128,6 +130,36 @@ private[sql] case class PhysicalRDD(
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
}
+
+ override def upstreams(): Seq[RDD[InternalRow]] = {
+ rdd :: Nil
+ }
+
+ // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
+ // never requires UnsafeRow as input.
+ override protected def doProduce(ctx: CodegenContext): String = {
+ val input = ctx.freshName("input")
+ // PhysicalRDD always just has one input
+ ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+
+ val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true))
+ val row = ctx.freshName("row")
+ val numOutputRows = metricTerm(ctx, "numOutputRows")
+ ctx.INPUT_ROW = row
+ ctx.currentVars = null
+ val columns = exprs.map(_.gen(ctx))
+ s"""
+ | while ($input.hasNext()) {
+ | InternalRow $row = (InternalRow) $input.next();
+ | $numOutputRows.add(1);
+ | ${columns.map(_.code).mkString("\n").trim}
+ | ${consume(ctx, columns).trim}
+ | if (shouldStop()) {
+ | return;
+ | }
+ | }
+ """.stripMargin
+ }
}
private[sql] object PhysicalRDD {
@@ -140,8 +172,13 @@ private[sql] object PhysicalRDD {
rdd: RDD[InternalRow],
relation: BaseRelation,
metadata: Map[String, String] = Map.empty): PhysicalRDD = {
- // All HadoopFsRelations output UnsafeRows
- val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation]
+ val outputUnsafeRows = if (relation.isInstanceOf[ParquetRelation]) {
+ // The vectorized parquet reader does not produce unsafe rows.
+ !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
+ } else {
+ // All HadoopFsRelations output UnsafeRows
+ relation.isInstanceOf[HadoopFsRelation]
+ }
val bucketSpec = relation match {
case r: HadoopFsRelation => r.getBucketSpec
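At runtime the string built by doProduce above is compiled into a Java while-loop. A rough interpreted-mode equivalent in Scala, as a sketch only (the metric and downstream callbacks are stand-ins, not the actual SQLMetrics or consume() APIs):

import org.apache.spark.sql.catalyst.InternalRow

// Sketch of what the generated loop does: drain the single upstream iterator, bump
// the numOutputRows metric, and hand each InternalRow straight to the downstream
// operator without ever converting to UnsafeRow.
def scanLoop(
    input: Iterator[InternalRow],
    incrementOutputRows: () => Unit,   // stands in for numOutputRows.add(1)
    downstream: InternalRow => Unit,   // stands in for consume(ctx, columns)
    shouldStop: () => Boolean): Unit = {
  while (input.hasNext) {
    val row = input.next()
    incrementOutputRows()
    downstream(row)
    if (shouldStop()) return
  }
}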
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 08c52e5f43..afaddcf357 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -42,6 +42,7 @@ trait CodegenSupport extends SparkPlan {
case _: TungstenAggregate => "agg"
case _: BroadcastHashJoin => "bhj"
case _: SortMergeJoin => "smj"
+ case _: PhysicalRDD => "rdd"
case _ => nodeName.toLowerCase
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 7a0f7abaa1..f8a9a95c87 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -187,8 +187,10 @@ class JDBCSuite extends SparkFunSuite
val parentPlan = df.queryExecution.executedPlan
// Check if SparkPlan Filter is removed in a physical plan and
// the plan only has PhysicalRDD to scan JDBCRelation.
- assert(parentPlan.isInstanceOf[PhysicalRDD])
- assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
+ assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
+ val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
+ assert(node.plan.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD])
+ assert(node.plan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
df
}
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
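Because a scan can now appear either bare or fused into a WholeStageCodegen stage, checks like the one above could be generalized. A hedged sketch of such a helper (not part of the patch; it assumes the plan shapes shown in this diff, where WholeStageCodegen exposes its wrapped plan via `plan`):

import org.apache.spark.sql.execution.{PhysicalRDD, SparkPlan, WholeStageCodegen}

// Find the scan node whether or not it has been wrapped in a WholeStageCodegen stage.
def findScan(plan: SparkPlan): Option[PhysicalRDD] = plan match {
  case w: WholeStageCodegen => findScan(w.plan)
  case p: PhysicalRDD       => Some(p)
  case other                => other.children.view.flatMap(findScan).headOption
}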
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 7196b6dc13..4b578a6c10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -304,30 +304,38 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
expectedCount: Int,
requiredColumnNames: Set[String],
expectedUnhandledFilters: Set[Filter]): Unit = {
+
test(s"PushDown Returns $expectedCount: $sqlString") {
- val queryExecution = sql(sqlString).queryExecution
- val rawPlan = queryExecution.executedPlan.collect {
- case p: execution.PhysicalRDD => p
- } match {
- case Seq(p) => p
- case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
- }
- val rawCount = rawPlan.execute().count()
- assert(ColumnsRequired.set === requiredColumnNames)
-
- val table = caseInsensitiveContext.table("oneToTenFiltered")
- val relation = table.queryExecution.logical.collectFirst {
- case LogicalRelation(r, _, _) => r
- }.get
-
- assert(
- relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)
-
- if (rawCount != expectedCount) {
- fail(
- s"Wrong # of results for pushed filter. Got $rawCount, Expected $expectedCount\n" +
- s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
- queryExecution)
+ // These tests check a particular plan, disable whole stage codegen.
+ caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
+ try {
+ val queryExecution = sql(sqlString).queryExecution
+ val rawPlan = queryExecution.executedPlan.collect {
+ case p: execution.PhysicalRDD => p
+ } match {
+ case Seq(p) => p
+ case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
+ }
+ val rawCount = rawPlan.execute().count()
+ assert(ColumnsRequired.set === requiredColumnNames)
+
+ val table = caseInsensitiveContext.table("oneToTenFiltered")
+ val relation = table.queryExecution.logical.collectFirst {
+ case LogicalRelation(r, _, _) => r
+ }.get
+
+ assert(
+ relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)
+
+ if (rawCount != expectedCount) {
+ fail(
+ s"Wrong # of results for pushed filter. Got $rawCount, Expected $expectedCount\n" +
+ s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
+ queryExecution)
+ }
+ } finally {
+ caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED,
+ SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get)
}
}
}
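The set-then-restore-in-finally pattern added here (and repeated in PrunedScanSuite below) could be factored into a small helper. A sketch, assuming the suite exposes its SQLConf; the withSQLConf utility used in BucketedReadSuite serves the same purpose:

import org.apache.spark.sql.SQLConf

// Temporarily disable whole-stage codegen around a block of plan assertions,
// restoring the previous value afterwards. `conf` is the suite's SQLConf
// (an assumed parameter for this sketch).
def withWholeStageCodegenDisabled[T](conf: SQLConf)(body: => T): T = {
  val original = conf.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED)
  conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
  try body finally conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, original)
}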
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index a89c5f8007..02e8ab1c51 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -117,28 +117,35 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext {
def testPruning(sqlString: String, expectedColumns: String*): Unit = {
test(s"Columns output ${expectedColumns.mkString(",")}: $sqlString") {
- val queryExecution = sql(sqlString).queryExecution
- val rawPlan = queryExecution.executedPlan.collect {
- case p: execution.PhysicalRDD => p
- } match {
- case Seq(p) => p
- case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
- }
- val rawColumns = rawPlan.output.map(_.name)
- val rawOutput = rawPlan.execute().first()
-
- if (rawColumns != expectedColumns) {
- fail(
- s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" +
- s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
- queryExecution)
- }
- if (rawOutput.numFields != expectedColumns.size) {
- fail(s"Wrong output row. Got $rawOutput\n$queryExecution")
+ // These tests check a particular plan, disable whole stage codegen.
+ caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
+ try {
+ val queryExecution = sql(sqlString).queryExecution
+ val rawPlan = queryExecution.executedPlan.collect {
+ case p: execution.PhysicalRDD => p
+ } match {
+ case Seq(p) => p
+ case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
+ }
+ val rawColumns = rawPlan.output.map(_.name)
+ val rawOutput = rawPlan.execute().first()
+
+ if (rawColumns != expectedColumns) {
+ fail(
+ s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" +
+ s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" +
+ queryExecution)
+ }
+
+ if (rawOutput.numFields != expectedColumns.size) {
+ fail(s"Wrong output row. Got $rawOutput\n$queryExecution")
+ }
+ } finally {
+ caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED,
+ SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get)
}
}
}
-
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 0ef42f45e3..62f6b998e6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -74,32 +74,34 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
bucketValues: Seq[Integer],
filterCondition: Column,
originalDataFrame: DataFrame): Unit = {
+ // This test verifies parts of the plan. Disable whole stage codegen.
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+ val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k")
+ val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
+ // Limit: bucket pruning only works when the bucket column has one and only one column
+ assert(bucketColumnNames.length == 1)
+ val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head)
+ val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex)
+ val matchedBuckets = new BitSet(numBuckets)
+ bucketValues.foreach { value =>
+ matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value))
+ }
- val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k")
- val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
- // Limit: bucket pruning only works when the bucket column has one and only one column
- assert(bucketColumnNames.length == 1)
- val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head)
- val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex)
- val matchedBuckets = new BitSet(numBuckets)
- bucketValues.foreach { value =>
- matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value))
- }
+ // Filter could hide the bug in bucket pruning. Thus, skipping all the filters
+ val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
+ val rdd = plan.find(_.isInstanceOf[PhysicalRDD])
+ assert(rdd.isDefined, plan)
- // Filter could hide the bug in bucket pruning. Thus, skipping all the filters
- val rdd = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
- .find(_.isInstanceOf[PhysicalRDD])
- assert(rdd.isDefined)
+ val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
+ if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty)
+ }
+ // checking if all the pruned buckets are empty
+ assert(checkedResult.collect().forall(_ == true))
- val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
- if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty)
+ checkAnswer(
+ bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"),
+ originalDataFrame.filter(filterCondition).orderBy("i", "j", "k"))
}
- // checking if all the pruned buckets are empty
- assert(checkedResult.collect().forall(_ == true))
-
- checkAnswer(
- bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"),
- originalDataFrame.filter(filterCondition).orderBy("i", "j", "k"))
}
test("read partitioning bucketed tables with bucket pruning filters") {
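Stepping back from the diff, the bucket-pruning assertion above boils down to: any partition whose bucket was not selected must come back empty. A standalone sketch of that check (illustrative only; the inputs are assumed to be precomputed by the caller):

// Each RDD partition maps to bucket (index % numBuckets); partitions whose bucket was
// pruned away must be empty, while matched buckets may contain anything.
def prunedBucketsAreEmpty(
    partitionIsEmpty: IndexedSeq[Boolean],  // one flag per scanned partition (assumed input)
    matchedBuckets: Set[Int],
    numBuckets: Int): Boolean = {
  partitionIsEmpty.zipWithIndex.forall { case (isEmpty, index) =>
    matchedBuckets.contains(index % numBuckets) || isEmpty
  }
}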