aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2015-05-06 10:43:00 -0700
committerMichael Armbrust <michael@databricks.com>2015-05-06 10:43:00 -0700
commitf2c47082c3412a4cf8cbabe12585147c5ec3ea40 (patch)
treee2c9b4112724d2ab79122486f1e2d98375081329 /sql/core
parentc3eb441f5487c9b6476e1d6e2a2d852dcc43b986 (diff)
downloadspark-f2c47082c3412a4cf8cbabe12585147c5ec3ea40.tar.gz
spark-f2c47082c3412a4cf8cbabe12585147c5ec3ea40.tar.bz2
spark-f2c47082c3412a4cf8cbabe12585147c5ec3ea40.zip
[SPARK-1442] [SQL] Window Function Support for Spark SQL
Adding more information about the implementation... This PR is adding the support of window functions to Spark SQL (specifically OVER and WINDOW clause). For every expression having a OVER clause, we use a WindowExpression as the container of a WindowFunction and the corresponding WindowSpecDefinition (the definition of a window frame, i.e. partition specification, order specification, and frame specification appearing in a OVER clause). # Implementation # The high level work flow of the implementation is described as follows. * Query parsing: In the query parse process, all WindowExpressions are originally placed in the projectList of a Project operator or the aggregateExpressions of an Aggregate operator. It makes our changes to simple and keep all of parsing rules for window functions at a single place (nodesToWindowSpecification). For the WINDOWclause in a query, we use a WithWindowDefinition as the container as the mapping from the name of a window specification to a WindowSpecDefinition. This changes is similar with our common table expression support. * Analysis: The query analysis process has three steps for window functions. * Resolve all WindowSpecReferences by replacing them with WindowSpecReferences according to the mapping table stored in the node of WithWindowDefinition. * Resolve WindowFunctions in the projectList of a Project operator or the aggregateExpressions of an Aggregate operator. For this PR, we use Hive's functions for window functions because we will have a major refactoring of our internal UDAFs and it is better to switch our UDAFs after that refactoring work. * Once we have resolved all WindowFunctions, we will use ResolveWindowFunction to extract WindowExpressions from projectList and aggregateExpressions and then create a Window operator for every distinct WindowSpecDefinition. With this choice, at the execution time, we can rely on the Exchange operator to do all of work on reorganizing the table and we do not need to worry about it in the physical Window operator. An example analyzed plan is shown as follows ``` sql(""" SELECT year, country, product, sales, avg(sales) over(partition by product) avg_product, sum(sales) over(partition by country) sum_country FROM sales ORDER BY year, country, product """).explain(true) == Analyzed Logical Plan == Sort [year#34 ASC,country#35 ASC,product#36 ASC], true Project [year#34,country#35,product#36,sales#37,avg_product#27,sum_country#28] Window [year#34,country#35,product#36,sales#37,avg_product#27], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum(sales#37) WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS sum_country#28], WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING Window [year#34,country#35,product#36,sales#37], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage(sales#37) WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS avg_product#27], WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING Project [year#34,country#35,product#36,sales#37] MetastoreRelation default, sales, None ``` * Query planning: In the process of query planning, we simple generate the physical Window operator based on the logical Window operator. Then, to prepare the executedPlan, the EnsureRequirements rule will add Exchange and Sort operators if necessary. The EnsureRequirements rule will analyze the data properties and try to not add unnecessary shuffle and sort. The physical plan for the above example query is shown below. ``` == Physical Plan == Sort [year#34 ASC,country#35 ASC,product#36 ASC], true Exchange (RangePartitioning [year#34 ASC,country#35 ASC,product#36 ASC], 200), [] Window [year#34,country#35,product#36,sales#37,avg_product#27], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum(sales#37) WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS sum_country#28], WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING Exchange (HashPartitioning [country#35], 200), [country#35 ASC] Window [year#34,country#35,product#36,sales#37], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage(sales#37) WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS avg_product#27], WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING Exchange (HashPartitioning [product#36], 200), [product#36 ASC] HiveTableScan [year#34,country#35,product#36,sales#37], (MetastoreRelation default, sales, None), None ``` * Execution time: At execution time, a physical Window operator buffers all rows in a partition specified in the partition spec of a OVER clause. If necessary, it also maintains a sliding window frame. The current implementation tries to buffer the input parameters of a window function according to the window frame to avoid evaluating a row multiple times. # Future work # Here are three improvements that are not hard to add: * Taking advantage of the window frame specification to reduce the number of rows buffered in the physical Window operator. For some cases, we only need to buffer the rows appearing in the sliding window. But for other cases, we will not be able to reduce the number of rows buffered (e.g. ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING). * When aRAGEN frame is used, for <value> PRECEDING and <value> FOLLOWING, it will be great if the <value> part is an expression (we can start with Literal). So, when the data type of ORDER BY expression is a FractionalType, we can support FractionalType as the type <value> (<value> still needs to be evaluated as a positive value). * When aRAGEN frame is used, we need to support DateType and TimestampType as the data type of the expression appearing in the order specification. Then, the <value> part of <value> PRECEDING and <value> FOLLOWING can support interval types (once we support them). This is a joint work with guowei2 and yhuai Thanks hbutani hvanhovell for his comments Thanks scwf for his comments and unit tests Author: Yin Huai <yhuai@databricks.com> Closes #5604 from guowei2/windowImplement and squashes the following commits: 76fe1c8 [Yin Huai] Implementation. aa2b0ae [Yin Huai] Tests.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala480
2 files changed, 482 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 326e8ce4ca..56a4689eb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -303,6 +303,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Expand(projections, output, planLater(child)) :: Nil
case logical.Aggregate(group, agg, child) =>
execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil
+ case logical.Window(projectList, windowExpressions, spec, child) =>
+ execution.Window(projectList, windowExpressions, spec, planLater(child)) :: Nil
case logical.Sample(lb, ub, withReplacement, seed, child) =>
execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
new file mode 100644
index 0000000000..217b559def
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, ClusteredDistribution, Partitioning}
+import org.apache.spark.util.collection.CompactBuffer
+
+/**
+ * :: DeveloperApi ::
+ * For every row, evaluates `windowExpression` containing Window Functions and attaches
+ * the results with other regular expressions (presented by `projectList`).
+ * Evert operator handles a single Window Specification, `windowSpec`.
+ */
+case class Window(
+ projectList: Seq[Attribute],
+ windowExpression: Seq[NamedExpression],
+ windowSpec: WindowSpecDefinition,
+ child: SparkPlan)
+ extends UnaryNode {
+
+ override def output: Seq[Attribute] =
+ (projectList ++ windowExpression).map(_.toAttribute)
+
+ override def requiredChildDistribution: Seq[Distribution] =
+ if (windowSpec.partitionSpec.isEmpty) {
+ // This operator will be very expensive.
+ AllTuples :: Nil
+ } else {
+ ClusteredDistribution(windowSpec.partitionSpec) :: Nil
+ }
+
+ // Since window functions are adding columns to the input rows, the child's outputPartitioning
+ // is preserved.
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
+ override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+ // The required child ordering has two parts.
+ // The first part is the expressions in the partition specification.
+ // We add these expressions to the required ordering to make sure input rows are grouped
+ // based on the partition specification. So, we only need to process a single partition
+ // at a time.
+ // The second part is the expressions specified in the ORDER BY cluase.
+ // Basically, we first use sort to group rows based on partition specifications and then sort
+ // Rows in a group based on the order specification.
+ (windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil
+ }
+
+ // Since window functions basically add columns to input rows, this operator
+ // will not change the ordering of input rows.
+ override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+ case class ComputedWindow(
+ unbound: WindowExpression,
+ windowFunction: WindowFunction,
+ resultAttribute: AttributeReference)
+
+ // A list of window functions that need to be computed for each group.
+ private[this] val computedWindowExpressions = windowExpression.flatMap { window =>
+ window.collect {
+ case w: WindowExpression =>
+ ComputedWindow(
+ w,
+ BindReferences.bindReference(w.windowFunction, child.output),
+ AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+ }
+ }.toArray
+
+ private[this] val windowFrame =
+ windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+
+ // Create window functions.
+ private[this] def windowFunctions(): Array[WindowFunction] = {
+ val functions = new Array[WindowFunction](computedWindowExpressions.length)
+ var i = 0
+ while (i < computedWindowExpressions.length) {
+ functions(i) = computedWindowExpressions(i).windowFunction.newInstance()
+ functions(i).init()
+ i += 1
+ }
+ functions
+ }
+
+ // The schema of the result of all window function evaluations
+ private[this] val computedSchema = computedWindowExpressions.map(_.resultAttribute)
+
+ private[this] val computedResultMap =
+ computedWindowExpressions.map { w => w.unbound -> w.resultAttribute }.toMap
+
+ private[this] val windowExpressionResult = windowExpression.map { window =>
+ window.transform {
+ case w: WindowExpression if computedResultMap.contains(w) => computedResultMap(w)
+ }
+ }
+
+ def execute(): RDD[Row] = {
+ child.execute().mapPartitions { iter =>
+ new Iterator[Row] {
+
+ // Although input rows are grouped based on windowSpec.partitionSpec, we need to
+ // know when we have a new partition.
+ // This is to manually construct an ordering that can be used to compare rows.
+ // TODO: We may want to have a newOrdering that takes BoundReferences.
+ // So, we can take advantave of code gen.
+ private val partitionOrdering: Ordering[Row] =
+ RowOrdering.forSchema(windowSpec.partitionSpec.map(_.dataType))
+
+ // This is used to project expressions for the partition specification.
+ protected val partitionGenerator =
+ newMutableProjection(windowSpec.partitionSpec, child.output)()
+
+ // This is ued to project expressions for the order specification.
+ protected val rowOrderGenerator =
+ newMutableProjection(windowSpec.orderSpec.map(_.child), child.output)()
+
+ // The position of next output row in the inputRowBuffer.
+ var rowPosition: Int = 0
+ // The number of buffered rows in the inputRowBuffer (the size of the current partition).
+ var partitionSize: Int = 0
+ // The buffer used to buffer rows in a partition.
+ var inputRowBuffer: CompactBuffer[Row] = _
+ // The partition key of the current partition.
+ var currentPartitionKey: Row = _
+ // The partition key of next partition.
+ var nextPartitionKey: Row = _
+ // The first row of next partition.
+ var firstRowInNextPartition: Row = _
+ // Indicates if this partition is the last one in the iter.
+ var lastPartition: Boolean = false
+
+ def createBoundaryEvaluator(): () => Unit = {
+ def findPhysicalBoundary(
+ boundary: FrameBoundary): () => Int = boundary match {
+ case UnboundedPreceding => () => 0
+ case UnboundedFollowing => () => partitionSize - 1
+ case CurrentRow => () => rowPosition
+ case ValuePreceding(value) =>
+ () =>
+ val newPosition = rowPosition - value
+ if (newPosition > 0) newPosition else 0
+ case ValueFollowing(value) =>
+ () =>
+ val newPosition = rowPosition + value
+ if (newPosition < partitionSize) newPosition else partitionSize - 1
+ }
+
+ def findLogicalBoundary(
+ boundary: FrameBoundary,
+ searchDirection: Int,
+ evaluator: Expression,
+ joinedRow: JoinedRow): () => Int = boundary match {
+ case UnboundedPreceding => () => 0
+ case UnboundedFollowing => () => partitionSize - 1
+ case other =>
+ () => {
+ // CurrentRow, ValuePreceding, or ValueFollowing.
+ var newPosition = rowPosition + searchDirection
+ var stopSearch = false
+ // rowOrderGenerator is a mutable projection.
+ // We need to make a copy of the returned by rowOrderGenerator since we will
+ // compare searched row with this currentOrderByValue.
+ val currentOrderByValue = rowOrderGenerator(inputRowBuffer(rowPosition)).copy()
+ while (newPosition >= 0 && newPosition < partitionSize && !stopSearch) {
+ val r = rowOrderGenerator(inputRowBuffer(newPosition))
+ stopSearch =
+ !(evaluator.eval(joinedRow(currentOrderByValue, r)).asInstanceOf[Boolean])
+ if (!stopSearch) {
+ newPosition += searchDirection
+ }
+ }
+ newPosition -= searchDirection
+
+ if (newPosition < 0) {
+ 0
+ } else if (newPosition >= partitionSize) {
+ partitionSize - 1
+ } else {
+ newPosition
+ }
+ }
+ }
+
+ windowFrame.frameType match {
+ case RowFrame =>
+ val findStart = findPhysicalBoundary(windowFrame.frameStart)
+ val findEnd = findPhysicalBoundary(windowFrame.frameEnd)
+ () => {
+ frameStart = findStart()
+ frameEnd = findEnd()
+ }
+ case RangeFrame =>
+ val joinedRowForBoundaryEvaluation: JoinedRow = new JoinedRow()
+ val orderByExpr = windowSpec.orderSpec.head
+ val currentRowExpr =
+ BoundReference(0, orderByExpr.dataType, orderByExpr.nullable)
+ val examedRowExpr =
+ BoundReference(1, orderByExpr.dataType, orderByExpr.nullable)
+ val differenceExpr = Abs(Subtract(currentRowExpr, examedRowExpr))
+
+ val frameStartEvaluator = windowFrame.frameStart match {
+ case CurrentRow => EqualTo(currentRowExpr, examedRowExpr)
+ case ValuePreceding(value) =>
+ LessThanOrEqual(differenceExpr, Cast(Literal(value), orderByExpr.dataType))
+ case ValueFollowing(value) =>
+ GreaterThanOrEqual(differenceExpr, Cast(Literal(value), orderByExpr.dataType))
+ case o => Literal(true) // This is just a dummy expression, we will not use it.
+ }
+
+ val frameEndEvaluator = windowFrame.frameEnd match {
+ case CurrentRow => EqualTo(currentRowExpr, examedRowExpr)
+ case ValuePreceding(value) =>
+ GreaterThanOrEqual(differenceExpr, Cast(Literal(value), orderByExpr.dataType))
+ case ValueFollowing(value) =>
+ LessThanOrEqual(differenceExpr, Cast(Literal(value), orderByExpr.dataType))
+ case o => Literal(true) // This is just a dummy expression, we will not use it.
+ }
+
+ val findStart =
+ findLogicalBoundary(
+ boundary = windowFrame.frameStart,
+ searchDirection = -1,
+ evaluator = frameStartEvaluator,
+ joinedRow = joinedRowForBoundaryEvaluation)
+ val findEnd =
+ findLogicalBoundary(
+ boundary = windowFrame.frameEnd,
+ searchDirection = 1,
+ evaluator = frameEndEvaluator,
+ joinedRow = joinedRowForBoundaryEvaluation)
+ () => {
+ frameStart = findStart()
+ frameEnd = findEnd()
+ }
+ }
+ }
+
+ val boundaryEvaluator = createBoundaryEvaluator()
+ // Indicates if we the specified window frame requires us to maintain a sliding frame
+ // (e.g. RANGES BETWEEN 1 PRECEDING AND CURRENT ROW) or the window frame
+ // is the entire partition (e.g. ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING).
+ val requireUpdateFrame: Boolean = {
+ def requireUpdateBoundary(boundary: FrameBoundary): Boolean = boundary match {
+ case UnboundedPreceding => false
+ case UnboundedFollowing => false
+ case _ => true
+ }
+
+ requireUpdateBoundary(windowFrame.frameStart) ||
+ requireUpdateBoundary(windowFrame.frameEnd)
+ }
+ // The start position of the current frame in the partition.
+ var frameStart: Int = 0
+ // The end position of the current frame in the partition.
+ var frameEnd: Int = -1
+ // Window functions.
+ val functions: Array[WindowFunction] = windowFunctions()
+ // Buffers used to store input parameters for window functions. Because we may need to
+ // maintain a sliding frame, we use this buffer to avoid evaluate the parameters from
+ // the same row multiple times.
+ val windowFunctionParameterBuffers: Array[util.LinkedList[AnyRef]] =
+ functions.map(_ => new util.LinkedList[AnyRef]())
+
+ // The projection used to generate the final result rows of this operator.
+ private[this] val resultProjection =
+ newMutableProjection(
+ projectList ++ windowExpressionResult,
+ projectList ++ computedSchema)()
+
+ // The row used to hold results of window functions.
+ private[this] val windowExpressionResultRow =
+ new GenericMutableRow(computedSchema.length)
+
+ private[this] val joinedRow = new JoinedRow6
+
+ // Initialize this iterator.
+ initialize()
+
+ private def initialize(): Unit = {
+ if (iter.hasNext) {
+ val currentRow = iter.next().copy()
+ // partitionGenerator is a mutable projection. Since we need to track nextPartitionKey,
+ // we are making a copy of the returned partitionKey at here.
+ nextPartitionKey = partitionGenerator(currentRow).copy()
+ firstRowInNextPartition = currentRow
+ fetchNextPartition()
+ } else {
+ // The iter is an empty one. So, we set all of the following variables
+ // to make sure hasNext will return false.
+ lastPartition = true
+ rowPosition = 0
+ partitionSize = 0
+ }
+ }
+
+ // Indicates if we will have new output row.
+ override final def hasNext: Boolean = {
+ !lastPartition || (rowPosition < partitionSize)
+ }
+
+ override final def next(): Row = {
+ if (hasNext) {
+ if (rowPosition == partitionSize) {
+ // All rows of this buffer have been consumed.
+ // We will move to next partition.
+ fetchNextPartition()
+ }
+ // Get the input row for the current output row.
+ val inputRow = inputRowBuffer(rowPosition)
+ // Get all results of the window functions for this output row.
+ var i = 0
+ while (i < functions.length) {
+ windowExpressionResultRow.update(i, functions(i).get(rowPosition))
+ i += 1
+ }
+
+ // Construct the output row.
+ val outputRow = resultProjection(joinedRow(inputRow, windowExpressionResultRow))
+ // We will move to the next one.
+ rowPosition += 1
+ if (requireUpdateFrame && rowPosition < partitionSize) {
+ // If we need to maintain a sliding frame and
+ // we will still work on this partition when next is called next time, do the update.
+ updateFrame()
+ }
+
+ // Return the output row.
+ outputRow
+ } else {
+ // no more result
+ throw new NoSuchElementException
+ }
+ }
+
+ // Fetch the next partition.
+ private def fetchNextPartition(): Unit = {
+ // Create a new buffer for input rows.
+ inputRowBuffer = new CompactBuffer[Row]()
+ // We already have the first row for this partition
+ // (recorded in firstRowInNextPartition). Add it back.
+ inputRowBuffer += firstRowInNextPartition
+ // Set the current partition key.
+ currentPartitionKey = nextPartitionKey
+ // Now, we will start to find all rows belonging to this partition.
+ // Create a variable to track if we see the next partition.
+ var findNextPartition = false
+ // The search will stop when we see the next partition or there is no
+ // input row left in the iter.
+ while (iter.hasNext && !findNextPartition) {
+ // Make a copy of the input row since we will put it in the buffer.
+ val currentRow = iter.next().copy()
+ // Get the partition key based on the partition specification.
+ // For the below compare method, we do not need to make a copy of partitionKey.
+ val partitionKey = partitionGenerator(currentRow)
+ // Check if the current row belongs the current input row.
+ val comparing = partitionOrdering.compare(currentPartitionKey, partitionKey)
+ if (comparing == 0) {
+ // This row is still in the current partition.
+ inputRowBuffer += currentRow
+ } else {
+ // The current input row is in a different partition.
+ findNextPartition = true
+ // partitionGenerator is a mutable projection.
+ // Since we need to track nextPartitionKey and we determine that it should be set
+ // as partitionKey, we are making a copy of the partitionKey at here.
+ nextPartitionKey = partitionKey.copy()
+ firstRowInNextPartition = currentRow
+ }
+ }
+
+ // We have not seen a new partition. It means that there is no new row in the
+ // iter. The current partition is the last partition of the iter.
+ if (!findNextPartition) {
+ lastPartition = true
+ }
+
+ // We have got all rows for the current partition.
+ // Set rowPosition to 0 (the next output row will be based on the first
+ // input row of this partition).
+ rowPosition = 0
+ // The size of this partition.
+ partitionSize = inputRowBuffer.size
+ // Reset all parameter buffers of window functions.
+ var i = 0
+ while (i < windowFunctionParameterBuffers.length) {
+ windowFunctionParameterBuffers(i).clear()
+ i += 1
+ }
+ frameStart = 0
+ frameEnd = -1
+ // Create the first window frame for this partition.
+ // If we do not need to maintain a sliding frame, this frame will
+ // have the entire partition.
+ updateFrame()
+ }
+
+ /** The function used to maintain the sliding frame. */
+ private def updateFrame(): Unit = {
+ // Based on the difference between the new frame and old frame,
+ // updates the buffers holding input parameters of window functions.
+ // We will start to prepare input parameters starting from the row
+ // indicated by offset in the input row buffer.
+ def updateWindowFunctionParameterBuffers(
+ numToRemove: Int,
+ numToAdd: Int,
+ offset: Int): Unit = {
+ // First, remove unneeded entries from the head of every buffer.
+ var i = 0
+ while (i < numToRemove) {
+ var j = 0
+ while (j < windowFunctionParameterBuffers.length) {
+ windowFunctionParameterBuffers(j).remove()
+ j += 1
+ }
+ i += 1
+ }
+ // Then, add needed entries to the tail of every buffer.
+ i = 0
+ while (i < numToAdd) {
+ var j = 0
+ while (j < windowFunctionParameterBuffers.length) {
+ // Ask the function to prepare the input parameters.
+ val parameters = functions(j).prepareInputParameters(inputRowBuffer(i + offset))
+ windowFunctionParameterBuffers(j).add(parameters)
+ j += 1
+ }
+ i += 1
+ }
+ }
+
+ // Record the current frame start point and end point before
+ // we update them.
+ val previousFrameStart = frameStart
+ val previousFrameEnd = frameEnd
+ boundaryEvaluator()
+ updateWindowFunctionParameterBuffers(
+ frameStart - previousFrameStart,
+ frameEnd - previousFrameEnd,
+ previousFrameEnd + 1)
+ // Evaluate the current frame.
+ evaluateCurrentFrame()
+ }
+
+ /** Evaluate the current window frame. */
+ private def evaluateCurrentFrame(): Unit = {
+ var i = 0
+ while (i < functions.length) {
+ // Reset the state of the window function.
+ functions(i).reset()
+ // Get all buffered input parameters based on rows of this window frame.
+ val inputParameters = windowFunctionParameterBuffers(i).toArray()
+ // Send these input parameters to the window function.
+ functions(i).batchUpdate(inputParameters)
+ // Ask the function to evaluate based on this window frame.
+ functions(i).evaluate()
+ i += 1
+ }
+ }
+ }
+ }
+ }
+}