author    Herman van Hovell <hvanhovell@questtec.nl>    2015-07-18 23:44:38 -0700
committer Yin Huai <yhuai@databricks.com>               2015-07-18 23:44:38 -0700
commit    a9a0d0cebf8ab3c539723488e5945794ebfd6104 (patch)
tree      1adb8d37a3259a3677cd3f9b686c9f404a1fcbab /sql/hive
parent    04c1b49f5eee915ad1159a32bf12836a3b9f2620 (diff)
[SPARK-8638] [SQL] Window Function Performance Improvements
## Description

Performance improvements for Spark window functions. This PR will also serve as the basis for moving away from Hive UDAFs to Spark UDAFs. See JIRA tickets SPARK-8638 and SPARK-7712 for more information.

## Improvements

* Much better performance (10x) in running cases (e.g. BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) and UNBOUNDED FOLLOWING cases. The current implementation in Spark uses a sliding-window approach in these cases: an aggregate is maintained for every row, so space usage is N (N being the number of rows), and all of these aggregates need to be updated separately, which takes N*(N-1)/2 updates. The running case differs from the sliding case because we only add data to an aggregate function (no reset is required), so we need to maintain just one aggregate (as in the UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING case), update it for each row, and read its value after each update. This is what the new implementation does: it uses a single buffer and requires only N updates. I am currently working on data with window sizes of 500-1000 doing running sums, and this saves a lot of time. The CURRENT ROW AND UNBOUNDED FOLLOWING case uses the same approach plus the fact that aggregate operations are commutative, with one twist: it processes the input buffer in reverse.
* Fewer comparisons in the sliding case. The current implementation determines frame boundaries for every input row. The new implementation makes more use of the fact that the window is sorted: it maintains the boundaries and only moves them when the order of the current row changes. This is a minor improvement.
* A single Window node is able to process all types of frames for the same partitioning/ordering. This saves a little time/memory spent buffering and managing partitions. This will be enabled in a follow-up PR.
* A lot of the staging code is moved from the execution phase to the initialization phase. This is a minor performance improvement, and it improves the readability of the execution code.

## Benchmarking

I have done a small benchmark using [on-time performance](http://www.transtats.bts.gov) data for the month of April. I used the origin as a partitioning key; as a result there is quite some variation in window sizes. The code for the benchmark can be found in the JIRA ticket. These are the results per frame type:

Frame | Master | SPARK-8638
----- | ------ | ----------
Entire Frame | 2 s | 1 s
Sliding | 18 s | 1 s
Growing | 14 s | 0.9 s
Shrinking | 13 s | 1 s

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #7057 from hvanhovell/SPARK-8638 and squashes the following commits:

3bfdc49 [Herman van Hovell] Fixed Performance Regression for Shrinking Window Frames (+Rebase)
2eb3b33 [Herman van Hovell] Corrected reverse range frame processing.
2cd2d5b [Herman van Hovell] Corrected reverse range frame processing.
b0654d7 [Herman van Hovell] Tests for exotic frame specifications.
e75b76e [Herman van Hovell] More docs, added support for reverse sliding range frames, and some reorganization of code.
1fdb558 [Herman van Hovell] Changed Data In HiveDataFrameWindowSuite.
ac2f682 [Herman van Hovell] Added a few more comments.
1938312 [Herman van Hovell] Added Documentation to the createBoundOrdering methods.
bb020e6 [Herman van Hovell] Major overhaul of Window operator.
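To make the update-count argument above concrete, here is a minimal sketch in plain Scala (illustrative only, not the actual Window operator code; `RunningFrameSketch` and its method names are hypothetical) contrasting the O(N^2) sliding evaluation of a running SUM with the O(N) single-buffer running evaluation, including the reversed variant used for CURRENT ROW AND UNBOUNDED FOLLOWING:

```scala
// Illustrative sketch of the running-frame argument; not Spark code.
object RunningFrameSketch {

  // Sliding-style evaluation of SUM over UNBOUNDED PRECEDING AND CURRENT ROW:
  // the frame is rebuilt for every row, costing O(N^2) updates in total.
  def slidingSums(rows: IndexedSeq[Long]): IndexedSeq[Long] =
    rows.indices.map(i => rows.slice(0, i + 1).sum)

  // Running evaluation: a single accumulator, one update per row, O(N) total.
  def runningSums(rows: IndexedSeq[Long]): IndexedSeq[Long] = {
    var acc = 0L
    rows.map { r => acc += r; acc }
  }

  // CURRENT ROW AND UNBOUNDED FOLLOWING reuses the same loop on the reversed
  // buffer; this only works because the aggregate update is commutative.
  def reverseRunningSums(rows: IndexedSeq[Long]): IndexedSeq[Long] =
    runningSums(rows.reverse).reverse

  def main(args: Array[String]): Unit = {
    val partition = IndexedSeq(1L, 2L, 3L, 4L)
    assert(slidingSums(partition) == runningSums(partition))
    println(runningSums(partition))        // Vector(1, 3, 6, 10)
    println(reverseRunningSums(partition)) // Vector(10, 9, 7, 4)
  }
}
```

Both evaluations produce the same answers; the running variant simply avoids rebuilding the aggregate for every row, which is where the 10x gain reported above comes from.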
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala |  6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala    | 79
2 files changed, 82 insertions, 3 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
index efb3f2545d..15b5f418f0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
@@ -183,13 +183,13 @@ class HiveDataFrameWindowSuite extends QueryTest {
}
test("aggregation and range betweens with unbounded") {
- val df = Seq((1, "1"), (2, "2"), (2, "2"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
+ val df = Seq((5, "1"), (5, "2"), (4, "2"), (6, "2"), (3, "1"), (2, "2")).toDF("key", "value")
df.registerTempTable("window_table")
checkAnswer(
df.select(
$"key",
last("value").over(
- Window.partitionBy($"value").orderBy($"key").rangeBetween(1, Long.MaxValue))
+ Window.partitionBy($"value").orderBy($"key").rangeBetween(-2, -1))
.equalTo("2")
.as("last_v"),
avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(Long.MinValue, 1))
@@ -203,7 +203,7 @@ class HiveDataFrameWindowSuite extends QueryTest {
"""SELECT
| key,
| last_value(value) OVER
- | (PARTITION BY value ORDER BY key RANGE 1 preceding) == "2",
+ | (PARTITION BY value ORDER BY key RANGE BETWEEN 2 preceding and 1 preceding) == "2",
| avg(key) OVER
| (PARTITION BY value ORDER BY key RANGE BETWEEN unbounded preceding and 1 following),
| avg(key) OVER
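A note on the frame specifications exercised by this hunk: in the DataFrame API, `Window.rangeBetween(start, end)` takes offsets relative to the current row's ordering value, where negative offsets mean PRECEDING, positive offsets mean FOLLOWING, and `Long.MinValue`/`Long.MaxValue` stand for the unbounded bounds. A short sketch of the correspondence used in the test above (assuming the implicits and `key`/`value` columns from the suite are in scope):

```scala
import org.apache.spark.sql.expressions.Window

// RANGE BETWEEN 2 PRECEDING AND 1 PRECEDING
val wPreceding = Window.partitionBy($"value").orderBy($"key").rangeBetween(-2, -1)

// RANGE BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING
val wGrowing = Window.partitionBy($"value").orderBy($"key")
  .rangeBetween(Long.MinValue, 1)
```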
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala
new file mode 100644
index 0000000000..a089d0d165
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{Row, QueryTest}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
+
+/**
+ * Window expressions are tested extensively by the following test suites:
+ * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]]
+ * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]]
+ * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]]
+ * However, these suites do not cover all possible (i.e. more exotic) settings. This suite
+ * fills this gap.
+ *
+ * TODO Move this class to the sql/core project when we move to Native Spark UDAFs.
+ */
+class WindowSuite extends QueryTest {
+
+ test("reverse sliding range frame") {
+ val df = Seq(
+ (1, "Thin", "Cell Phone", 6000),
+ (2, "Normal", "Tablet", 1500),
+ (3, "Mini", "Tablet", 5500),
+ (4, "Ultra thin", "Cell Phone", 5500),
+ (5, "Very thin", "Cell Phone", 6000),
+ (6, "Big", "Tablet", 2500),
+ (7, "Bendable", "Cell Phone", 3000),
+ (8, "Foldable", "Cell Phone", 3000),
+ (9, "Pro", "Tablet", 4500),
+ (10, "Pro2", "Tablet", 6500)).
+ toDF("id", "product", "category", "revenue")
+ val window = Window.
+ partitionBy($"category").
+ orderBy($"revenue".desc).
+ rangeBetween(-2000L, 1000L)
+ checkAnswer(
+ df.select(
+ $"id",
+ avg($"revenue").over(window).cast("int")),
+ Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
+ Row(4, 5833) :: Row(5, 5833) :: Row(6, 2833) ::
+ Row(7, 3000) :: Row(8, 3000) :: Row(9, 5500) ::
+ Row(10, 6000) :: Nil)
+ }
+
+ // This is here to illustrate the fact that reverse order also reverses offsets.
+ test("reverse unbounded range frame") {
+ val df = Seq(1, 2, 4, 3, 2, 1).
+ map(Tuple1.apply).
+ toDF("value")
+ val window = Window.orderBy($"value".desc)
+ checkAnswer(
+ df.select(
+ $"value",
+ sum($"value").over(window.rangeBetween(Long.MinValue, 1)),
+ sum($"value").over(window.rangeBetween(1, Long.MaxValue))),
+ Row(1, 13, null) :: Row(2, 13, 2) :: Row(4, 7, 9) ::
+ Row(3, 11, 6) :: Row(2, 13, 2) :: Row(1, 13, null) :: Nil)
+
+ }
+}
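As a worked check of the expected values in the "reverse unbounded range frame" test (plain Scala, independent of Spark; `sumDownTo` and `sumFrom` are illustrative names): because the ordering is descending, an offset of +1 FOLLOWING moves towards *smaller* values, so for a current value `v` the frame `(Long.MinValue, 1)` covers rows with `value >= v - 1` and the frame `(1, Long.MaxValue)` covers rows with `value <= v - 1`:

```scala
// Recompute the expected Rows of "reverse unbounded range frame" by hand.
val values = Seq(1, 2, 4, 3, 2, 1)

// Frame (Long.MinValue, 1) under ORDER BY value DESC: everything down to v - 1.
def sumDownTo(v: Int): Long = values.filter(_ >= v - 1).map(_.toLong).sum

// Frame (1, Long.MaxValue) under ORDER BY value DESC: everything from v - 1 on;
// an empty frame yields null in SQL, modelled as None here.
def sumFrom(v: Int): Option[Long] = {
  val frame = values.filter(_ <= v - 1)
  if (frame.isEmpty) None else Some(frame.map(_.toLong).sum)
}

values.map(v => (v, sumDownTo(v), sumFrom(v)))
// List((1,13,None), (2,13,Some(2)), (4,7,Some(9)),
//      (3,11,Some(6)), (2,13,Some(2)), (1,13,None))
```

This matches the `Row`s asserted in the test and illustrates the point made in the comment above it: reversing the sort order also reverses the meaning of the frame offsets.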