diff options
author | gatorsmile <gatorsmile@gmail.com> | 2016-07-11 16:21:13 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-07-11 16:21:13 +0800 |
commit | e22627894126dceb7491300b63f1fe028b1e2e2c (patch) | |
tree | 88789565e99e0cffb3b707e55f862f19bceb85b9 /sql/core | |
parent | 82f0874453991510216779926d795b0a4e07e854 (diff) | |
download | spark-e22627894126dceb7491300b63f1fe028b1e2e2c.tar.gz spark-e22627894126dceb7491300b63f1fe028b1e2e2c.tar.bz2 spark-e22627894126dceb7491300b63f1fe028b1e2e2c.zip |
[SPARK-16355][SPARK-16354][SQL] Fix Bugs When LIMIT/TABLESAMPLE is Non-foldable, Zero or Negative
#### What changes were proposed in this pull request?
**Issue 1:** When a query containing LIMIT/TABLESAMPLE 0, the statistics could be zero. Results are correct but it could cause a huge performance regression. For example,
```Scala
Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
.createOrReplaceTempView("test")
val df1 = spark.table("test")
val df2 = spark.table("test").limit(0)
val df = df1.join(df2, Seq("k"), "left")
```
The statistics of both `df` and `df2` are zero. The statistics values should never be zero; otherwise `sizeInBytes` of `BinaryNode` will also be zero (product of children). This PR is to increase it to `1` when the num of rows is equal to 0.
**Issue 2:** When a query containing negative LIMIT/TABLESAMPLE, we should issue exceptions. Negative values could break the implementation assumption of multiple parts. For example, statistics calculation. Below is the example query.
```SQL
SELECT * FROM testData TABLESAMPLE (-1 rows)
SELECT * FROM testData LIMIT -1
```
This PR is to issue an appropriate exception in this case.
**Issue 3:** Spark SQL follows the restriction of LIMIT clause in Hive. The argument to the LIMIT clause must evaluate to a constant value. It can be a numeric literal, or another kind of numeric expression involving operators, casts, and function return values. You cannot refer to a column or use a subquery. Currently, we do not detect whether the expression in LIMIT clause is foldable or not. If non-foldable, we might issue a strange error message. For example,
```SQL
SELECT * FROM testData LIMIT rand() > 0.2
```
Then, a misleading error message is issued, like
```
assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
+- LocalLimit (_nondeterministic#202 > 0.2)
+- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
+- LogicalRDD [key#11, value#12]
java.lang.AssertionError: assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
+- LocalLimit (_nondeterministic#202 > 0.2)
+- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
+- LogicalRDD [key#11, value#12]
```
This PR detects it and then issues a meaningful error message.
#### How was this patch tested?
Added test cases.
Author: gatorsmile <gatorsmile@gmail.com>
Closes #14034 from gatorsmile/limit.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 37 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala | 44 |
2 files changed, 79 insertions, 2 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index dca9e5e503..ede7d9a0c9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -660,11 +660,11 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { test("limit") { checkAnswer( - sql("SELECT * FROM testData LIMIT 10"), + sql("SELECT * FROM testData LIMIT 9 + 1"), testData.take(10).toSeq) checkAnswer( - sql("SELECT * FROM arrayData LIMIT 1"), + sql("SELECT * FROM arrayData LIMIT CAST(1 AS Integer)"), arrayData.collect().take(1).map(Row.fromTuple).toSeq) checkAnswer( @@ -672,6 +672,39 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { mapData.collect().take(1).map(Row.fromTuple).toSeq) } + test("non-foldable expressions in LIMIT") { + val e = intercept[AnalysisException] { + sql("SELECT * FROM testData LIMIT key > 3") + }.getMessage + assert(e.contains("The limit expression must evaluate to a constant value, " + + "but got (testdata.`key` > 3)")) + } + + test("Expressions in limit clause are not integer") { + var e = intercept[AnalysisException] { + sql("SELECT * FROM testData LIMIT true") + }.getMessage + assert(e.contains("The limit expression must be integer type, but got boolean")) + + e = intercept[AnalysisException] { + sql("SELECT * FROM testData LIMIT 'a'") + }.getMessage + assert(e.contains("The limit expression must be integer type, but got string")) + } + + test("negative in LIMIT or TABLESAMPLE") { + val expected = "The limit expression must be equal to or greater than 0, but got -1" + var e = intercept[AnalysisException] { + sql("SELECT * FROM testData TABLESAMPLE (-1 rows)") + }.getMessage + assert(e.contains(expected)) + + e = intercept[AnalysisException] { + sql("SELECT * FROM testData LIMIT -1") + }.getMessage + assert(e.contains(expected)) + } + test("CTE feature") { checkAnswer( sql("with q1 as (select * from testData limit 10) select * from q1"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala index 4de3cf605c..ab55242ec0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, Join, LocalLimit} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ class StatisticsSuite extends QueryTest with SharedSQLContext { + import testImplicits._ test("SPARK-15392: DataFrame created from RDD should not be broadcasted") { val rdd = sparkContext.range(1, 100).map(i => Row(i, i)) @@ -31,4 +33,46 @@ class StatisticsSuite extends QueryTest with SharedSQLContext { spark.sessionState.conf.autoBroadcastJoinThreshold) } + test("estimates the size of limit") { + withTempTable("test") { + Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v") + .createOrReplaceTempView("test") + Seq((0, 1), (1, 24), (2, 48)).foreach { case (limit, expected) => + val df = sql(s"""SELECT * FROM test limit $limit""") + + val sizesGlobalLimit = df.queryExecution.analyzed.collect { case g: GlobalLimit => + g.statistics.sizeInBytes + } + assert(sizesGlobalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}") + assert(sizesGlobalLimit.head === BigInt(expected), + s"expected exact size $expected for table 'test', got: ${sizesGlobalLimit.head}") + + val sizesLocalLimit = df.queryExecution.analyzed.collect { case l: LocalLimit => + l.statistics.sizeInBytes + } + assert(sizesLocalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}") + assert(sizesLocalLimit.head === BigInt(expected), + s"expected exact size $expected for table 'test', got: ${sizesLocalLimit.head}") + } + } + } + + test("estimates the size of a limit 0 on outer join") { + withTempTable("test") { + Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v") + .createOrReplaceTempView("test") + val df1 = spark.table("test") + val df2 = spark.table("test").limit(0) + val df = df1.join(df2, Seq("k"), "left") + + val sizes = df.queryExecution.analyzed.collect { case g: Join => + g.statistics.sizeInBytes + } + + assert(sizes.size === 1, s"number of Join nodes is wrong:\n ${df.queryExecution}") + assert(sizes.head === BigInt(96), + s"expected exact size 96 for table 'test', got: ${sizes.head}") + } + } + } |