diff options
3 files changed, 12 insertions, 7 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index b4348d39c2..3e2c799762 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -255,11 +255,12 @@ private[sql] case class DataSourceScan( | $numOutputRows.add(numRows); | } | - | while (!shouldStop() && $idx < numRows) { + | // this loop is very perf sensitive and changes to it should be measured carefully + | while ($idx < numRows) { | int $rowidx = $idx++; | ${consume(ctx, columns1).trim} + | if (shouldStop()) return; | } - | if (shouldStop()) return; | | if (!$input.hasNext()) { | $batch = null; @@ -280,7 +281,7 @@ private[sql] case class DataSourceScan( s""" | private void $scanRows(InternalRow $row) throws java.io.IOException { | boolean firstRow = true; - | while (!shouldStop() && (firstRow || $input.hasNext())) { + | while (firstRow || $input.hasNext()) { | if (firstRow) { | firstRow = false; | } else { @@ -288,6 +289,7 @@ private[sql] case class DataSourceScan( | } | $numOutputRows.add(1); | ${consume(ctx, columns2, inputRow).trim} + | if (shouldStop()) return; | } | }""".stripMargin) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 5634e5fc58..0be0b8032a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -103,11 +103,12 @@ trait CodegenSupport extends SparkPlan { * # call child.produce() * initialized = true; * } - * while (!shouldStop() && hashmap.hasNext()) { + * while (hashmap.hasNext()) { * row = hashmap.next(); * # build the aggregation results * # create variables for results * # call consume(), which will call parent.doConsume() + * if (shouldStop()) return; * } */ protected def doProduce(ctx: CodegenContext): String @@ -251,9 +252,10 @@ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport ctx.currentVars = null val columns = exprs.map(_.gen(ctx)) s""" - | while (!shouldStop() && $input.hasNext()) { + | while ($input.hasNext()) { | InternalRow $row = (InternalRow) $input.next(); | ${consume(ctx, columns, row).trim} + | if (shouldStop()) return; | } """.stripMargin } @@ -320,7 +322,7 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup /** Codegened pipeline for: * ${toCommentSafeString(child.treeString.trim)} */ - class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { + final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { private Object[] references; ${ctx.declareMutableStates()} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 6e2a5aa4f9..ee3f1d70e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -282,13 +282,14 @@ case class Range( | } | } | - | while (!$overflow && $checkEnd && !shouldStop()) { + | while (!$overflow && $checkEnd) { | long $value = $number; | $number += ${step}L; | if ($number < $value ^ ${step}L < 0) { | $overflow = true; | } | ${consume(ctx, Seq(ev))} + | if (shouldStop()) return; | } """.stripMargin } |