aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@questtec.nl>2015-07-31 12:07:18 -0700
committerYin Huai <yhuai@databricks.com>2015-07-31 12:08:25 -0700
commit39ab199a3f735b7658ab3331d3e2fb03441aec13 (patch)
treeff72a5998e11b325e7e447535f60fd9774547819 /sql/core
parent0a1d2ca42c8b31d6b0e70163795f0185d4622f87 (diff)
downloadspark-39ab199a3f735b7658ab3331d3e2fb03441aec13.tar.gz
spark-39ab199a3f735b7658ab3331d3e2fb03441aec13.tar.bz2
spark-39ab199a3f735b7658ab3331d3e2fb03441aec13.zip
[SPARK-8640] [SQL] Enable Processing of Multiple Window Frames in a Single Window Operator
This PR enables the processing of multiple window frames in a single window operator. This should improve the performance of processing multiple window expressions which share partition by/order by clauses, because it will be more efficient with respect to memory use and group processing. Author: Herman van Hovell <hvanhovell@questtec.nl> Closes #7515 from hvanhovell/SPARK-8640 and squashes the following commits: f0e1c21 [Herman van Hovell] Changed Window Logical/Physical plans to use partition by/order by specs directly instead of using WindowSpec. e1711c2 [Herman van Hovell] Enabled the processing of multiple window frames in a single Window operator.
Diffstat (limited to 'sql/core')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 5
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala | 19
2 files changed, 13 insertions, 11 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 03d24a88d4..4aff52d992 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -389,8 +389,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil
}
}
- case logical.Window(projectList, windowExpressions, spec, child) =>
- execution.Window(projectList, windowExpressions, spec, planLater(child)) :: Nil
+ case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) =>
+ execution.Window(
+ projectList, windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
case logical.Sample(lb, ub, withReplacement, seed, child) =>
execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index 91c8a02e2b..fe9f2c7028 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -80,23 +80,24 @@ import scala.collection.mutable
case class Window(
projectList: Seq[Attribute],
windowExpression: Seq[NamedExpression],
- windowSpec: WindowSpecDefinition,
+ partitionSpec: Seq[Expression],
+ orderSpec: Seq[SortOrder],
child: SparkPlan)
extends UnaryNode {
override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute)
override def requiredChildDistribution: Seq[Distribution] = {
- if (windowSpec.partitionSpec.isEmpty) {
+ if (partitionSpec.isEmpty) {
// Only show warning when the number of bytes is larger than 100 MB?
logWarning("No Partition Defined for Window operation! Moving all data to a single "
+ "partition, this can cause serious performance degradation.")
AllTuples :: Nil
- } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
+ } else ClusteredDistribution(partitionSpec) :: Nil
}
override def requiredChildOrdering: Seq[Seq[SortOrder]] =
- Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec)
+ Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
@@ -115,12 +116,12 @@ case class Window(
case RangeFrame =>
val (exprs, current, bound) = if (offset == 0) {
// Use the entire order expression when the offset is 0.
- val exprs = windowSpec.orderSpec.map(_.child)
+ val exprs = orderSpec.map(_.child)
val projection = newMutableProjection(exprs, child.output)
- (windowSpec.orderSpec, projection(), projection())
- } else if (windowSpec.orderSpec.size == 1) {
+ (orderSpec, projection(), projection())
+ } else if (orderSpec.size == 1) {
// Use only the first order expression when the offset is non-null.
- val sortExpr = windowSpec.orderSpec.head
+ val sortExpr = orderSpec.head
val expr = sortExpr.child
// Create the projection which returns the current 'value'.
val current = newMutableProjection(expr :: Nil, child.output)()
@@ -250,7 +251,7 @@ case class Window(
// Get all relevant projections.
val result = createResultProjection(unboundExpressions)
- val grouping = newProjection(windowSpec.partitionSpec, child.output)
+ val grouping = newProjection(partitionSpec, child.output)
// Manage the stream and the grouping.
var nextRow: InternalRow = EmptyRow