aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/java
diff options
context:
space:
mode:
authorTakeshi YAMAMURO <linguin.m.s@gmail.com>2016-12-22 01:53:33 +0100
committerHerman van Hovell <hvanhovell@databricks.com>2016-12-22 01:53:33 +0100
commitb41ec997786e2be42a8a2a182212a610d08b221b (patch)
tree7f60e50dedd0407d6a4a4316241228e064602723 /sql/core/src/main/java
parent83a6ace0d1be44f70e768348ae6688798c84343e (diff)
downloadspark-b41ec997786e2be42a8a2a182212a610d08b221b.tar.gz
spark-b41ec997786e2be42a8a2a182212a610d08b221b.tar.bz2
spark-b41ec997786e2be42a8a2a182212a610d08b221b.zip
[SPARK-18528][SQL] Fix a bug to initialise an iterator of aggregation buffer
## What changes were proposed in this pull request? This pr is to fix an `NullPointerException` issue caused by a following `limit + aggregate` query; ``` scala> val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value") scala> df.limit(2).groupBy("id").count().show WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 8204, lvsp20hdn012.stubprod.com): java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) ``` The root culprit is that [`$doAgg()`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L596) skips an initialization of [the buffer iterator](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L603); `BaseLimitExec` sets `stopEarly=true` and `$doAgg()` exits in the middle without the initialization. ## How was this patch tested? Added a test to check if no exception happens for limit + aggregates in `DataFrameAggregateSuite.scala`. Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes #15980 from maropu/SPARK-18528.
Diffstat (limited to 'sql/core/src/main/java')
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java10
1 files changed, 10 insertions, 0 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
index 086547c793..730a4ae8d5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java
@@ -70,6 +70,16 @@ public abstract class BufferedRowIterator {
}
/**
+ * Returns whether this iterator should stop fetching next row from [[CodegenSupport#inputRDDs]].
+ *
+ * If it returns true, the caller should exit the loop that [[InputAdapter]] generates.
+ * This interface is mainly used to limit the number of input rows.
+ */
+ protected boolean stopEarly() {
+ return false;
+ }
+
+ /**
* Returns whether `processNext()` should stop processing next row from `input` or not.
*
* If it returns true, the caller should exit the loop (return from processNext()).