about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala  | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala  | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala  | 3
3 files changed, 12 insertions, 7 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index b4348d39c2..3e2c799762 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -255,11 +255,12 @@ private[sql] case class DataSourceScan(
| $numOutputRows.add(numRows);
| }
|
- | while (!shouldStop() && $idx < numRows) {
+ | // this loop is very perf sensitive and changes to it should be measured carefully
+ | while ($idx < numRows) {
| int $rowidx = $idx++;
| ${consume(ctx, columns1).trim}
+ | if (shouldStop()) return;
| }
- | if (shouldStop()) return;
|
| if (!$input.hasNext()) {
| $batch = null;
@@ -280,7 +281,7 @@ private[sql] case class DataSourceScan(
s"""
| private void $scanRows(InternalRow $row) throws java.io.IOException {
| boolean firstRow = true;
- | while (!shouldStop() && (firstRow || $input.hasNext())) {
+ | while (firstRow || $input.hasNext()) {
| if (firstRow) {
| firstRow = false;
| } else {
@@ -288,6 +289,7 @@ private[sql] case class DataSourceScan(
| }
| $numOutputRows.add(1);
| ${consume(ctx, columns2, inputRow).trim}
+ | if (shouldStop()) return;
| }
| }""".stripMargin)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 5634e5fc58..0be0b8032a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -103,11 +103,12 @@ trait CodegenSupport extends SparkPlan {
* # call child.produce()
* initialized = true;
* }
- * while (!shouldStop() && hashmap.hasNext()) {
+ * while (hashmap.hasNext()) {
* row = hashmap.next();
* # build the aggregation results
* # create variables for results
* # call consume(), which will call parent.doConsume()
+ * if (shouldStop()) return;
* }
*/
protected def doProduce(ctx: CodegenContext): String
@@ -251,9 +252,10 @@ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport
ctx.currentVars = null
val columns = exprs.map(_.gen(ctx))
s"""
- | while (!shouldStop() && $input.hasNext()) {
+ | while ($input.hasNext()) {
| InternalRow $row = (InternalRow) $input.next();
| ${consume(ctx, columns, row).trim}
+ | if (shouldStop()) return;
| }
""".stripMargin
}
@@ -320,7 +322,7 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
/** Codegened pipeline for:
* ${toCommentSafeString(child.treeString.trim)}
*/
- class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
+ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
private Object[] references;
${ctx.declareMutableStates()}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 6e2a5aa4f9..ee3f1d70e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -282,13 +282,14 @@ case class Range(
| }
| }
|
- | while (!$overflow && $checkEnd && !shouldStop()) {
+ | while (!$overflow && $checkEnd) {
| long $value = $number;
| $number += ${step}L;
| if ($number < $value ^ ${step}L < 0) {
| $overflow = true;
| }
| ${consume(ctx, Seq(ev))}
+ | if (shouldStop()) return;
| }
""".stripMargin
}