From 5fdcbdc0c9f91be9380b09643a5db0f96c673ce8 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Wed, 17 Dec 2014 12:01:57 -0800
Subject: [SPARK-4625] [SQL] Add sort by for DSL & SimpleSqlParser

Add `sort by` support for both the DSL and SqlParser.

This PR is related to #3386; whichever one is merged first will require the
other to be rebased.
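For illustration, both of the following forms are expected to work after this
patch (a minimal sketch, assuming the fixtures the tests below use: a
SQLContext imported into scope and a registered table `testData2` with columns
`a` and `b`):

    // DSL: sortBy sorts rows within each partition (planned as SortPartitions),
    // while the existing orderBy remains a total ordering (planned as Sort).
    val byPartition = testData2.sortBy('a.asc, 'b.desc)

    // SimpleSqlParser: SORT BY is now accepted where ORDER BY already was.
    val viaSql = sql("SELECT a, b FROM testData2 SORT BY a, b")

Author: Cheng Hao

Closes #3481 from chenghao-intel/sortby and squashes the following commits:

041004f [Cheng Hao] Add sort by for DSL & SimpleSqlParser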
---
 .../org/apache/spark/sql/catalyst/SqlParser.scala    | 10 ++++++++--
 .../org/apache/spark/sql/catalyst/dsl/package.scala  |  2 ++
 .../main/scala/org/apache/spark/sql/SchemaRDD.scala  | 13 +++++++++++++
 .../scala/org/apache/spark/sql/DslQuerySuite.scala   | 18 ++++++++++++++++++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala   |  7 +++++++
 5 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index a2bcd73b60..d4fc9bbfd3 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -85,6 +85,7 @@ class SqlParser extends AbstractSparkSQLParser {
   protected val ON = Keyword("ON")
   protected val OR = Keyword("OR")
   protected val ORDER = Keyword("ORDER")
+  protected val SORT = Keyword("SORT")
   protected val OUTER = Keyword("OUTER")
   protected val OVERWRITE = Keyword("OVERWRITE")
   protected val REGEXP = Keyword("REGEXP")
@@ -140,7 +141,7 @@ class SqlParser extends AbstractSparkSQLParser {
     (WHERE ~> expression).? ~
     (GROUP ~ BY ~> rep1sep(expression, ",")).? ~
     (HAVING ~> expression).? ~
-    (ORDER ~ BY ~> ordering).? ~
+    sortType.? ~
     (LIMIT ~> expression).? ^^ {
       case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l =>
         val base = r.getOrElse(NoRelation)
@@ -150,7 +151,7 @@ class SqlParser extends AbstractSparkSQLParser {
           .getOrElse(Project(assignAliases(p), withFilter))
         val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
         val withHaving = h.map(Filter(_, withDistinct)).getOrElse(withDistinct)
-        val withOrder = o.map(Sort(_, withHaving)).getOrElse(withHaving)
+        val withOrder = o.map(_(withHaving)).getOrElse(withHaving)
         val withLimit = l.map(Limit(_, withOrder)).getOrElse(withOrder)
         withLimit
     }
@@ -202,6 +203,11 @@ class SqlParser extends AbstractSparkSQLParser {
     | FULL ~ OUTER.? ^^^ FullOuter
     )
 
+  protected lazy val sortType: Parser[LogicalPlan => LogicalPlan] =
+    ( ORDER ~ BY ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, l) }
+    | SORT ~ BY ~> ordering ^^ { case o => l: LogicalPlan => SortPartitions(o, l) }
+    )
+
   protected lazy val ordering: Parser[Seq[SortOrder]] =
     ( rep1sep(singleOrder, ",")
     | rep1sep(expression, ",") ~ direction.? ^^ {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 70dabc4e6c..fb252cdf51 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -246,6 +246,8 @@ package object dsl {
 
     def orderBy(sortExprs: SortOrder*) = Sort(sortExprs, logicalPlan)
 
+    def sortBy(sortExprs: SortOrder*) = SortPartitions(sortExprs, logicalPlan)
+
     def groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*) = {
       val aliasedExprs = aggregateExprs.map {
         case ne: NamedExpression => ne
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index a66af602a1..7baf8ffcef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -216,6 +216,19 @@ class SchemaRDD(
   def orderBy(sortExprs: SortOrder*): SchemaRDD =
     new SchemaRDD(sqlContext, Sort(sortExprs, logicalPlan))
 
+  /**
+   * Sorts the results by the given expressions within each partition.
+   * {{{
+   *   schemaRDD.sortBy('a)
+   *   schemaRDD.sortBy('a, 'b)
+   *   schemaRDD.sortBy('a.asc, 'b.desc)
+   * }}}
+   *
+   * @group Query
+   */
+  def sortBy(sortExprs: SortOrder*): SchemaRDD =
+    new SchemaRDD(sqlContext, SortPartitions(sortExprs, logicalPlan))
+
   @deprecated("use limit with integer argument", "1.1.0")
   def limit(limitExpr: Expression): SchemaRDD =
     new SchemaRDD(sqlContext, Limit(limitExpr, logicalPlan))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
index 1a330a2bb6..e40d034ce4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
@@ -120,6 +120,24 @@ class DslQuerySuite extends QueryTest {
       mapData.collect().sortBy(_.data(1)).reverse.toSeq)
   }
 
+  test("sorting #2") {
+    checkAnswer(
+      testData2.sortBy('a.asc, 'b.asc),
+      Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2)))
+
+    checkAnswer(
+      testData2.sortBy('a.asc, 'b.desc),
+      Seq((1,2), (1,1), (2,2), (2,1), (3,2), (3,1)))
+
+    checkAnswer(
+      testData2.sortBy('a.desc, 'b.desc),
+      Seq((3,2), (3,1), (2,2), (2,1), (1,2), (1,1)))
+
+    checkAnswer(
+      testData2.sortBy('a.desc, 'b.asc),
+      Seq((3,1), (3,2), (2,1), (2,2), (1,1), (1,2)))
+  }
+
   test("limit") {
     checkAnswer(
       testData.limit(10),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index bcebce7603..ddf4776ecf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -42,6 +42,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
     TimeZone.setDefault(origZone)
   }
 
+  test("SPARK-4625 support SORT BY in SimpleSQLParser & DSL") {
+    checkAnswer(
+      sql("SELECT a FROM testData2 SORT BY a"),
+      Seq(1, 1, 2, 2, 3, 3).map(Seq(_))
+    )
+  }
+
   test("grouping on nested fields") {
     jsonRDD(sparkContext.parallelize("""{"nested": {"attribute": 1}, "value": 2}""" :: Nil))
       .registerTempTable("rows")
-- 
cgit v1.2.3
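Postscript on the SqlParser change: `sortType` returns a `LogicalPlan =>
LogicalPlan` rather than a bare `Seq[SortOrder]`, which is why the select
production can simply apply the matched function with `o.map(_(withHaving))`
and stay agnostic about whether ORDER BY (a global `Sort`) or SORT BY (a
per-partition `SortPartitions`) was parsed. A standalone sketch of the same
combinator pattern, with hypothetical `Plan` stand-ins rather than the real
Catalyst classes:

    // Standalone sketch of the "parser yields a plan transformer" pattern used
    // by sortType above. Plan, Relation, Sort, and SortPartitions here are
    // hypothetical stand-ins for the Catalyst classes, and the sketch assumes
    // the scala-parser-combinators library is on the classpath.
    import scala.util.parsing.combinator.JavaTokenParsers

    sealed trait Plan
    case class Relation(name: String) extends Plan
    case class Sort(keys: Seq[String], child: Plan) extends Plan
    case class SortPartitions(keys: Seq[String], child: Plan) extends Plan

    object MiniSortParser extends JavaTokenParsers {
      private val keys: Parser[Seq[String]] = rep1sep(ident, ",")

      // Each alternative returns a Plan => Plan, so the caller can splice the
      // matched sort into whatever child plan it has already built -- exactly
      // what o.map(_(withHaving)) does in the select production.
      val sortType: Parser[Plan => Plan] =
        ( "ORDER" ~ "BY" ~> keys ^^ { o => (child: Plan) => Sort(o, child) }
        | "SORT" ~ "BY" ~> keys ^^ { o => (child: Plan) => SortPartitions(o, child) }
        )

      def apply(clause: String, child: Plan): Plan =
        parseAll(sortType, clause).map(_(child)).getOrElse(child)
    }

    // MiniSortParser("SORT BY a, b", Relation("t"))
    //   == SortPartitions(List("a", "b"), Relation("t"))

Returning a transformer keeps the grammar alternatives self-describing: a
third sort flavor would only add one more alternative to sortType, with no
change to the select production itself.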