diff options
5 files changed, 40 insertions, 17 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 265f3d1e41..51d910b258 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -347,7 +347,7 @@ class Analyzer( val newOutput = oldVersion.generatorOutput.map(_.newInstance()) (oldVersion, oldVersion.copy(generatorOutput = newOutput)) - case oldVersion @ Window(_, windowExpressions, _, child) + case oldVersion @ Window(_, windowExpressions, _, _, child) if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes) .nonEmpty => (oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions))) @@ -825,7 +825,7 @@ class Analyzer( }.asInstanceOf[NamedExpression] } - // Second, we group extractedWindowExprBuffer based on their Window Spec. + // Second, we group extractedWindowExprBuffer based on their Partition and Order Specs. val groupedWindowExpressions = extractedWindowExprBuffer.groupBy { expr => val distinctWindowSpec = expr.collect { case window: WindowExpression => window.windowSpec @@ -841,7 +841,8 @@ class Analyzer( failAnalysis(s"$expr has multiple Window Specifications ($distinctWindowSpec)." + s"Please file a bug report with this error message, stack trace, and the query.") } else { - distinctWindowSpec.head + val spec = distinctWindowSpec.head + (spec.partitionSpec, spec.orderSpec) } }.toSeq @@ -850,9 +851,10 @@ class Analyzer( var currentChild = child var i = 0 while (i < groupedWindowExpressions.size) { - val (windowSpec, windowExpressions) = groupedWindowExpressions(i) + val ((partitionSpec, orderSpec), windowExpressions) = groupedWindowExpressions(i) // Set currentChild to the newly created Window operator. - currentChild = Window(currentChild.output, windowExpressions, windowSpec, currentChild) + currentChild = Window(currentChild.output, windowExpressions, + partitionSpec, orderSpec, currentChild) // Move to next Window Spec. i += 1 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index a67f8de6b7..aacfc86ab0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -228,7 +228,8 @@ case class Aggregate( case class Window( projectList: Seq[Attribute], windowExpressions: Seq[NamedExpression], - windowSpec: WindowSpecDefinition, + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 03d24a88d4..4aff52d992 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -389,8 +389,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil } } - case logical.Window(projectList, windowExpressions, spec, child) => - execution.Window(projectList, windowExpressions, spec, planLater(child)) :: Nil + case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) => + execution.Window( + projectList, windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil case logical.Sample(lb, ub, withReplacement, seed, child) => execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil case logical.LocalRelation(output, data) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index 91c8a02e2b..fe9f2c7028 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -80,23 +80,24 @@ import scala.collection.mutable case class Window( projectList: Seq[Attribute], windowExpression: Seq[NamedExpression], - windowSpec: WindowSpecDefinition, + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], child: SparkPlan) extends UnaryNode { override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute) override def requiredChildDistribution: Seq[Distribution] = { - if (windowSpec.partitionSpec.isEmpty) { + if (partitionSpec.isEmpty) { // Only show warning when the number of bytes is larger than 100 MB? logWarning("No Partition Defined for Window operation! Moving all data to a single " + "partition, this can cause serious performance degradation.") AllTuples :: Nil - } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil + } else ClusteredDistribution(partitionSpec) :: Nil } override def requiredChildOrdering: Seq[Seq[SortOrder]] = - Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) + Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec) override def outputOrdering: Seq[SortOrder] = child.outputOrdering @@ -115,12 +116,12 @@ case class Window( case RangeFrame => val (exprs, current, bound) = if (offset == 0) { // Use the entire order expression when the offset is 0. - val exprs = windowSpec.orderSpec.map(_.child) + val exprs = orderSpec.map(_.child) val projection = newMutableProjection(exprs, child.output) - (windowSpec.orderSpec, projection(), projection()) - } else if (windowSpec.orderSpec.size == 1) { + (orderSpec, projection(), projection()) + } else if (orderSpec.size == 1) { // Use only the first order expression when the offset is non-null. - val sortExpr = windowSpec.orderSpec.head + val sortExpr = orderSpec.head val expr = sortExpr.child // Create the projection which returns the current 'value'. val current = newMutableProjection(expr :: Nil, child.output)() @@ -250,7 +251,7 @@ case class Window( // Get all relevant projections. val result = createResultProjection(unboundExpressions) - val grouping = newProjection(windowSpec.partitionSpec, child.output) + val grouping = newProjection(partitionSpec, child.output) // Manage the stream and the grouping. var nextRow: InternalRow = EmptyRow diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala index bdb53ddf59..ba56a8a6b6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala @@ -17,7 +17,10 @@ package org.apache.spark.sql.hive.execution +import org.apache.spark.sql.functions._ import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.hive.test.TestHive class HivePlanTest extends QueryTest { @@ -31,4 +34,19 @@ class HivePlanTest extends QueryTest { comparePlans(optimized, correctAnswer) } + + test("window expressions sharing the same partition by and order by clause") { + val df = Seq.empty[(Int, String, Int, Int)].toDF("id", "grp", "seq", "val") + val window = Window. + partitionBy($"grp"). + orderBy($"val") + val query = df.select( + $"id", + sum($"val").over(window.rowsBetween(-1, 1)), + sum($"val").over(window.rangeBetween(-1, 1)) + ) + val plan = query.queryExecution.analyzed + assert(plan.collect{ case w: logical.Window => w }.size === 1, + "Should have only 1 Window operator.") + } } |