aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@databricks.com>2016-08-08 16:34:57 -0700
committerYin Huai <yhuai@databricks.com>2016-08-08 16:34:57 -0700
commitdf10658831f4e5f9756a5732673ad12904b5d05c (patch)
tree8a4bc778cbd63df61021033d01f73ba404807065 /sql/core
parent53d1c7877967f03cc9c8c7e7394f380d1bbefc27 (diff)
downloadspark-df10658831f4e5f9756a5732673ad12904b5d05c.tar.gz
spark-df10658831f4e5f9756a5732673ad12904b5d05c.tar.bz2
spark-df10658831f4e5f9756a5732673ad12904b5d05c.zip
[SPARK-16749][SQL] Simplify processing logic in LEAD/LAG processing.
## What changes were proposed in this pull request? The logic for LEAD/LAG processing is more complex that it needs to be. This PR fixes that. ## How was this patch tested? Existing tests. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #14376 from hvanhovell/SPARK-16749.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala53
1 files changed, 18 insertions, 35 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
index 7149603018..b60f17cc17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
@@ -209,7 +209,8 @@ case class WindowExec(
new OffsetWindowFunctionFrame(
target,
ordinal,
- functions,
+ // OFFSET frame functions are guaranteed be OffsetWindowFunctions.
+ functions.map(_.asInstanceOf[OffsetWindowFunction]),
child.output,
(expressions, schema) =>
newMutableProjection(expressions, schema, subexpressionEliminationEnabled),
@@ -557,6 +558,9 @@ private[execution] abstract class WindowFunctionFrame {
* The offset window frame calculates frames containing LEAD/LAG statements.
*
* @param target to write results to.
+ * @param ordinal the ordinal is the starting offset at which the results of the window frame get
+ * written into the (shared) target row. The result of the frame expression with
+ * index 'i' will be written to the 'ordinal' + 'i' position in the target row.
* @param expressions to shift a number of rows.
* @param inputSchema required for creating a projection.
* @param newMutableProjection function used to create the projection.
@@ -565,7 +569,7 @@ private[execution] abstract class WindowFunctionFrame {
private[execution] final class OffsetWindowFunctionFrame(
target: MutableRow,
ordinal: Int,
- expressions: Array[Expression],
+ expressions: Array[OffsetWindowFunction],
inputSchema: Seq[Attribute],
newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection,
offset: Int) extends WindowFunctionFrame {
@@ -576,12 +580,6 @@ private[execution] final class OffsetWindowFunctionFrame(
/** Index of the input row currently used for output. */
private[this] var inputIndex = 0
- /** Row used when there is no valid input. */
- private[this] val emptyRow = new GenericInternalRow(inputSchema.size)
-
- /** Row used to combine the offset and the current row. */
- private[this] val join = new JoinedRow
-
/**
* Create the projection used when the offset row exists.
* Please note that this project always respect null input values (like PostgreSQL).
@@ -589,12 +587,8 @@ private[execution] final class OffsetWindowFunctionFrame(
private[this] val projection = {
// Collect the expressions and bind them.
val inputAttrs = inputSchema.map(_.withNullability(true))
- val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map {
- case e: OffsetWindowFunction =>
- val input = BindReferences.bindReference(e.input, inputAttrs)
- input
- case e =>
- BindReferences.bindReference(e, inputAttrs)
+ val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { e =>
+ BindReferences.bindReference(e.input, inputAttrs)
}
// Create the projection.
@@ -605,23 +599,14 @@ private[execution] final class OffsetWindowFunctionFrame(
private[this] val fillDefaultValue = {
// Collect the expressions and bind them.
val inputAttrs = inputSchema.map(_.withNullability(true))
- val numInputAttributes = inputAttrs.size
- val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map {
- case e: OffsetWindowFunction =>
- if (e.default == null || e.default.foldable && e.default.eval() == null) {
- // The default value is null.
- Literal.create(null, e.dataType)
- } else {
- // The default value is an expression.
- val default = BindReferences.bindReference(e.default, inputAttrs).transform {
- // Shift the input reference to its default version.
- case BoundReference(o, dataType, nullable) =>
- BoundReference(o + numInputAttributes, dataType, nullable)
- }
- default
- }
- case e =>
- BindReferences.bindReference(e, inputAttrs)
+ val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { e =>
+ if (e.default == null || e.default.foldable && e.default.eval() == null) {
+ // The default value is null.
+ Literal.create(null, e.dataType)
+ } else {
+ // The default value is an expression.
+ BindReferences.bindReference(e.default, inputAttrs)
+ }
}
// Create the projection.
@@ -642,12 +627,10 @@ private[execution] final class OffsetWindowFunctionFrame(
override def write(index: Int, current: InternalRow): Unit = {
if (inputIndex >= 0 && inputIndex < input.size) {
val r = input.next()
- join(r, current)
- projection(join)
+ projection(r)
} else {
- join(emptyRow, current)
// Use default values since the offset row does not exist.
- fillDefaultValue(join)
+ fillDefaultValue(current)
}
inputIndex += 1
}