-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala10
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala31
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala13
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala4
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala2
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala5
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala13
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala43
9 files changed, 117 insertions, 11 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 35884139b6..e10ab9790d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -22,6 +22,8 @@ import org.apache.spark.sql.catalyst.analysis._
private[spark] trait CatalystConf {
def caseSensitiveAnalysis: Boolean
+ def orderByOrdinal: Boolean
+
/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
@@ -43,8 +45,14 @@ object EmptyConf extends CatalystConf {
override def caseSensitiveAnalysis: Boolean = {
throw new UnsupportedOperationException
}
+ override def orderByOrdinal: Boolean = {
+ throw new UnsupportedOperationException
+ }
}
/** A CatalystConf that can be used for local testing. */
-case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf {
+case class SimpleCatalystConf(
+ caseSensitiveAnalysis: Boolean,
+ orderByOrdinal: Boolean = true)
+ extends CatalystConf {
}
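
For orientation, a minimal sketch (not part of the patch) of how the extended conf is constructed: orderByOrdinal defaults to true, so existing call sites keep ordinal resolution unless they opt out explicitly.

    // Sketch only: the case class from the diff above, with and without ordinals.
    val withOrdinals    = SimpleCatalystConf(caseSensitiveAnalysis = true)
    val withoutOrdinals = SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
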
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e4e934a015..333a54ee76 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatal
import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.planning.IntegerIndex
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -40,7 +41,10 @@ import org.apache.spark.sql.types._
* references.
*/
object SimpleAnalyzer
- extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(true))
+ extends Analyzer(
+ EmptyCatalog,
+ EmptyFunctionRegistry,
+ new SimpleCatalystConf(caseSensitiveAnalysis = true))
/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -618,13 +622,36 @@ class Analyzer(
* clause. This rule detects such queries and adds the required attributes to the original
* projection, so that they will be available during sorting. Another projection is added to
* remove these attributes after sorting.
+ *
+ * This rule also resolves ordinals in sort references. This support was introduced in
+ * Spark 2.0; before that, integer literals in ORDER BY had no effect on the output sorting.
+ * - When a sort reference is a foldable expression rather than an integer literal, it is ignored.
+ * - When spark.sql.orderByOrdinal is set to false, position numbers are ignored as well.
*/
object ResolveSortReferences extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case s: Sort if !s.child.resolved => s
+ // Replace the ordinal with the corresponding attribute for ORDER BY,
+ // where the ordinal is a 1-based position in the select list.
+ case s @ Sort(orders, global, child)
+ if conf.orderByOrdinal && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) =>
+ val newOrders = orders map {
+ case s @ SortOrder(IntegerIndex(index), direction) =>
+ if (index > 0 && index <= child.output.size) {
+ SortOrder(child.output(index - 1), direction)
+ } else {
+ throw new UnresolvedException(s,
+ s"Order/sort by position $index does not exist. " +
+ s"The select list is indexed from 1 to ${child.output.size}.")
+ }
+ case o => o
+ }
+ Sort(newOrders, global, child)
+
// Skip sort with aggregate. This will be handled in ResolveAggregateFunctions
case sa @ Sort(_, _, child: Aggregate) => sa
- case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
+ case s @ Sort(order, _, child) if !s.resolved =>
try {
val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
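
The heart of the new rule is a 1-based lookup into the child plan's output. A self-contained sketch of that logic, using illustrative stand-ins for the Catalyst classes (none of these names are Spark's):

    // Illustrative stand-ins for Catalyst's Attribute and SortOrder.
    case class Attr(name: String)
    case class Order(key: Either[Int, Attr], ascending: Boolean)

    // Mirrors the rule above: a 1-based ordinal in range is replaced by the
    // attribute at that position; an out-of-range ordinal is an analysis error;
    // anything else is left untouched.
    def resolveOrdinals(orders: Seq[Order], output: Seq[Attr]): Seq[Order] =
      orders.map {
        case Order(Left(i), asc) if i >= 1 && i <= output.size =>
          Order(Right(output(i - 1)), asc)
        case Order(Left(i), _) =>
          throw new IllegalArgumentException(
            s"Order/sort by position $i does not exist. " +
            s"The select list is indexed from 1 to ${output.size}.")
        case o => o
      }

    // resolveOrdinals(Seq(Order(Left(2), ascending = false)), Seq(Attr("a"), Attr("b")))
    //   ==> Seq(Order(Right(Attr("b")), ascending = false))
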
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 62d54df98e..ef74504c66 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -24,6 +24,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types.IntegerType
/**
* A pattern that matches any number of project or filter operations on top of another relational
@@ -202,3 +203,15 @@ object Unions {
}
}
}
+
+/**
+ * An extractor that retrieves the Int value from an integer literal expression.
+ */
+object IntegerIndex {
+ def unapply(a: Any): Option[Int] = a match {
+ case Literal(a: Int, IntegerType) => Some(a)
+ // When resolving ordinals in Sort, negative values are also extracted so that
+ // error messages can report the offending position.
+ case UnaryMinus(IntegerLiteral(v)) => Some(-v)
+ case _ => None
+ }
+}
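
IntegerIndex is a plain Scala extractor: pattern matching calls its unapply, and a Some result binds the extracted Int. A minimal self-contained analogue (illustrative expression types, not Catalyst's) shows the same shape, including the negated-literal case:

    sealed trait Expr
    case class IntLit(value: Int) extends Expr
    case class Neg(child: Expr)   extends Expr
    case class Col(name: String)  extends Expr

    object IntIndex {
      def unapply(e: Expr): Option[Int] = e match {
        case IntLit(v)      => Some(v)
        case Neg(IntLit(v)) => Some(-v) // surface negatives so errors can report them
        case _              => None
      }
    }

    // IntLit(3)      match { case IntIndex(i) => i }  // 3
    // Neg(IntLit(1)) match { case IntIndex(i) => i }  // -1, later rejected as out of range
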
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index ef825e6062..39166c4f8e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
trait AnalysisTest extends PlanTest {
val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = {
- val caseSensitiveConf = new SimpleCatalystConf(true)
- val caseInsensitiveConf = new SimpleCatalystConf(false)
+ val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
+ val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index b2613e4909..9aa685e1e8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._
class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
- val conf = new SimpleCatalystConf(true)
+ val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
val catalog = new SimpleCatalog(conf)
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index da43751b0a..47b79fe462 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -110,7 +110,10 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
}
private val caseInsensitiveAnalyzer =
- new Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(false))
+ new Analyzer(
+ EmptyCatalog,
+ EmptyFunctionRegistry,
+ new SimpleCatalystConf(caseSensitiveAnalysis = false))
test("(a && b) || (a && c) => a && (b || c) when case insensitive") {
val plan = caseInsensitiveAnalyzer.execute(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index 27c15e856a..a4c8d1c6d2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.optimizer
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
@@ -25,6 +27,9 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
class EliminateSortsSuite extends PlanTest {
+ val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
+ val catalog = new SimpleCatalog(conf)
+ val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
@@ -48,8 +53,8 @@ class EliminateSortsSuite extends PlanTest {
val x = testRelation
val query = x.orderBy(SortOrder(3, Ascending), SortOrder(-1, Ascending))
- val optimized = Optimize.execute(query.analyze)
- val correctAnswer = x.analyze
+ val optimized = Optimize.execute(analyzer.execute(query))
+ val correctAnswer = analyzer.execute(x)
comparePlans(optimized, correctAnswer)
}
@@ -58,8 +63,8 @@ class EliminateSortsSuite extends PlanTest {
val x = testRelation
val query = x.orderBy(SortOrder(3, Ascending), 'a.asc)
- val optimized = Optimize.execute(query.analyze)
- val correctAnswer = x.orderBy('a.asc).analyze
+ val optimized = Optimize.execute(analyzer.execute(query))
+ val correctAnswer = analyzer.execute(x.orderBy('a.asc))
comparePlans(optimized, correctAnswer)
}
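
The switch from query.analyze to analyzer.execute(query) is deliberate: the dsl's .analyze runs a default analyzer whose conf has orderByOrdinal = true, under which the literal 3 in SortOrder(3, Ascending) would be resolved as a select-list position rather than kept as a foldable constant. Hence the suite builds its own analyzer from an opted-out conf, roughly:

    // Sketch using the names from the diff above: the same query under two confs.
    val ordinalConf = SimpleCatalystConf(caseSensitiveAnalysis = true)   // ordinals resolved
    val literalConf = SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
    // Under literalConf, SortOrder(3, Ascending) stays a constant sort key that
    // EliminateSorts can remove; under ordinalConf it would be treated as position 3.
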
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9e0878a514..3d1d5b120a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -435,6 +435,11 @@ object SQLConf {
defaultValue = Some(true),
doc = "When false, we will treat bucketed table as normal table.")
+ val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal",
+ defaultValue = Some(true),
+ doc = "When true, the ordinal numbers are treated as the position in the select list. " +
+ "When false, the ordinal numbers in order/sort By clause are ignored.")
+
// The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
//
@@ -634,6 +639,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin
def supportSQL11ReservedKeywords: Boolean = getConf(PARSER_SUPPORT_SQL11_RESERVED_KEYWORDS)
+ override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
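
A usage sketch for the new entry; setConf and the SQL SET command are the standard ways to change a SQLConf value at runtime (sqlContext here stands for whatever SQLContext the application holds):

    // Disable ordinal resolution for the current session.
    sqlContext.setConf("spark.sql.orderByOrdinal", "false")

    // Equivalent SQL:
    //   SET spark.sql.orderByOrdinal=false
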
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 6716982118..b765fd8b66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -21,6 +21,8 @@ import java.math.MathContext
import java.sql.Timestamp
import org.apache.spark.AccumulatorSuite
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin}
import org.apache.spark.sql.functions._
@@ -2156,6 +2158,47 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
+ test("order by ordinal number") {
+ checkAnswer(
+ sql("SELECT * FROM testData2 ORDER BY 1 DESC"),
+ sql("SELECT * FROM testData2 ORDER BY a DESC"))
+ // If the sort reference is a foldable expression rather than an integer literal,
+ // it is ignored.
+ checkAnswer(
+ sql("SELECT * FROM testData2 ORDER BY 1 + 0 DESC, b ASC"),
+ sql("SELECT * FROM testData2 ORDER BY b ASC"))
+
+ checkAnswer(
+ sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"),
+ sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC"))
+ checkAnswer(
+ sql("SELECT * FROM testData2 SORT BY 1 DESC, 2"),
+ sql("SELECT * FROM testData2 SORT BY a DESC, b ASC"))
+ checkAnswer(
+ sql("SELECT * FROM testData2 ORDER BY 1 ASC, b ASC"),
+ Seq(Row(1, 1), Row(1, 2), Row(2, 1), Row(2, 2), Row(3, 1), Row(3, 2)))
+ }
+
+ test("order by ordinal number - negative cases") {
+ intercept[UnresolvedException[SortOrder]] {
+ sql("SELECT * FROM testData2 ORDER BY 0")
+ }
+ intercept[UnresolvedException[SortOrder]] {
+ sql("SELECT * FROM testData2 ORDER BY -1 DESC, b ASC")
+ }
+ intercept[UnresolvedException[SortOrder]] {
+ sql("SELECT * FROM testData2 ORDER BY 3 DESC, b ASC")
+ }
+ }
+
+ test("order by ordinal number with conf spark.sql.orderByOrdinal=false") {
+ withSQLConf(SQLConf.ORDER_BY_ORDINAL.key -> "false") {
+ // If spark.sql.orderByOrdinal=false, ignore the position number.
+ checkAnswer(
+ sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"),
+ sql("SELECT * FROM testData2 ORDER BY b ASC"))
+ }
+ }
+
test("natural join") {
val df1 = Seq(("one", 1), ("two", 2), ("three", 3)).toDF("k", "v1")
val df2 = Seq(("one", 1), ("two", 22), ("one", 5)).toDF("k", "v2")