aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Herman van Hovell <hvanhovell@questtec.nl> 2015-07-19 16:29:50 -0700
committer Yin Huai <yhuai@databricks.com> 2015-07-19 16:29:50 -0700
commit 7a81245345f2d6124423161786bb0d9f1c278ab8 (patch)
tree 5311b3a65d5f4d419af0ab2b2c9448fab0465b81
parent a803ac3e060d181c7b34d9501c9350e5f215ba85 (diff)
download spark-7a81245345f2d6124423161786bb0d9f1c278ab8.tar.gz
spark-7a81245345f2d6124423161786bb0d9f1c278ab8.tar.bz2
spark-7a81245345f2d6124423161786bb0d9f1c278ab8.zip
[SPARK-8638] [SQL] Window Function Performance Improvements - Cleanup
This PR contains a few clean-ups that are a part of SPARK-8638: a few style issues got fixed, and a few tests were moved.

Git commit message is wrong BTW :(...

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #7513 from hvanhovell/SPARK-8638-cleanup and squashes the following commits:

4e69d08 [Herman van Hovell] Fixed Performance Regression for Shrinking Window Frames (+Rebase)
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala 14
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala 43
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala 79
3 files changed, 51 insertions, 85 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index a054f52b8b..de04132eb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -118,22 +118,24 @@ case class Window(
val exprs = windowSpec.orderSpec.map(_.child)
val projection = newMutableProjection(exprs, child.output)
(windowSpec.orderSpec, projection(), projection())
- }
- else if (windowSpec.orderSpec.size == 1) {
+ } else if (windowSpec.orderSpec.size == 1) {
// Use only the first order expression when the offset is non-null.
val sortExpr = windowSpec.orderSpec.head
val expr = sortExpr.child
// Create the projection which returns the current 'value'.
val current = newMutableProjection(expr :: Nil, child.output)()
// Flip the sign of the offset when processing the order is descending
- val boundOffset = if (sortExpr.direction == Descending) -offset
- else offset
+ val boundOffset =
+ if (sortExpr.direction == Descending) {
+ -offset
+ } else {
+ offset
+ }
// Create the projection which returns the current 'value' modified by adding the offset.
val boundExpr = Add(expr, Cast(Literal.create(boundOffset, IntegerType), expr.dataType))
val bound = newMutableProjection(boundExpr :: Nil, child.output)()
(sortExpr :: Nil, current, bound)
- }
- else {
+ } else {
sys.error("Non-Zero range offsets are not supported for windows " +
"with multiple order expressions.")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
index 15b5f418f0..c177cbdd99 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
@@ -212,4 +212,47 @@ class HiveDataFrameWindowSuite extends QueryTest {
| (PARTITION BY value ORDER BY key RANGE BETWEEN 1 preceding and current row)
| FROM window_table""".stripMargin).collect())
}
+
+ test("reverse sliding range frame") {
+ val df = Seq(
+ (1, "Thin", "Cell Phone", 6000),
+ (2, "Normal", "Tablet", 1500),
+ (3, "Mini", "Tablet", 5500),
+ (4, "Ultra thin", "Cell Phone", 5500),
+ (5, "Very thin", "Cell Phone", 6000),
+ (6, "Big", "Tablet", 2500),
+ (7, "Bendable", "Cell Phone", 3000),
+ (8, "Foldable", "Cell Phone", 3000),
+ (9, "Pro", "Tablet", 4500),
+ (10, "Pro2", "Tablet", 6500)).
+ toDF("id", "product", "category", "revenue")
+ val window = Window.
+ partitionBy($"category").
+ orderBy($"revenue".desc).
+ rangeBetween(-2000L, 1000L)
+ checkAnswer(
+ df.select(
+ $"id",
+ avg($"revenue").over(window).cast("int")),
+ Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
+ Row(4, 5833) :: Row(5, 5833) :: Row(6, 2833) ::
+ Row(7, 3000) :: Row(8, 3000) :: Row(9, 5500) ::
+ Row(10, 6000) :: Nil)
+ }
+
+ // This is here to illustrate the fact that reverse order also reverses offsets.
+ test("reverse unbounded range frame") {
+ val df = Seq(1, 2, 4, 3, 2, 1).
+ map(Tuple1.apply).
+ toDF("value")
+ val window = Window.orderBy($"value".desc)
+ checkAnswer(
+ df.select(
+ $"value",
+ sum($"value").over(window.rangeBetween(Long.MinValue, 1)),
+ sum($"value").over(window.rangeBetween(1, Long.MaxValue))),
+ Row(1, 13, null) :: Row(2, 13, 2) :: Row(4, 7, 9) ::
+ Row(3, 11, 6) :: Row(2, 13, 2) :: Row(1, 13, null) :: Nil)
+
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala
deleted file mode 100644
index a089d0d165..0000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.sql.{Row, QueryTest}
-import org.apache.spark.sql.expressions.Window
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHive.implicits._
-
-/**
- * Window expressions are tested extensively by the following test suites:
- * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]]
- * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]]
- * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]]
- * However these suites do not cover all possible (i.e. more exotic) settings. This suite fill
- * this gap.
- *
- * TODO Move this class to the sql/core project when we move to Native Spark UDAFs.
- */
-class WindowSuite extends QueryTest {
-
- test("reverse sliding range frame") {
- val df = Seq(
- (1, "Thin", "Cell Phone", 6000),
- (2, "Normal", "Tablet", 1500),
- (3, "Mini", "Tablet", 5500),
- (4, "Ultra thin", "Cell Phone", 5500),
- (5, "Very thin", "Cell Phone", 6000),
- (6, "Big", "Tablet", 2500),
- (7, "Bendable", "Cell Phone", 3000),
- (8, "Foldable", "Cell Phone", 3000),
- (9, "Pro", "Tablet", 4500),
- (10, "Pro2", "Tablet", 6500)).
- toDF("id", "product", "category", "revenue")
- val window = Window.
- partitionBy($"category").
- orderBy($"revenue".desc).
- rangeBetween(-2000L, 1000L)
- checkAnswer(
- df.select(
- $"id",
- avg($"revenue").over(window).cast("int")),
- Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
- Row(4, 5833) :: Row(5, 5833) :: Row(6, 2833) ::
- Row(7, 3000) :: Row(8, 3000) :: Row(9, 5500) ::
- Row(10, 6000) :: Nil)
- }
-
- // This is here to illustrate the fact that reverse order also reverses offsets.
- test("reverse unbounded range frame") {
- val df = Seq(1, 2, 4, 3, 2, 1).
- map(Tuple1.apply).
- toDF("value")
- val window = Window.orderBy($"value".desc)
- checkAnswer(
- df.select(
- $"value",
- sum($"value").over(window.rangeBetween(Long.MinValue, 1)),
- sum($"value").over(window.rangeBetween(1, Long.MaxValue))),
- Row(1, 13, null) :: Row(2, 13, 2) :: Row(4, 7, 9) ::
- Row(3, 11, 6) :: Row(2, 13, 2) :: Row(1, 13, null) :: Nil)
-
- }
-}