diff options
author | Herman van Hovell <hvanhovell@questtec.nl> | 2015-07-31 12:07:18 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-07-31 12:08:25 -0700 |
commit | 39ab199a3f735b7658ab3331d3e2fb03441aec13 (patch) | |
tree | ff72a5998e11b325e7e447535f60fd9774547819 /sql/core | |
parent | 0a1d2ca42c8b31d6b0e70163795f0185d4622f87 (diff) | |
download | spark-39ab199a3f735b7658ab3331d3e2fb03441aec13.tar.gz spark-39ab199a3f735b7658ab3331d3e2fb03441aec13.tar.bz2 spark-39ab199a3f735b7658ab3331d3e2fb03441aec13.zip |
[SPARK-8640] [SQL] Enable Processing of Multiple Window Frames in a Single Window Operator
This PR enables the processing of multiple window frames in a single window operator. This should improve the performance of processing multiple window expressions wich share partition by/order by clauses, because it will be more efficient with respect to memory use and group processing.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes #7515 from hvanhovell/SPARK-8640 and squashes the following commits:
f0e1c21 [Herman van Hovell] Changed Window Logical/Physical plans to use partition by/order by specs directly instead of using WindowSpec.
e1711c2 [Herman van Hovell] Enabled the processing of multiple window frames in a single Window operator.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 5 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala | 19 |
2 files changed, 13 insertions, 11 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 03d24a88d4..4aff52d992 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -389,8 +389,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil } } - case logical.Window(projectList, windowExpressions, spec, child) => - execution.Window(projectList, windowExpressions, spec, planLater(child)) :: Nil + case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) => + execution.Window( + projectList, windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil case logical.Sample(lb, ub, withReplacement, seed, child) => execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil case logical.LocalRelation(output, data) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index 91c8a02e2b..fe9f2c7028 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -80,23 +80,24 @@ import scala.collection.mutable case class Window( projectList: Seq[Attribute], windowExpression: Seq[NamedExpression], - windowSpec: WindowSpecDefinition, + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], child: SparkPlan) extends UnaryNode { override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute) override def requiredChildDistribution: Seq[Distribution] = { - if (windowSpec.partitionSpec.isEmpty) { + if (partitionSpec.isEmpty) { // Only show warning when the number of bytes is larger than 100 MB? logWarning("No Partition Defined for Window operation! Moving all data to a single " + "partition, this can cause serious performance degradation.") AllTuples :: Nil - } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil + } else ClusteredDistribution(partitionSpec) :: Nil } override def requiredChildOrdering: Seq[Seq[SortOrder]] = - Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) + Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec) override def outputOrdering: Seq[SortOrder] = child.outputOrdering @@ -115,12 +116,12 @@ case class Window( case RangeFrame => val (exprs, current, bound) = if (offset == 0) { // Use the entire order expression when the offset is 0. - val exprs = windowSpec.orderSpec.map(_.child) + val exprs = orderSpec.map(_.child) val projection = newMutableProjection(exprs, child.output) - (windowSpec.orderSpec, projection(), projection()) - } else if (windowSpec.orderSpec.size == 1) { + (orderSpec, projection(), projection()) + } else if (orderSpec.size == 1) { // Use only the first order expression when the offset is non-null. - val sortExpr = windowSpec.orderSpec.head + val sortExpr = orderSpec.head val expr = sortExpr.child // Create the projection which returns the current 'value'. val current = newMutableProjection(expr :: Nil, child.output)() @@ -250,7 +251,7 @@ case class Window( // Get all relevant projections. val result = createResultProjection(unboundExpressions) - val grouping = newProjection(windowSpec.partitionSpec, child.output) + val grouping = newProjection(partitionSpec, child.output) // Manage the stream and the grouping. var nextRow: InternalRow = EmptyRow |