diff options
author | gatorsmile <gatorsmile@gmail.com> | 2016-02-20 13:53:23 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-02-20 13:53:23 -0800 |
commit | f88c641bc8afa836ae597801ae7520da90e8472a (patch) | |
tree | c5d15178bea7471e987ac64502b43bde40566190 | |
parent | a4a081d1df043cce6db0284ef552e5174ebb0d02 (diff) | |
download | spark-f88c641bc8afa836ae597801ae7520da90e8472a.tar.gz spark-f88c641bc8afa836ae597801ae7520da90e8472a.tar.bz2 spark-f88c641bc8afa836ae597801ae7520da90e8472a.zip |
[SPARK-13310] [SQL] Resolve Missing Sorting Columns in Generate
```scala
// case 1: missing sort columns are resolvable if join is true
sql("SELECT explode(a) AS val, b FROM data WHERE b < 2 order by val, c")
// case 2: missing sort columns are not resolvable if join is false. Thus, issue an error message in this case
sql("SELECT explode(a) AS val FROM data order by val, c")
```
When sort columns are missing from `Generate`'s output, we can resolve them when `join` is equal to `true`. I am still trying to add more test cases for the other `UnaryNode` types.
Could you review the changes? davies cloud-fan Thanks!
Author: gatorsmile <gatorsmile@gmail.com>
Closes #11198 from gatorsmile/missingInSort.
3 files changed, 57 insertions, 14 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 8368657b2b..54b91adad1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -609,17 +609,24 @@ class Analyzer( case sa @ Sort(_, _, child: Aggregate) => sa case s @ Sort(order, _, child) if !s.resolved && child.resolved => - val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder]) - val requiredAttrs = AttributeSet(newOrder).filter(_.resolved) - val missingAttrs = requiredAttrs -- child.outputSet - if (missingAttrs.nonEmpty) { - // Add missing attributes and then project them away after the sort. - Project(child.output, - Sort(newOrder, s.global, addMissingAttr(child, missingAttrs))) - } else if (newOrder != order) { - s.copy(order = newOrder) - } else { - s + try { + val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder]) + val requiredAttrs = AttributeSet(newOrder).filter(_.resolved) + val missingAttrs = requiredAttrs -- child.outputSet + if (missingAttrs.nonEmpty) { + // Add missing attributes and then project them away after the sort. + Project(child.output, + Sort(newOrder, s.global, addMissingAttr(child, missingAttrs))) + } else if (newOrder != order) { + s.copy(order = newOrder) + } else { + s + } + } catch { + // Attempting to resolve it might fail. When this happens, return the original plan. 
+ // Users will see an AnalysisException for resolution failure of missing attributes + // in Sort + case ae: AnalysisException => s } } @@ -649,6 +656,11 @@ class Analyzer( } val newAggregateExpressions = a.aggregateExpressions ++ missingAttrs a.copy(aggregateExpressions = newAggregateExpressions) + case g: Generate => + // If join is false, we will convert it to true for getting from the child the missing + // attributes that its child might have or could have. + val missing = missingAttrs -- g.child.outputSet + g.copy(join = true, child = addMissingAttr(g.child, missing)) case u: UnaryNode => u.withNewChildren(addMissingAttr(u.child, missingAttrs) :: Nil) case other => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index e0cec09742..a46006a0c6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -194,6 +194,11 @@ class AnalysisErrorSuite extends AnalysisTest { "sort" :: "type" :: "map<int,int>" :: Nil) errorTest( + "sorting by attributes are not from grouping expressions", + testRelation2.groupBy('a, 'c)('a, 'c, count('a).as("a3")).orderBy('b.asc), + "cannot resolve" :: "'b'" :: "given input columns" :: "[a, c, a3]" :: Nil) + + errorTest( "non-boolean filters", testRelation.where(Literal(1)), "filter" :: "'1'" :: "not a boolean" :: Literal(1).dataType.simpleString :: Nil) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index c2b0daa66b..75fa09db69 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -24,11 
+24,10 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, FunctionRegistry} -import org.apache.spark.sql.catalyst.parser.ParserConf -import org.apache.spark.sql.execution.SparkQl import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation -import org.apache.spark.sql.hive.{HiveContext, HiveQl, MetastoreRelation} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ @@ -927,6 +926,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { ).map(i => Row(i._1, i._2, i._3, i._4))) } + test("Sorting columns are not in Generate") { + withTempTable("data") { + sqlContext.range(1, 5) + .select(array($"id", $"id" + 1).as("a"), $"id".as("b"), (lit(10) - $"id").as("c")) + .registerTempTable("data") + + // case 1: missing sort columns are resolvable if join is true + checkAnswer( + sql("SELECT explode(a) AS val, b FROM data WHERE b < 2 order by val, c"), + Row(1, 1) :: Row(2, 1) :: Nil) + + // case 2: missing sort columns are resolvable if join is false + checkAnswer( + sql("SELECT explode(a) AS val FROM data order by val, c"), + Seq(1, 2, 2, 3, 3, 4, 4, 5).map(i => Row(i))) + + // case 3: missing sort columns are resolvable if join is true and outer is true + checkAnswer( + sql( + """ + |SELECT C.val, b FROM data LATERAL VIEW OUTER explode(a) C as val + |where b < 2 order by c, val, b + """.stripMargin), + Row(1, 1) :: Row(2, 1) :: Nil) + } + } + test("window function: Sorting columns are not in Project") { val data = Seq( WindowData(1, "d", 10), |