diff options
author | Davies Liu <davies@databricks.com> | 2016-03-23 11:58:43 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-03-23 11:58:43 -0700 |
commit | 02d9c352c72a16725322678ef174c5c6e9f2c617 (patch) | |
tree | c39e12b0cd237378104da98d8f89a003179fd3bc | |
parent | 30bdb5cbd9aec191cf15cdc83c3fee375c04c2b2 (diff) | |
download | spark-02d9c352c72a16725322678ef174c5c6e9f2c617.tar.gz spark-02d9c352c72a16725322678ef174c5c6e9f2c617.tar.bz2 spark-02d9c352c72a16725322678ef174c5c6e9f2c617.zip |
[SPARK-14092] [SQL] move shouldStop() to end of while loop
## What changes were proposed in this pull request?
This PR rollback some changes in #11274 , which introduced some performance regression when do a simple aggregation on parquet scan with one integer column.
Does not really understand how this change introduce this huge impact, maybe related show JIT compiler inline functions. (saw very different stats from profiling).
## How was this patch tested?
Manually run the parquet reader benchmark, before this change:
```
Intel(R) Core(TM) i7-4558U CPU 2.80GHz
Int and String Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized 2391 / 3107 43.9 22.8 1.0X
```
After this change
```
Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
Intel(R) Core(TM) i7-4558U CPU 2.80GHz
Int and String Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized 2032 / 2626 51.6 19.4 1.0X```
Author: Davies Liu <davies@databricks.com>
Closes #11912 from davies/fix_regression.
3 files changed, 12 insertions, 7 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index b4348d39c2..3e2c799762 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -255,11 +255,12 @@ private[sql] case class DataSourceScan( | $numOutputRows.add(numRows); | } | - | while (!shouldStop() && $idx < numRows) { + | // this loop is very perf sensitive and changes to it should be measured carefully + | while ($idx < numRows) { | int $rowidx = $idx++; | ${consume(ctx, columns1).trim} + | if (shouldStop()) return; | } - | if (shouldStop()) return; | | if (!$input.hasNext()) { | $batch = null; @@ -280,7 +281,7 @@ private[sql] case class DataSourceScan( s""" | private void $scanRows(InternalRow $row) throws java.io.IOException { | boolean firstRow = true; - | while (!shouldStop() && (firstRow || $input.hasNext())) { + | while (firstRow || $input.hasNext()) { | if (firstRow) { | firstRow = false; | } else { @@ -288,6 +289,7 @@ private[sql] case class DataSourceScan( | } | $numOutputRows.add(1); | ${consume(ctx, columns2, inputRow).trim} + | if (shouldStop()) return; | } | }""".stripMargin) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 5634e5fc58..0be0b8032a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -103,11 +103,12 @@ trait CodegenSupport extends SparkPlan { * # call child.produce() * initialized = true; * } - * while (!shouldStop() && hashmap.hasNext()) { + * while (hashmap.hasNext()) { * row = hashmap.next(); * # build the aggregation results * # create variables for results * # call consume(), which will call parent.doConsume() + * if (shouldStop()) return; * } */ protected def doProduce(ctx: CodegenContext): String @@ -251,9 +252,10 @@ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport ctx.currentVars = null val columns = exprs.map(_.gen(ctx)) s""" - | while (!shouldStop() && $input.hasNext()) { + | while ($input.hasNext()) { | InternalRow $row = (InternalRow) $input.next(); | ${consume(ctx, columns, row).trim} + | if (shouldStop()) return; | } """.stripMargin } @@ -320,7 +322,7 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup /** Codegened pipeline for: * ${toCommentSafeString(child.treeString.trim)} */ - class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { + final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { private Object[] references; ${ctx.declareMutableStates()} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 6e2a5aa4f9..ee3f1d70e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -282,13 +282,14 @@ case class Range( | } | } | - | while (!$overflow && $checkEnd && !shouldStop()) { + | while (!$overflow && $checkEnd) { | long $value = $number; | $number += ${step}L; | if ($number < $value ^ ${step}L < 0) { | $overflow = true; | } | ${consume(ctx, Seq(ev))} + | if (shouldStop()) return; | } """.stripMargin } |