aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@questtec.nl>2015-07-31 12:07:18 -0700
committerYin Huai <yhuai@databricks.com>2015-07-31 12:08:25 -0700
commit39ab199a3f735b7658ab3331d3e2fb03441aec13 (patch)
treeff72a5998e11b325e7e447535f60fd9774547819 /sql
parent0a1d2ca42c8b31d6b0e70163795f0185d4622f87 (diff)
downloadspark-39ab199a3f735b7658ab3331d3e2fb03441aec13.tar.gz
spark-39ab199a3f735b7658ab3331d3e2fb03441aec13.tar.bz2
spark-39ab199a3f735b7658ab3331d3e2fb03441aec13.zip
[SPARK-8640] [SQL] Enable Processing of Multiple Window Frames in a Single Window Operator
This PR enables the processing of multiple window frames in a single window operator. This should improve the performance of processing multiple window expressions which share partition by/order by clauses, because it will be more efficient with respect to memory use and group processing. Author: Herman van Hovell <hvanhovell@questtec.nl> Closes #7515 from hvanhovell/SPARK-8640 and squashes the following commits: f0e1c21 [Herman van Hovell] Changed Window Logical/Physical plans to use partition by/order by specs directly instead of using WindowSpec. e1711c2 [Herman van Hovell] Enabled the processing of multiple window frames in a single Window operator.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala12
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala19
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala18
5 files changed, 40 insertions, 17 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 265f3d1e41..51d910b258 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -347,7 +347,7 @@ class Analyzer(
val newOutput = oldVersion.generatorOutput.map(_.newInstance())
(oldVersion, oldVersion.copy(generatorOutput = newOutput))
- case oldVersion @ Window(_, windowExpressions, _, child)
+ case oldVersion @ Window(_, windowExpressions, _, _, child)
if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
.nonEmpty =>
(oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
@@ -825,7 +825,7 @@ class Analyzer(
}.asInstanceOf[NamedExpression]
}
- // Second, we group extractedWindowExprBuffer based on their Window Spec.
+ // Second, we group extractedWindowExprBuffer based on their Partition and Order Specs.
val groupedWindowExpressions = extractedWindowExprBuffer.groupBy { expr =>
val distinctWindowSpec = expr.collect {
case window: WindowExpression => window.windowSpec
@@ -841,7 +841,8 @@ class Analyzer(
failAnalysis(s"$expr has multiple Window Specifications ($distinctWindowSpec)." +
s"Please file a bug report with this error message, stack trace, and the query.")
} else {
- distinctWindowSpec.head
+ val spec = distinctWindowSpec.head
+ (spec.partitionSpec, spec.orderSpec)
}
}.toSeq
@@ -850,9 +851,10 @@ class Analyzer(
var currentChild = child
var i = 0
while (i < groupedWindowExpressions.size) {
- val (windowSpec, windowExpressions) = groupedWindowExpressions(i)
+ val ((partitionSpec, orderSpec), windowExpressions) = groupedWindowExpressions(i)
// Set currentChild to the newly created Window operator.
- currentChild = Window(currentChild.output, windowExpressions, windowSpec, currentChild)
+ currentChild = Window(currentChild.output, windowExpressions,
+ partitionSpec, orderSpec, currentChild)
// Move to next Window Spec.
i += 1
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index a67f8de6b7..aacfc86ab0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -228,7 +228,8 @@ case class Aggregate(
case class Window(
projectList: Seq[Attribute],
windowExpressions: Seq[NamedExpression],
- windowSpec: WindowSpecDefinition,
+ partitionSpec: Seq[Expression],
+ orderSpec: Seq[SortOrder],
child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 03d24a88d4..4aff52d992 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -389,8 +389,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil
}
}
- case logical.Window(projectList, windowExpressions, spec, child) =>
- execution.Window(projectList, windowExpressions, spec, planLater(child)) :: Nil
+ case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) =>
+ execution.Window(
+ projectList, windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
case logical.Sample(lb, ub, withReplacement, seed, child) =>
execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index 91c8a02e2b..fe9f2c7028 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -80,23 +80,24 @@ import scala.collection.mutable
case class Window(
projectList: Seq[Attribute],
windowExpression: Seq[NamedExpression],
- windowSpec: WindowSpecDefinition,
+ partitionSpec: Seq[Expression],
+ orderSpec: Seq[SortOrder],
child: SparkPlan)
extends UnaryNode {
override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute)
override def requiredChildDistribution: Seq[Distribution] = {
- if (windowSpec.partitionSpec.isEmpty) {
+ if (partitionSpec.isEmpty) {
// Only show warning when the number of bytes is larger than 100 MB?
logWarning("No Partition Defined for Window operation! Moving all data to a single "
+ "partition, this can cause serious performance degradation.")
AllTuples :: Nil
- } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
+ } else ClusteredDistribution(partitionSpec) :: Nil
}
override def requiredChildOrdering: Seq[Seq[SortOrder]] =
- Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec)
+ Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
@@ -115,12 +116,12 @@ case class Window(
case RangeFrame =>
val (exprs, current, bound) = if (offset == 0) {
// Use the entire order expression when the offset is 0.
- val exprs = windowSpec.orderSpec.map(_.child)
+ val exprs = orderSpec.map(_.child)
val projection = newMutableProjection(exprs, child.output)
- (windowSpec.orderSpec, projection(), projection())
- } else if (windowSpec.orderSpec.size == 1) {
+ (orderSpec, projection(), projection())
+ } else if (orderSpec.size == 1) {
// Use only the first order expression when the offset is non-null.
- val sortExpr = windowSpec.orderSpec.head
+ val sortExpr = orderSpec.head
val expr = sortExpr.child
// Create the projection which returns the current 'value'.
val current = newMutableProjection(expr :: Nil, child.output)()
@@ -250,7 +251,7 @@ case class Window(
// Get all relevant projections.
val result = createResultProjection(unboundExpressions)
- val grouping = newProjection(windowSpec.partitionSpec, child.output)
+ val grouping = newProjection(partitionSpec, child.output)
// Manage the stream and the grouping.
var nextRow: InternalRow = EmptyRow
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
index bdb53ddf59..ba56a8a6b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
@@ -17,7 +17,10 @@
package org.apache.spark.sql.hive.execution
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.hive.test.TestHive
class HivePlanTest extends QueryTest {
@@ -31,4 +34,19 @@ class HivePlanTest extends QueryTest {
comparePlans(optimized, correctAnswer)
}
+
+ test("window expressions sharing the same partition by and order by clause") {
+ val df = Seq.empty[(Int, String, Int, Int)].toDF("id", "grp", "seq", "val")
+ val window = Window.
+ partitionBy($"grp").
+ orderBy($"val")
+ val query = df.select(
+ $"id",
+ sum($"val").over(window.rowsBetween(-1, 1)),
+ sum($"val").over(window.rangeBetween(-1, 1))
+ )
+ val plan = query.queryExecution.analyzed
+ assert(plan.collect{ case w: logical.Window => w }.size === 1,
+ "Should have only 1 Window operator.")
+ }
}