diff options
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala | 53 |
1 files changed, 18 insertions, 35 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala index 7149603018..b60f17cc17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala @@ -209,7 +209,8 @@ case class WindowExec( new OffsetWindowFunctionFrame( target, ordinal, - functions, + // OFFSET frame functions are guaranteed be OffsetWindowFunctions. + functions.map(_.asInstanceOf[OffsetWindowFunction]), child.output, (expressions, schema) => newMutableProjection(expressions, schema, subexpressionEliminationEnabled), @@ -557,6 +558,9 @@ private[execution] abstract class WindowFunctionFrame { * The offset window frame calculates frames containing LEAD/LAG statements. * * @param target to write results to. + * @param ordinal the ordinal is the starting offset at which the results of the window frame get + * written into the (shared) target row. The result of the frame expression with + * index 'i' will be written to the 'ordinal' + 'i' position in the target row. * @param expressions to shift a number of rows. * @param inputSchema required for creating a projection. * @param newMutableProjection function used to create the projection. @@ -565,7 +569,7 @@ private[execution] abstract class WindowFunctionFrame { private[execution] final class OffsetWindowFunctionFrame( target: MutableRow, ordinal: Int, - expressions: Array[Expression], + expressions: Array[OffsetWindowFunction], inputSchema: Seq[Attribute], newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection, offset: Int) extends WindowFunctionFrame { @@ -576,12 +580,6 @@ private[execution] final class OffsetWindowFunctionFrame( /** Index of the input row currently used for output. */ private[this] var inputIndex = 0 - /** Row used when there is no valid input. */ - private[this] val emptyRow = new GenericInternalRow(inputSchema.size) - - /** Row used to combine the offset and the current row. */ - private[this] val join = new JoinedRow - /** * Create the projection used when the offset row exists. * Please note that this project always respect null input values (like PostgreSQL). @@ -589,12 +587,8 @@ private[execution] final class OffsetWindowFunctionFrame( private[this] val projection = { // Collect the expressions and bind them. val inputAttrs = inputSchema.map(_.withNullability(true)) - val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { - case e: OffsetWindowFunction => - val input = BindReferences.bindReference(e.input, inputAttrs) - input - case e => - BindReferences.bindReference(e, inputAttrs) + val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { e => + BindReferences.bindReference(e.input, inputAttrs) } // Create the projection. @@ -605,23 +599,14 @@ private[execution] final class OffsetWindowFunctionFrame( private[this] val fillDefaultValue = { // Collect the expressions and bind them. val inputAttrs = inputSchema.map(_.withNullability(true)) - val numInputAttributes = inputAttrs.size - val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { - case e: OffsetWindowFunction => - if (e.default == null || e.default.foldable && e.default.eval() == null) { - // The default value is null. - Literal.create(null, e.dataType) - } else { - // The default value is an expression. - val default = BindReferences.bindReference(e.default, inputAttrs).transform { - // Shift the input reference to its default version. - case BoundReference(o, dataType, nullable) => - BoundReference(o + numInputAttributes, dataType, nullable) - } - default - } - case e => - BindReferences.bindReference(e, inputAttrs) + val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { e => + if (e.default == null || e.default.foldable && e.default.eval() == null) { + // The default value is null. + Literal.create(null, e.dataType) + } else { + // The default value is an expression. + BindReferences.bindReference(e.default, inputAttrs) + } } // Create the projection. @@ -642,12 +627,10 @@ private[execution] final class OffsetWindowFunctionFrame( override def write(index: Int, current: InternalRow): Unit = { if (inputIndex >= 0 && inputIndex < input.size) { val r = input.next() - join(r, current) - projection(join) + projection(r) } else { - join(emptyRow, current) // Use default values since the offset row does not exist. - fillDefaultValue(join) + fillDefaultValue(current) } inputIndex += 1 } |