aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2016-07-11 16:21:13 +0800
committerWenchen Fan <wenchen@databricks.com>2016-07-11 16:21:13 +0800
commite22627894126dceb7491300b63f1fe028b1e2e2c (patch)
tree88789565e99e0cffb3b707e55f862f19bceb85b9 /sql/catalyst
parent82f0874453991510216779926d795b0a4e07e854 (diff)
downloadspark-e22627894126dceb7491300b63f1fe028b1e2e2c.tar.gz
spark-e22627894126dceb7491300b63f1fe028b1e2e2c.tar.bz2
spark-e22627894126dceb7491300b63f1fe028b1e2e2c.zip
[SPARK-16355][SPARK-16354][SQL] Fix Bugs When LIMIT/TABLESAMPLE is Non-foldable, Zero or Negative
#### What changes were proposed in this pull request? **Issue 1:** When a query containing LIMIT/TABLESAMPLE 0, the statistics could be zero. Results are correct but it could cause a huge performance regression. For example, ```Scala Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v") .createOrReplaceTempView("test") val df1 = spark.table("test") val df2 = spark.table("test").limit(0) val df = df1.join(df2, Seq("k"), "left") ``` The statistics of both `df` and `df2` are zero. The statistics values should never be zero; otherwise `sizeInBytes` of `BinaryNode` will also be zero (product of children). This PR is to increase it to `1` when the num of rows is equal to 0. **Issue 2:** When a query containing negative LIMIT/TABLESAMPLE, we should issue exceptions. Negative values could break the implementation assumption of multiple parts. For example, statistics calculation. Below is the example query. ```SQL SELECT * FROM testData TABLESAMPLE (-1 rows) SELECT * FROM testData LIMIT -1 ``` This PR is to issue an appropriate exception in this case. **Issue 3:** Spark SQL follows the restriction of LIMIT clause in Hive. The argument to the LIMIT clause must evaluate to a constant value. It can be a numeric literal, or another kind of numeric expression involving operators, casts, and function return values. You cannot refer to a column or use a subquery. Currently, we do not detect whether the expression in LIMIT clause is foldable or not. If non-foldable, we might issue a strange error message. For example, ```SQL SELECT * FROM testData LIMIT rand() > 0.2 ``` Then, a misleading error message is issued, like ``` assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2) +- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203] +- LocalLimit (_nondeterministic#202 > 0.2) +- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202] +- LogicalRDD [key#11, value#12] java.lang.AssertionError: assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2) +- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203] +- LocalLimit (_nondeterministic#202 > 0.2) +- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202] +- LogicalRDD [key#11, value#12] ``` This PR detects it and then issues a meaningful error message. #### How was this patch tested? Added test cases. Author: gatorsmile <gatorsmile@gmail.com> Closes #14034 from gatorsmile/limit.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala19
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala16
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala6
3 files changed, 39 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 7b30fcc6c5..8b87a4e41c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -46,6 +46,21 @@ trait CheckAnalysis extends PredicateHelper {
}).length > 1
}
+ private def checkLimitClause(limitExpr: Expression): Unit = {
+ limitExpr match {
+ case e if !e.foldable => failAnalysis(
+ "The limit expression must evaluate to a constant value, but got " +
+ limitExpr.sql)
+ case e if e.dataType != IntegerType => failAnalysis(
+ s"The limit expression must be integer type, but got " +
+ e.dataType.simpleString)
+ case e if e.eval().asInstanceOf[Int] < 0 => failAnalysis(
+ "The limit expression must be equal to or greater than 0, but got " +
+ e.eval().asInstanceOf[Int])
+ case e => // OK
+ }
+ }
+
def checkAnalysis(plan: LogicalPlan): Unit = {
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
@@ -251,6 +266,10 @@ trait CheckAnalysis extends PredicateHelper {
s"but one table has '${firstError.output.length}' columns and another table has " +
s"'${s.children.head.output.length}' columns")
+ case GlobalLimit(limitExpr, _) => checkLimitClause(limitExpr)
+
+ case LocalLimit(limitExpr, _) => checkLimitClause(limitExpr)
+
case p if p.expressions.exists(ScalarSubquery.hasCorrelatedScalarSubquery) =>
p match {
case _: Filter | _: Aggregate | _: Project => // Ok
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 79f9a210a3..c0e400f617 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -660,7 +660,13 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN
}
override lazy val statistics: Statistics = {
val limit = limitExpr.eval().asInstanceOf[Int]
- val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
+ val sizeInBytes = if (limit == 0) {
+ // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
+ // (product of children).
+ 1
+ } else {
+ (limit: Long) * output.map(a => a.dataType.defaultSize).sum
+ }
child.statistics.copy(sizeInBytes = sizeInBytes)
}
}
@@ -675,7 +681,13 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
}
override lazy val statistics: Statistics = {
val limit = limitExpr.eval().asInstanceOf[Int]
- val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
+ val sizeInBytes = if (limit == 0) {
+ // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
+ // (product of children).
+ 1
+ } else {
+ (limit: Long) * output.map(a => a.dataType.defaultSize).sum
+ }
child.statistics.copy(sizeInBytes = sizeInBytes)
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index a9cde1e19e..ff112c5169 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -353,6 +353,12 @@ class AnalysisErrorSuite extends AnalysisTest {
)
errorTest(
+ "num_rows in limit clause must be equal to or greater than 0",
+ listRelation.limit(-1),
+ "The limit expression must be equal to or greater than 0, but got -1" :: Nil
+ )
+
+ errorTest(
"more than one generators in SELECT",
listRelation.select(Explode('list), Explode('list)),
"Only one generator allowed per select clause but found 2: explode(list), explode(list)" :: Nil