author     gatorsmile <gatorsmile@gmail.com>      2016-03-21 18:08:41 +0800
committer  Wenchen Fan <wenchen@databricks.com>   2016-03-21 18:08:41 +0800
commit     2c5b18fb0fdeabd378dd97e91f72d1eac4e21cc7
tree       a0af9270c27ac3ec3762ba6e2d0c29a21d8969da /sql
parent     c35c60fa916e92916442a98f4af123704bb9692e
[SPARK-12789][SQL] Support Order By Ordinal in SQL
#### What changes were proposed in this pull request?

This PR adds support for ORDER BY position in SQL, e.g.

```SQL
select c1, c2, c3 from tbl order by 1 desc, 3
```

should be equivalent to

```SQL
select c1, c2, c3 from tbl order by c1 desc, c3 asc
```

This is controlled by the config option `spark.sql.orderByOrdinal`:
- When true, the ordinal numbers are treated as positions in the select list.
- When false, the ordinal numbers in the ORDER BY/SORT BY clause are ignored.
- Only integer literals are converted; foldable expressions are ignored.
- This also works with `select *`.

**Question**: Do we still need to support sort expressions that reference zero columns (e.g., constants)? They have no impact on the sorting results, so IMO we should not allow users to do it. rxin cloud-fan marmbrus yhuai hvanhovell

-- Update: such expressions are now ignored.

**Note**: This PR is taken from https://github.com/apache/spark/pull/10731. When merging this PR, please give the credit to zhichao-li.

Also cc all the people who were involved in the previous discussion: adrian-wang chenghao-intel tejasapatil

#### How was this patch tested?

Added test cases covering both positive and negative scenarios.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #11815 from gatorsmile/orderByPosition.
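For illustration, a minimal spark-shell sketch of the behavior described above (the table `tbl` with columns `c1`, `c2`, `c3` and the `sqlContext` entry point are assumed, not part of the patch):

```scala
// Hypothetical session; assumes a registered table `tbl` with columns c1, c2, c3.
// With spark.sql.orderByOrdinal=true (the default), integer literals in ORDER BY
// resolve to positions in the select list:
sqlContext.sql("SELECT c1, c2, c3 FROM tbl ORDER BY 1 DESC, 3").show()
// ...equivalent to: SELECT c1, c2, c3 FROM tbl ORDER BY c1 DESC, c3 ASC

// With the flag off, the integer literals are ignored and impose no ordering:
sqlContext.setConf("spark.sql.orderByOrdinal", "false")
sqlContext.sql("SELECT c1, c2, c3 FROM tbl ORDER BY 1 DESC, 3").show()
```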
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala                         | 10
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala                    | 31
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala                    | 13
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala                |  4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala       |  2
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala |  5
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala        | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                                  |  7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                                     | 43
9 files changed, 117 insertions(+), 11 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 35884139b6..e10ab9790d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -22,6 +22,8 @@ import org.apache.spark.sql.catalyst.analysis._
private[spark] trait CatalystConf {
def caseSensitiveAnalysis: Boolean
+ def orderByOrdinal: Boolean
+
/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
@@ -43,8 +45,14 @@ object EmptyConf extends CatalystConf {
override def caseSensitiveAnalysis: Boolean = {
throw new UnsupportedOperationException
}
+ override def orderByOrdinal: Boolean = {
+ throw new UnsupportedOperationException
+ }
}
/** A CatalystConf that can be used for local testing. */
-case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf {
+case class SimpleCatalystConf(
+ caseSensitiveAnalysis: Boolean,
+ orderByOrdinal: Boolean = true)
+ extends CatalystConf {
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e4e934a015..333a54ee76 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatal
import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.planning.IntegerIndex
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -40,7 +41,10 @@ import org.apache.spark.sql.types._
* references.
*/
object SimpleAnalyzer
- extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(true))
+ extends Analyzer(
+ EmptyCatalog,
+ EmptyFunctionRegistry,
+ new SimpleCatalystConf(caseSensitiveAnalysis = true))
/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -618,13 +622,36 @@ class Analyzer(
* clause. This rule detects such queries and adds the required attributes to the original
* projection, so that they will be available during sorting. Another projection is added to
* remove these attributes after sorting.
+ *
+ * This rule also resolves position numbers in sort references. This support was introduced
+ * in Spark 2.0. Before Spark 2.0, integers in ORDER BY had no effect on output sorting.
+ * - When a sort reference is a foldable expression rather than an integer literal, ignore it.
+ * - When spark.sql.orderByOrdinal is set to false, ignore the position numbers too.
*/
object ResolveSortReferences extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case s: Sort if !s.child.resolved => s
+ // Replace the index with the corresponding attribute for ORDER BY,
+ // where the index is a 1-based position in the projection list.
+ case s @ Sort(orders, global, child)
+ if conf.orderByOrdinal && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) =>
+ val newOrders = orders map {
+ case s @ SortOrder(IntegerIndex(index), direction) =>
+ if (index > 0 && index <= child.output.size) {
+ SortOrder(child.output(index - 1), direction)
+ } else {
+ throw new UnresolvedException(s,
+ s"Order/sort By position: $index does not exist " +
+ s"The Select List is indexed from 1 to ${child.output.size}")
+ }
+ case o => o
+ }
+ Sort(newOrders, global, child)
+
// Skip sort with aggregate. This will be handled in ResolveAggregateFunctions
case sa @ Sort(_, _, child: Aggregate) => sa
- case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
+ case s @ Sort(order, _, child) if !s.resolved =>
try {
val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
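The heart of the new rule above is a bounds-checked mapping from a 1-based ordinal to the child plan's output. A simplified, standalone sketch of that mapping, using plain strings in place of Catalyst attributes (the function name is illustrative, not part of the patch):

```scala
// Models the resolution step of ResolveSortReferences: map a 1-based ordinal
// to the child's output, rejecting out-of-range positions as the rule does.
def resolveOrdinal(index: Int, output: Seq[String]): String = {
  if (index > 0 && index <= output.size) {
    output(index - 1)
  } else {
    throw new IllegalArgumentException(
      s"Order/sort by position $index does not exist. " +
        s"The select list is indexed from 1 to ${output.size}.")
  }
}

resolveOrdinal(1, Seq("c1", "c2", "c3"))   // "c1"
resolveOrdinal(-1, Seq("c1", "c2", "c3"))  // throws, mirroring the analysis error
```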
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 62d54df98e..ef74504c66 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -24,6 +24,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types.IntegerType
/**
* A pattern that matches any number of project or filter operations on top of another relational
@@ -202,3 +203,15 @@ object Unions {
}
}
}
+
+/**
+ * Extractor for retrieving an Int value.
+ */
+object IntegerIndex {
+ def unapply(a: Any): Option[Int] = a match {
+ case Literal(a: Int, IntegerType) => Some(a)
+ // When resolving ordinal in Sort, negative values are extracted for issuing error messages.
+ case UnaryMinus(IntegerLiteral(v)) => Some(-v)
+ case _ => None
+ }
+}
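To see what the extractor accepts and rejects, here is a self-contained analogue with toy expression classes standing in for Catalyst's `Literal` and `UnaryMinus` (all names below are illustrative):

```scala
// Toy expression tree mirroring the shapes IntegerIndex matches on.
sealed trait Expr
case class IntLit(value: Int) extends Expr
case class Neg(child: Expr) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

object ToyIntegerIndex {
  def unapply(e: Expr): Option[Int] = e match {
    case IntLit(v) => Some(v)
    // Negative ordinals are extracted so the caller can report a clear error.
    case Neg(IntLit(v)) => Some(-v)
    case _ => None
  }
}

ToyIntegerIndex.unapply(IntLit(3))                  // Some(3)
ToyIntegerIndex.unapply(Neg(IntLit(1)))             // Some(-1)
ToyIntegerIndex.unapply(Add(IntLit(1), IntLit(0)))  // None: foldable, not a literal
```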
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index ef825e6062..39166c4f8e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
trait AnalysisTest extends PlanTest {
val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = {
- val caseSensitiveConf = new SimpleCatalystConf(true)
- val caseInsensitiveConf = new SimpleCatalystConf(false)
+ val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
+ val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index b2613e4909..9aa685e1e8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._
class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
- val conf = new SimpleCatalystConf(true)
+ val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
val catalog = new SimpleCatalog(conf)
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index da43751b0a..47b79fe462 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -110,7 +110,10 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
}
private val caseInsensitiveAnalyzer =
- new Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(false))
+ new Analyzer(
+ EmptyCatalog,
+ EmptyFunctionRegistry,
+ new SimpleCatalystConf(caseSensitiveAnalysis = false))
test("(a && b) || (a && c) => a && (b || c) when case insensitive") {
val plan = caseInsensitiveAnalyzer.execute(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index 27c15e856a..a4c8d1c6d2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.optimizer
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
@@ -25,6 +27,9 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
class EliminateSortsSuite extends PlanTest {
+ val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
+ val catalog = new SimpleCatalog(conf)
+ val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
@@ -48,8 +53,8 @@ class EliminateSortsSuite extends PlanTest {
val x = testRelation
val query = x.orderBy(SortOrder(3, Ascending), SortOrder(-1, Ascending))
- val optimized = Optimize.execute(query.analyze)
- val correctAnswer = x.analyze
+ val optimized = Optimize.execute(analyzer.execute(query))
+ val correctAnswer = analyzer.execute(x)
comparePlans(optimized, correctAnswer)
}
@@ -58,8 +63,8 @@ class EliminateSortsSuite extends PlanTest {
val x = testRelation
val query = x.orderBy(SortOrder(3, Ascending), 'a.asc)
- val optimized = Optimize.execute(query.analyze)
- val correctAnswer = x.orderBy('a.asc).analyze
+ val optimized = Optimize.execute(analyzer.execute(query))
+ val correctAnswer = analyzer.execute(x.orderBy('a.asc))
comparePlans(optimized, correctAnswer)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9e0878a514..3d1d5b120a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -435,6 +435,11 @@ object SQLConf {
defaultValue = Some(true),
doc = "When false, we will treat bucketed table as normal table.")
+ val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal",
+ defaultValue = Some(true),
+ doc = "When true, the ordinal numbers are treated as the position in the select list. " +
+ "When false, the ordinal numbers in order/sort By clause are ignored.")
+
// The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
//
@@ -634,6 +639,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin
def supportSQL11ReservedKeywords: Boolean = getConf(PARSER_SUPPORT_SQL11_RESERVED_KEYWORDS)
+ override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
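Since `spark.sql.orderByOrdinal` is an ordinary SQLConf entry, it can also be toggled at runtime from SQL; a small sketch (session setup assumed):

```scala
// Runtime toggle via the SQL SET command (sqlContext assumed to exist).
sqlContext.sql("SET spark.sql.orderByOrdinal=false")
// Integer literals in ORDER BY / SORT BY are now ignored.
sqlContext.sql("SET spark.sql.orderByOrdinal=true")
```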
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 6716982118..b765fd8b66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -21,6 +21,8 @@ import java.math.MathContext
import java.sql.Timestamp
import org.apache.spark.AccumulatorSuite
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin}
import org.apache.spark.sql.functions._
@@ -2156,6 +2158,47 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
+ test("order by ordinal number") {
+ checkAnswer(
+ sql("SELECT * FROM testData2 ORDER BY 1 DESC"),
+ sql("SELECT * FROM testData2 ORDER BY a DESC"))
+ // If the position is not an integer literal (e.g. a foldable expression), ignore it.
+ checkAnswer(
+ sql("SELECT * FROM testData2 ORDER BY 1 + 0 DESC, b ASC"),
+ sql("SELECT * FROM testData2 ORDER BY b ASC"))
+
+ checkAnswer(
+ sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"),
+ sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC"))
+ checkAnswer(
+ sql("SELECT * FROM testData2 SORT BY 1 DESC, 2"),
+ sql("SELECT * FROM testData2 SORT BY a DESC, b ASC"))
+ checkAnswer(
+ sql("SELECT * FROM testData2 ORDER BY 1 ASC, b ASC"),
+ Seq(Row(1, 1), Row(1, 2), Row(2, 1), Row(2, 2), Row(3, 1), Row(3, 2)))
+ }
+
+ test("order by ordinal number - negative cases") {
+ intercept[UnresolvedException[SortOrder]] {
+ sql("SELECT * FROM testData2 ORDER BY 0")
+ }
+ intercept[UnresolvedException[SortOrder]] {
+ sql("SELECT * FROM testData2 ORDER BY -1 DESC, b ASC")
+ }
+ intercept[UnresolvedException[SortOrder]] {
+ sql("SELECT * FROM testData2 ORDER BY 3 DESC, b ASC")
+ }
+ }
+
+ test("order by ordinal number with conf spark.sql.orderByOrdinal=false") {
+ withSQLConf(SQLConf.ORDER_BY_ORDINAL.key -> "false") {
+ // If spark.sql.orderByOrdinal=false, ignore the position number.
+ checkAnswer(
+ sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"),
+ sql("SELECT * FROM testData2 ORDER BY b ASC"))
+ }
+ }
+
test("natural join") {
val df1 = Seq(("one", 1), ("two", 2), ("three", 3)).toDF("k", "v1")
val df2 = Seq(("one", 1), ("two", 22), ("one", 5)).toDF("k", "v2")