author     Yin Huai <yhuai@databricks.com>          2015-06-01 21:40:17 -0700
committer  Reynold Xin <rxin@databricks.com>        2015-06-01 21:40:17 -0700
commit     e797dba58e8cafdd30683dd1e0263f00ce30ccc0 (patch)
tree       5a8e92b38cdd756f35f11a28e2dc87b7272fdd01 /sql/hive
parent     7f74bb3bc6d29c53e67af6b6eec336f2d083322a (diff)
[SPARK-7965] [SPARK-7972] [SQL] Handle expressions containing multiple window expressions and make parser match window frames in case insensitive way
JIRAs:
https://issues.apache.org/jira/browse/SPARK-7965
https://issues.apache.org/jira/browse/SPARK-7972

Author: Yin Huai <yhuai@databricks.com>

Closes #6524 from yhuai/7965-7972 and squashes the following commits:

c12c79c [Yin Huai] Add doc for returned value.
de64328 [Yin Huai] Address rxin's comments.
fc9b1ad [Yin Huai] wip
2996da4 [Yin Huai] scala style
20b65b7 [Yin Huai] Handle expressions containing multiple window expressions.
9568b21 [Yin Huai] case insensitive matches
41f633d [Yin Huai] Failed test case.
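For context, SPARK-7965 concerns a single select expression that contains more than one window expression, and SPARK-7972 concerns window frame keywords written in mixed case. The following is an illustrative sketch, not part of the patch, condensed from the test added further down; it assumes a HiveContext bound to `sqlContext` and a temp table `nums(x, y)` registered the same way the new test does.

// Illustrative sketch only, not part of this patch. Assumes a HiveContext `sqlContext`
// and a temp table nums(x, y), mirroring the setup in the new test below.
// One select expression combines two window expressions (SPARK-7965), and the
// window frames use mixed-case keywords (SPARK-7972).
val sketch = sqlContext.sql(
  """
    |SELECT
    |  x,
    |  (sum(x) OVER w1) + (sum(x) OVER w2) AS combined
    |FROM nums
    |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UnBounded Preceding AND Current Row),
    |       w2 AS (ORDER BY x ROWS BETWEEN unbounded preceding AND unbounded following)
  """.stripMargin)
sketch.show()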
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala                  | 22
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 36
2 files changed, 53 insertions(+), 5 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 253bf11252..a5ca3613c5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -1561,6 +1561,10 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
""".stripMargin)
}
+ /* Case insensitive matches for Window Specification */
+ val PRECEDING = "(?i)preceding".r
+ val FOLLOWING = "(?i)following".r
+ val CURRENT = "(?i)current".r
def nodesToWindowSpecification(nodes: Seq[ASTNode]): WindowSpec = nodes match {
case Token(windowName, Nil) :: Nil =>
// Refer to a window spec defined in the window clause.
@@ -1614,11 +1618,19 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
} else {
val frameType = rowFrame.map(_ => RowFrame).getOrElse(RangeFrame)
def nodeToBoundary(node: Node): FrameBoundary = node match {
- case Token("preceding", Token(count, Nil) :: Nil) =>
- if (count == "unbounded") UnboundedPreceding else ValuePreceding(count.toInt)
- case Token("following", Token(count, Nil) :: Nil) =>
- if (count == "unbounded") UnboundedFollowing else ValueFollowing(count.toInt)
- case Token("current", Nil) => CurrentRow
+ case Token(PRECEDING(), Token(count, Nil) :: Nil) =>
+ if (count.toLowerCase() == "unbounded") {
+ UnboundedPreceding
+ } else {
+ ValuePreceding(count.toInt)
+ }
+ case Token(FOLLOWING(), Token(count, Nil) :: Nil) =>
+ if (count.toLowerCase() == "unbounded") {
+ UnboundedFollowing
+ } else {
+ ValueFollowing(count.toInt)
+ }
+ case Token(CURRENT(), Nil) => CurrentRow
case _ =>
throw new NotImplementedError(
s"""No parse rules for the Window Frame Boundary based on Node ${node.getName}
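The change above leans on Scala's regex extractors: a scala.util.matching.Regex with no capturing groups, used as PATTERN() in a case clause, succeeds only when the regex matches the entire string, and the (?i) flag makes that match case insensitive. Below is a self-contained sketch of that mechanism; it is illustrative only and independent of the Hive AST Token types used in the patch.

// Self-contained sketch of the (?i) regex-extractor technique used in nodeToBoundary above.
// A Regex with no capturing groups, used as `R()` in a pattern, matches only when the
// whole string matches the regex; `(?i)` makes the comparison case insensitive.
object FrameKeywordDemo {
  val PRECEDING = "(?i)preceding".r
  val FOLLOWING = "(?i)following".r
  val CURRENT = "(?i)current".r

  def classify(token: String): String = token match {
    case PRECEDING() => "preceding boundary"
    case FOLLOWING() => "following boundary"
    case CURRENT() => "current row"
    case other => s"no rule for token: $other"
  }

  def main(args: Array[String]): Unit = {
    // Mixed-case tokens like the ones exercised by the new test ("PRECEDiNG", "CuRRENT", ...).
    Seq("preceding", "PRECEDiNG", "FoLLOWING", "CuRRENT", "range").foreach { t =>
      println(s"$t -> ${classify(t)}")
    }
  }
}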
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 27863a6014..aba3becb1b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -780,6 +780,42 @@ class SQLQuerySuite extends QueryTest {
).map(i => Row(i._1, i._2, i._3, i._4)))
}
+ test("window function: multiple window expressions in a single expression") {
+ val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y")
+ nums.registerTempTable("nums")
+
+ val expected =
+ Row(1, 1, 1, 55, 1, 57) ::
+ Row(0, 2, 3, 55, 2, 60) ::
+ Row(1, 3, 6, 55, 4, 65) ::
+ Row(0, 4, 10, 55, 6, 71) ::
+ Row(1, 5, 15, 55, 9, 79) ::
+ Row(0, 6, 21, 55, 12, 88) ::
+ Row(1, 7, 28, 55, 16, 99) ::
+ Row(0, 8, 36, 55, 20, 111) ::
+ Row(1, 9, 45, 55, 25, 125) ::
+ Row(0, 10, 55, 55, 30, 140) :: Nil
+
+ val actual = sql(
+ """
+ |SELECT
+ | y,
+ | x,
+ | sum(x) OVER w1 AS running_sum,
+ | sum(x) OVER w2 AS total_sum,
+ | sum(x) OVER w3 AS running_sum_per_y,
+ | ((sum(x) OVER w1) + (sum(x) OVER w2) + (sum(x) OVER w3)) as combined2
+ |FROM nums
+ |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UnBOUNDED PRECEDiNG AND CuRRENT RoW),
+ | w2 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOuNDED FoLLOWING),
+ | w3 AS (PARTITION BY y ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+ """.stripMargin)
+
+ checkAnswer(actual, expected)
+
+ dropTempTable("nums")
+ }
+
test("test case key when") {
(1 to 5).map(i => (i, i.toString)).toDF("k", "v").registerTempTable("t")
checkAnswer(