author     Cheng Hao <hao.cheng@intel.com>            2014-12-30 12:11:44 -0800
committer  Michael Armbrust <michael@databricks.com>  2014-12-30 12:11:44 -0800
commit     53f0a00b6051fb6cb52a90f91ae01bcd77e332c5 (patch)
tree       d1db4db01969748df0d537c345d507ff1f1abdb8 /sql
parent     daac221302e0cf71a7b7bda31625134cf7b9dce1 (diff)
[Spark-4512] [SQL] Unresolved Attribute Exception in Sort By
It caused an exception when executing a query like: SELECT key + key FROM src SORT BY value;

Author: Cheng Hao <hao.cheng@intel.com>

Closes #3386 from chenghao-intel/sort and squashes the following commits:

38c78cc [Cheng Hao] revert the SortPartition in SparkStrategies
7e9dd15 [Cheng Hao] update the typo
fcd1d64 [Cheng Hao] rebase the latest master and update the SortBy unit test
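A minimal reproduction sketch of the failing query, assuming a local HiveContext and the standard Hive `src` test table (the setup names here are illustrative, not part of the patch):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("spark-4512").setMaster("local"))
val hive = new HiveContext(sc)

// `src` has columns (key INT, value STRING). Before this patch, sorting by a
// column absent from the SELECT list failed to resolve; afterwards the
// analyzer injects `value` into the projection and projects it away above the sort.
hive.sql("SELECT key + key FROM src SORT BY value").collect()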
Diffstat (limited to 'sql')
-rwxr-xr-x  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala                 |  4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala         | 13
-rwxr-xr-x  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala               |  4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala                              |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala              | 11
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala                          | 19
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/TestData.scala                               |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala                            |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala      |  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala           |  7
11 files changed, 55 insertions(+), 31 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 66860a4c09..f79d4ff444 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -204,8 +204,8 @@ class SqlParser extends AbstractSparkSQLParser {
)
protected lazy val sortType: Parser[LogicalPlan => LogicalPlan] =
- ( ORDER ~ BY ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, l) }
- | SORT ~ BY ~> ordering ^^ { case o => l: LogicalPlan => SortPartitions(o, l) }
+ ( ORDER ~ BY ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, true, l) }
+ | SORT ~ BY ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, false, l) }
)
protected lazy val ordering: Parser[Seq[SortOrder]] =
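The intent of the parser change can be shown with a small standalone sketch (toy plan types and a toy grammar, not Catalyst's own): both sort clauses now build the same Sort node and differ only in the global flag.

import scala.util.parsing.combinator.JavaTokenParsers

sealed trait Plan
case class Relation(name: String) extends Plan
case class Sort(order: Seq[String], global: Boolean, child: Plan) extends Plan

object ToySqlParser extends JavaTokenParsers {
  def relation: Parser[Plan] = ident ^^ Relation
  def sortType: Parser[Plan => Plan] =
    ( "ORDER" ~ "BY" ~> rep1sep(ident, ",") ^^ { o => (l: Plan) => Sort(o, global = true, l) }
    | "SORT" ~ "BY" ~> rep1sep(ident, ",") ^^ { o => (l: Plan) => Sort(o, global = false, l) }
    )
  def query: Parser[Plan] =
    relation ~ opt(sortType) ^^ { case r ~ f => f.map(_(r)).getOrElse(r) }
}

// ToySqlParser.parseAll(ToySqlParser.query, "src SORT BY value")
//   => Sort(List(value), false, Relation(src))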
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1c4088b843..72680f37a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -246,7 +246,7 @@ class Analyzer(catalog: Catalog,
case p: LogicalPlan if !p.childrenResolved => p
// If the projection list contains Stars, expand it.
- case p@Project(projectList, child) if containsStar(projectList) =>
+ case p @ Project(projectList, child) if containsStar(projectList) =>
Project(
projectList.flatMap {
case s: Star => s.expand(child.output, resolver)
@@ -310,7 +310,8 @@ class Analyzer(catalog: Catalog,
*/
object ResolveSortReferences extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
- case s @ Sort(ordering, p @ Project(projectList, child)) if !s.resolved && p.resolved =>
+ case s @ Sort(ordering, global, p @ Project(projectList, child))
+ if !s.resolved && p.resolved =>
val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
val resolved = unresolved.flatMap(child.resolve(_, resolver))
val requiredAttributes = AttributeSet(resolved.collect { case a: Attribute => a })
@@ -319,13 +320,14 @@ class Analyzer(catalog: Catalog,
if (missingInProject.nonEmpty) {
// Add missing attributes and then project them away after the sort.
Project(projectList.map(_.toAttribute),
- Sort(ordering,
+ Sort(ordering, global,
Project(projectList ++ missingInProject, child)))
} else {
logDebug(s"Failed to find $missingInProject in ${p.output.mkString(", ")}")
s // Nothing we can do here. Return original plan.
}
- case s @ Sort(ordering, a @ Aggregate(grouping, aggs, child)) if !s.resolved && a.resolved =>
+ case s @ Sort(ordering, global, a @ Aggregate(grouping, aggs, child))
+ if !s.resolved && a.resolved =>
val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
// A small hack to create an object that will allow us to resolve any references that
// refer to named expressions that are present in the grouping expressions.
@@ -340,8 +342,7 @@ class Analyzer(catalog: Catalog,
if (missingInAggs.nonEmpty) {
// Add missing grouping exprs and then project them away after the sort.
Project(a.output,
- Sort(ordering,
- Aggregate(grouping, aggs ++ missingInAggs, child)))
+ Sort(ordering, global, Aggregate(grouping, aggs ++ missingInAggs, child)))
} else {
s // Nothing we can do here. Return original plan.
}
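What the updated rule does can be modeled with a self-contained sketch (toy plan types, assumed): attributes the sort references but the projection dropped are re-added beneath the sort, then projected away again on top.

case class Attr(name: String)
sealed trait TPlan { def output: Seq[Attr] }
case class Rel(output: Seq[Attr]) extends TPlan
case class Proj(exprs: Seq[Attr], child: TPlan) extends TPlan { def output = exprs }
case class TSort(order: Seq[Attr], global: Boolean, child: TPlan) extends TPlan {
  def output = child.output
}

def resolveSortReferences(plan: TPlan): TPlan = plan match {
  case s @ TSort(order, global, Proj(exprs, child)) =>
    // Attributes the sort needs that are missing from the projection but
    // resolvable against the projection's child.
    val missing = order.filterNot(exprs.contains).filter(child.output.contains)
    if (missing.nonEmpty)
      Proj(exprs, TSort(order, global, Proj(exprs ++ missing, child)))
    else s
  case other => other
}

// resolveSortReferences(TSort(Seq(Attr("value")), global = false,
//   Proj(Seq(Attr("key")), Rel(Seq(Attr("key"), Attr("value"))))))
// => Proj(List(Attr(key)),
//      TSort(List(Attr(value)), false,
//        Proj(List(Attr(key), Attr(value)), Rel(...))))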
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index fb252cdf51..a14e5b9ef1 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -244,9 +244,9 @@ package object dsl {
condition: Option[Expression] = None) =
Join(logicalPlan, otherPlan, joinType, condition)
- def orderBy(sortExprs: SortOrder*) = Sort(sortExprs, logicalPlan)
+ def orderBy(sortExprs: SortOrder*) = Sort(sortExprs, true, logicalPlan)
- def sortBy(sortExprs: SortOrder*) = SortPartitions(sortExprs, logicalPlan)
+ def sortBy(sortExprs: SortOrder*) = Sort(sortExprs, false, logicalPlan)
def groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*) = {
val aliasedExprs = aggregateExprs.map {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index a9282b98ad..0b9f01cbae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -130,7 +130,16 @@ case class WriteToFile(
override def output = child.output
}
-case class Sort(order: Seq[SortOrder], child: LogicalPlan) extends UnaryNode {
+/**
+ * @param order The ordering expressions
+ * @param global True means global sorting applies to the entire data set;
+ * false means sorting applies only within each partition.
+ * @param child Child logical plan
+ */
+case class Sort(
+ order: Seq[SortOrder],
+ global: Boolean,
+ child: LogicalPlan) extends UnaryNode {
override def output = child.output
}
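The new flag is what downstream planning keys on. A minimal sketch of the intended contract (simplified types; the comment in SparkStrategies below states the partition-local case): a global sort demands an ordered distribution across partitions, a partition-local sort demands none.

sealed trait Distribution
case object UnspecifiedDistribution extends Distribution
case class OrderedDistribution(order: Seq[String]) extends Distribution

def requiredChildDistribution(order: Seq[String], global: Boolean): Distribution =
  if (global) OrderedDistribution(order) else UnspecifiedDistribution

// requiredChildDistribution(Seq("value ASC"), global = true)  => OrderedDistribution(...)
// requiredChildDistribution(Seq("value ASC"), global = false) => UnspecifiedDistribution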
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 856b10f1a8..80787b61ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -214,7 +214,7 @@ class SchemaRDD(
* @group Query
*/
def orderBy(sortExprs: SortOrder*): SchemaRDD =
- new SchemaRDD(sqlContext, Sort(sortExprs, logicalPlan))
+ new SchemaRDD(sqlContext, Sort(sortExprs, true, logicalPlan))
/**
* Sorts the results by the given expressions within each partition.
@@ -227,7 +227,7 @@ class SchemaRDD(
* @group Query
*/
def sortBy(sortExprs: SortOrder*): SchemaRDD =
- new SchemaRDD(sqlContext, SortPartitions(sortExprs, logicalPlan))
+ new SchemaRDD(sqlContext, Sort(sortExprs, false, logicalPlan))
@deprecated("use limit with integer argument", "1.1.0")
def limit(limitExpr: Expression): SchemaRDD =
@@ -238,7 +238,6 @@ class SchemaRDD(
* {{{
* schemaRDD.limit(10)
* }}}
- *
* @group Query
*/
def limit(limitNum: Int): SchemaRDD =
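A hedged usage sketch of the two SchemaRDD methods against the 1.2-era API (the `people` table and `sc` are illustrative, not from the patch):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)  // `sc` assumed in scope
import sqlContext._                  // brings the 'symbol DSL into scope

// Total ordering over the whole result:
people.orderBy('age.asc)  // plans Sort(..., global = true,  ...)
// Ordering only within each partition (Hive's SORT BY semantics):
people.sortBy('age.asc)   // plans Sort(..., global = false, ...)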
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 2954d4ce7d..9151da69ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -190,7 +190,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object TakeOrdered extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case logical.Limit(IntegerLiteral(limit), logical.Sort(order, child)) =>
+ case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
execution.TakeOrdered(limit, order, planLater(child)) :: Nil
case _ => Nil
}
@@ -257,15 +257,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Distinct(partial = false,
execution.Distinct(partial = true, planLater(child))) :: Nil
- case logical.Sort(sortExprs, child) if sqlContext.externalSortEnabled =>
- execution.ExternalSort(sortExprs, global = true, planLater(child)):: Nil
- case logical.Sort(sortExprs, child) =>
- execution.Sort(sortExprs, global = true, planLater(child)):: Nil
-
case logical.SortPartitions(sortExprs, child) =>
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
+ case logical.Sort(sortExprs, global, child) if sqlContext.externalSortEnabled =>
+ execution.ExternalSort(sortExprs, global, planLater(child)):: Nil
+ case logical.Sort(sortExprs, global, child) =>
+ execution.Sort(sortExprs, global, planLater(child)):: Nil
case logical.Project(projectList, child) =>
execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
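The reordering matters: the legacy SortPartitions case must be tried before the unified Sort cases, and the external-sort switch now applies to both global and partition-local sorts. A simplified model of the dispatch (toy types, assumed):

sealed trait LPlan
case class LSortPartitions(order: Seq[String], child: LPlan) extends LPlan
case class LSort(order: Seq[String], global: Boolean, child: LPlan) extends LPlan
case object LLeaf extends LPlan

def planSort(plan: LPlan, externalSortEnabled: Boolean): String = plan match {
  case LSortPartitions(order, _)                      => s"Sort($order, global = false)"
  case LSort(order, global, _) if externalSortEnabled => s"ExternalSort($order, global = $global)"
  case LSort(order, global, _)                        => s"Sort($order, global = $global)"
  case _                                              => "other strategies"
}

// planSort(LSort(Seq("value"), global = false, LLeaf), externalSortEnabled = true)
//   => "ExternalSort(List(value), global = false)"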
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
index 691c4b3828..c0b9cf5163 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
@@ -88,7 +88,7 @@ class DslQuerySuite extends QueryTest {
Seq(Seq(6)))
}
- test("sorting") {
+ test("global sorting") {
checkAnswer(
testData2.orderBy('a.asc, 'b.asc),
Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2)))
@@ -122,22 +122,31 @@ class DslQuerySuite extends QueryTest {
mapData.collect().sortBy(_.data(1)).reverse.toSeq)
}
- test("sorting #2") {
+ test("partition wide sorting") {
+ // 2 partitions totally, and
+ // Partition #1 with values:
+ // (1, 1)
+ // (1, 2)
+ // (2, 1)
+ // Partition #2 with values:
+ // (2, 2)
+ // (3, 1)
+ // (3, 2)
checkAnswer(
testData2.sortBy('a.asc, 'b.asc),
Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2)))
checkAnswer(
testData2.sortBy('a.asc, 'b.desc),
- Seq((1,2), (1,1), (2,2), (2,1), (3,2), (3,1)))
+ Seq((1,2), (1,1), (2,1), (2,2), (3,2), (3,1)))
checkAnswer(
testData2.sortBy('a.desc, 'b.desc),
- Seq((3,2), (3,1), (2,2), (2,1), (1,2), (1,1)))
+ Seq((2,1), (1,2), (1,1), (3,2), (3,1), (2,2)))
checkAnswer(
testData2.sortBy('a.desc, 'b.asc),
- Seq((3,1), (3,2), (2,1), (2,2), (1,1), (1,2)))
+ Seq((2,1), (1,1), (1,2), (3,1), (3,2), (2,2)))
}
test("limit") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index bb553a0a1e..497897c3c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -55,7 +55,7 @@ object TestData {
TestData2(2, 1) ::
TestData2(2, 2) ::
TestData2(3, 1) ::
- TestData2(3, 2) :: Nil).toSchemaRDD
+ TestData2(3, 2) :: Nil, 2).toSchemaRDD
testData2.registerTempTable("testData2")
case class DecimalData(a: BigDecimal, b: BigDecimal)
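The explicit partition count is what makes the partition-local sort tests deterministic. A sketch of the underlying call (the enclosing SparkContext `sc` is assumed):

val testData2 = sc.parallelize(
  TestData2(1, 1) :: TestData2(1, 2) :: TestData2(2, 1) ::
  TestData2(2, 2) :: TestData2(3, 1) :: TestData2(3, 2) :: Nil,
  numSlices = 2)  // three rows per partition, matching the comment in DslQuerySuite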
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 3f3d9e7cd4..8a9613cf96 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -680,16 +680,16 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
val withSort =
(orderByClause, sortByClause, distributeByClause, clusterByClause) match {
case (Some(totalOrdering), None, None, None) =>
- Sort(totalOrdering.getChildren.map(nodeToSortOrder), withHaving)
+ Sort(totalOrdering.getChildren.map(nodeToSortOrder), true, withHaving)
case (None, Some(perPartitionOrdering), None, None) =>
- SortPartitions(perPartitionOrdering.getChildren.map(nodeToSortOrder), withHaving)
+ Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false, withHaving)
case (None, None, Some(partitionExprs), None) =>
Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving)
case (None, Some(perPartitionOrdering), Some(partitionExprs), None) =>
- SortPartitions(perPartitionOrdering.getChildren.map(nodeToSortOrder),
+ Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false,
Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving))
case (None, None, None, Some(clusterExprs)) =>
- SortPartitions(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)),
+ Sort(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)), false,
Repartition(clusterExprs.getChildren.map(nodeToExpr), withHaving))
case (None, None, None, None) => withHaving
case _ => sys.error("Unsupported set of ordering / distribution clauses.")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 8011f9b877..4104df8f8e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -132,7 +132,7 @@ abstract class HiveComparisonTest
def isSorted(plan: LogicalPlan): Boolean = plan match {
case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false
- case PhysicalOperation(_, _, Sort(_, _)) => true
+ case PhysicalOperation(_, _, Sort(_, true, _)) => true
case _ => plan.children.iterator.exists(isSorted)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f57f31af15..5d0fb72370 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -32,6 +32,13 @@ case class Nested3(f3: Int)
* valid, but Hive currently cannot execute it.
*/
class SQLQuerySuite extends QueryTest {
+ test("SPARK-4512 Fix attribute reference resolution error when using SORT BY") {
+ checkAnswer(
+ sql("SELECT * FROM (SELECT key + key AS a FROM src SORT BY value) t ORDER BY t.a"),
+ sql("SELECT key + key as a FROM src ORDER BY a").collect().toSeq
+ )
+ }
+
test("CTAS with serde") {
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect
sql(