aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2016-02-20 13:53:23 -0800
committerDavies Liu <davies.liu@gmail.com>2016-02-20 13:53:23 -0800
commitf88c641bc8afa836ae597801ae7520da90e8472a (patch)
treec5d15178bea7471e987ac64502b43bde40566190 /sql
parenta4a081d1df043cce6db0284ef552e5174ebb0d02 (diff)
downloadspark-f88c641bc8afa836ae597801ae7520da90e8472a.tar.gz
spark-f88c641bc8afa836ae597801ae7520da90e8472a.tar.bz2
spark-f88c641bc8afa836ae597801ae7520da90e8472a.zip
[SPARK-13310] [SQL] Resolve Missing Sorting Columns in Generate
```scala // case 1: missing sort columns are resolvable if join is true sql("SELECT explode(a) AS val, b FROM data WHERE b < 2 order by val, c") // case 2: missing sort columns are not resolvable if join is false. Thus, issue an error message in this case sql("SELECT explode(a) AS val FROM data order by val, c") ``` When sort columns are not in `Generate`, we can resolve them when `join` is equal to `true`. Still trying to add more test cases for the other `UnaryNode` types. Could you review the changes? davies cloud-fan Thanks! Author: gatorsmile <gatorsmile@gmail.com> Closes #11198 from gatorsmile/missingInSort.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala34
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala5
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala32
3 files changed, 57 insertions, 14 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8368657b2b..54b91adad1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -609,17 +609,24 @@ class Analyzer(
case sa @ Sort(_, _, child: Aggregate) => sa
case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
- val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
- val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
- val missingAttrs = requiredAttrs -- child.outputSet
- if (missingAttrs.nonEmpty) {
- // Add missing attributes and then project them away after the sort.
- Project(child.output,
- Sort(newOrder, s.global, addMissingAttr(child, missingAttrs)))
- } else if (newOrder != order) {
- s.copy(order = newOrder)
- } else {
- s
+ try {
+ val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
+ val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
+ val missingAttrs = requiredAttrs -- child.outputSet
+ if (missingAttrs.nonEmpty) {
+ // Add missing attributes and then project them away after the sort.
+ Project(child.output,
+ Sort(newOrder, s.global, addMissingAttr(child, missingAttrs)))
+ } else if (newOrder != order) {
+ s.copy(order = newOrder)
+ } else {
+ s
+ }
+ } catch {
+ // Attempting to resolve it might fail. When this happens, return the original plan.
+ // Users will see an AnalysisException for resolution failure of missing attributes
+ // in Sort
+ case ae: AnalysisException => s
}
}
@@ -649,6 +656,11 @@ class Analyzer(
}
val newAggregateExpressions = a.aggregateExpressions ++ missingAttrs
a.copy(aggregateExpressions = newAggregateExpressions)
+ case g: Generate =>
+ // If join is false, we will convert it to true for getting from the child the missing
+ // attributes that its child might have or could have.
+ val missing = missingAttrs -- g.child.outputSet
+ g.copy(join = true, child = addMissingAttr(g.child, missing))
case u: UnaryNode =>
u.withNewChildren(addMissingAttr(u.child, missingAttrs) :: Nil)
case other =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index e0cec09742..a46006a0c6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -194,6 +194,11 @@ class AnalysisErrorSuite extends AnalysisTest {
"sort" :: "type" :: "map<int,int>" :: Nil)
errorTest(
+ "sorting by attributes are not from grouping expressions",
+ testRelation2.groupBy('a, 'c)('a, 'c, count('a).as("a3")).orderBy('b.asc),
+ "cannot resolve" :: "'b'" :: "given input columns" :: "[a, c, a3]" :: Nil)
+
+ errorTest(
"non-boolean filters",
testRelation.where(Literal(1)),
"filter" :: "'1'" :: "not a boolean" :: Literal(1).dataType.simpleString :: Nil)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index c2b0daa66b..75fa09db69 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -24,11 +24,10 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, FunctionRegistry}
-import org.apache.spark.sql.catalyst.parser.ParserConf
-import org.apache.spark.sql.execution.SparkQl
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
-import org.apache.spark.sql.hive.{HiveContext, HiveQl, MetastoreRelation}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
@@ -927,6 +926,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
).map(i => Row(i._1, i._2, i._3, i._4)))
}
+ test("Sorting columns are not in Generate") {
+ withTempTable("data") {
+ sqlContext.range(1, 5)
+ .select(array($"id", $"id" + 1).as("a"), $"id".as("b"), (lit(10) - $"id").as("c"))
+ .registerTempTable("data")
+
+ // case 1: missing sort columns are resolvable if join is true
+ checkAnswer(
+ sql("SELECT explode(a) AS val, b FROM data WHERE b < 2 order by val, c"),
+ Row(1, 1) :: Row(2, 1) :: Nil)
+
+ // case 2: missing sort columns are resolvable if join is false
+ checkAnswer(
+ sql("SELECT explode(a) AS val FROM data order by val, c"),
+ Seq(1, 2, 2, 3, 3, 4, 4, 5).map(i => Row(i)))
+
+ // case 3: missing sort columns are resolvable if join is true and outer is true
+ checkAnswer(
+ sql(
+ """
+ |SELECT C.val, b FROM data LATERAL VIEW OUTER explode(a) C as val
+ |where b < 2 order by c, val, b
+ """.stripMargin),
+ Row(1, 1) :: Row(2, 1) :: Nil)
+ }
+ }
+
test("window function: Sorting columns are not in Project") {
val data = Seq(
WindowData(1, "d", 10),