aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorCheng Hao <hao.cheng@intel.com>2014-12-17 12:01:57 -0800
committerMichael Armbrust <michael@databricks.com>2014-12-17 12:01:57 -0800
commit5fdcbdc0c9f91be9380b09643a5db0f96c673ce8 (patch)
treea81ab1007b399a687115064e4913abaf2d44697a /sql/core
parentcf50631a66500561ba44347711cdb6e963d9478f (diff)
downloadspark-5fdcbdc0c9f91be9380b09643a5db0f96c673ce8.tar.gz
spark-5fdcbdc0c9f91be9380b09643a5db0f96c673ce8.tar.bz2
spark-5fdcbdc0c9f91be9380b09643a5db0f96c673ce8.zip
[SPARK-4625] [SQL] Add sort by for DSL & SimpleSqlParser
Add `sort by` support for both the DSL and SqlParser. This PR is related to #3386; whichever one is merged first will require the other to be rebased. Author: Cheng Hao <hao.cheng@intel.com> Closes #3481 from chenghao-intel/sortby and squashes the following commits: 041004f [Cheng Hao] Add sort by for DSL & SimpleSqlParser
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala13
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala18
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala7
3 files changed, 38 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index a66af602a1..7baf8ffcef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -216,6 +216,19 @@ class SchemaRDD(
def orderBy(sortExprs: SortOrder*): SchemaRDD =
new SchemaRDD(sqlContext, Sort(sortExprs, logicalPlan))
+ /**
+ * Sorts the results by the given expressions within each partition.
+ * {{{
+ * schemaRDD.sortBy('a)
+ * schemaRDD.sortBy('a, 'b)
+ * schemaRDD.sortBy('a.asc, 'b.desc)
+ * }}}
+ *
+ * @group Query
+ */
+ def sortBy(sortExprs: SortOrder*): SchemaRDD =
+ new SchemaRDD(sqlContext, SortPartitions(sortExprs, logicalPlan))
+
@deprecated("use limit with integer argument", "1.1.0")
def limit(limitExpr: Expression): SchemaRDD =
new SchemaRDD(sqlContext, Limit(limitExpr, logicalPlan))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
index 1a330a2bb6..e40d034ce4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
@@ -120,6 +120,24 @@ class DslQuerySuite extends QueryTest {
mapData.collect().sortBy(_.data(1)).reverse.toSeq)
}
+ test("sorting #2") {
+ checkAnswer(
+ testData2.sortBy('a.asc, 'b.asc),
+ Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2)))
+
+ checkAnswer(
+ testData2.sortBy('a.asc, 'b.desc),
+ Seq((1,2), (1,1), (2,2), (2,1), (3,2), (3,1)))
+
+ checkAnswer(
+ testData2.sortBy('a.desc, 'b.desc),
+ Seq((3,2), (3,1), (2,2), (2,1), (1,2), (1,1)))
+
+ checkAnswer(
+ testData2.sortBy('a.desc, 'b.asc),
+ Seq((3,1), (3,2), (2,1), (2,2), (1,1), (1,2)))
+ }
+
test("limit") {
checkAnswer(
testData.limit(10),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index bcebce7603..ddf4776ecf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -42,6 +42,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
TimeZone.setDefault(origZone)
}
+ test("SPARK-4625 support SORT BY in SimpleSQLParser & DSL") {
+ checkAnswer(
+ sql("SELECT a FROM testData2 SORT BY a"),
+ Seq(1, 1, 2 ,2 ,3 ,3).map(Seq(_))
+ )
+ }
+
test("grouping on nested fields") {
jsonRDD(sparkContext.parallelize("""{"nested": {"attribute": 1}, "value": 2}""" :: Nil))
.registerTempTable("rows")