aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-01-06 23:21:52 -0800
committerDavies Liu <davies.liu@gmail.com>2016-01-06 23:21:52 -0800
commit6a1c864ab6ee3e869a16ffdbaf6fead21c7aac6d (patch)
treef5096b12f8539866286f6ff6a97281bf6f10a86d /sql
parent84e77a15df18ba3f1cc871a3c52c783b46e52369 (diff)
downloadspark-6a1c864ab6ee3e869a16ffdbaf6fead21c7aac6d.tar.gz
spark-6a1c864ab6ee3e869a16ffdbaf6fead21c7aac6d.tar.bz2
spark-6a1c864ab6ee3e869a16ffdbaf6fead21c7aac6d.zip
[SPARK-12295] [SQL] external spilling for window functions
This PR manages the memory used by window functions (buffered rows) and also enables external spilling. After this PR, we can run window functions on a partition with hundreds of millions of rows with only 1GB of memory. Author: Davies Liu <davies@databricks.com> Closes #10605 from davies/unsafe_window.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala314
1 file changed, 228 insertions, 86 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index 89b17c8245..be885397a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution
+import java.util
+
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -26,6 +28,8 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
+import org.apache.spark.{SparkEnv, TaskContext}
/**
* This class calculates and outputs (windowed) aggregates over the rows in a single (sorted)
@@ -283,23 +287,26 @@ case class Window(
val grouping = UnsafeProjection.create(partitionSpec, child.output)
// Manage the stream and the grouping.
- var nextRow: InternalRow = EmptyRow
- var nextGroup: InternalRow = EmptyRow
+ var nextRow: UnsafeRow = null
+ var nextGroup: UnsafeRow = null
var nextRowAvailable: Boolean = false
private[this] def fetchNextRow() {
nextRowAvailable = stream.hasNext
if (nextRowAvailable) {
- nextRow = stream.next()
+ nextRow = stream.next().asInstanceOf[UnsafeRow]
nextGroup = grouping(nextRow)
} else {
- nextRow = EmptyRow
- nextGroup = EmptyRow
+ nextRow = null
+ nextGroup = null
}
}
fetchNextRow()
// Manage the current partition.
- val rows = ArrayBuffer.empty[InternalRow]
+ val rows = ArrayBuffer.empty[UnsafeRow]
+ val inputFields = child.output.length
+ var sorter: UnsafeExternalSorter = null
+ var rowBuffer: RowBuffer = null
val windowFunctionResult = new SpecificMutableRow(expressions.map(_.dataType))
val frames = factories.map(_(windowFunctionResult))
val numFrames = frames.length
@@ -307,27 +314,63 @@ case class Window(
// Collect all the rows in the current partition.
// Before we start to fetch new input rows, make a copy of nextGroup.
val currentGroup = nextGroup.copy()
- rows.clear()
+
+ // clear last partition
+ if (sorter != null) {
+ // the last sorter of this task will be cleaned up via task completion listener
+ sorter.cleanupResources()
+ sorter = null
+ } else {
+ rows.clear()
+ }
+
while (nextRowAvailable && nextGroup == currentGroup) {
- rows += nextRow.copy()
+ if (sorter == null) {
+ rows += nextRow.copy()
+
+ if (rows.length >= 4096) {
+ // We will not sort the rows, so prefixComparator and recordComparator are null.
+ sorter = UnsafeExternalSorter.create(
+ TaskContext.get().taskMemoryManager(),
+ SparkEnv.get.blockManager,
+ TaskContext.get(),
+ null,
+ null,
+ 1024,
+ SparkEnv.get.memoryManager.pageSizeBytes)
+ rows.foreach { r =>
+ sorter.insertRecord(r.getBaseObject, r.getBaseOffset, r.getSizeInBytes, 0)
+ }
+ rows.clear()
+ }
+ } else {
+ sorter.insertRecord(nextRow.getBaseObject, nextRow.getBaseOffset,
+ nextRow.getSizeInBytes, 0)
+ }
fetchNextRow()
}
+ if (sorter != null) {
+ rowBuffer = new ExternalRowBuffer(sorter, inputFields)
+ } else {
+ rowBuffer = new ArrayRowBuffer(rows)
+ }
// Setup the frames.
var i = 0
while (i < numFrames) {
- frames(i).prepare(rows)
+ frames(i).prepare(rowBuffer.copy())
i += 1
}
// Setup iteration
rowIndex = 0
- rowsSize = rows.size
+ rowsSize = rowBuffer.size()
}
// Iteration
var rowIndex = 0
- var rowsSize = 0
+ var rowsSize = 0L
+
override final def hasNext: Boolean = rowIndex < rowsSize || nextRowAvailable
val join = new JoinedRow
@@ -340,13 +383,14 @@ case class Window(
if (rowIndex < rowsSize) {
// Get the results for the window frames.
var i = 0
+ val current = rowBuffer.next()
while (i < numFrames) {
- frames(i).write()
+ frames(i).write(rowIndex, current)
i += 1
}
// 'Merge' the input row with the window function result
- join(rows(rowIndex), windowFunctionResult)
+ join(current, windowFunctionResult)
rowIndex += 1
// Return the projection.
@@ -362,14 +406,18 @@ case class Window(
* Function for comparing boundary values.
*/
private[execution] abstract class BoundOrdering {
- def compare(input: Seq[InternalRow], inputIndex: Int, outputIndex: Int): Int
+ def compare(inputRow: InternalRow, inputIndex: Int, outputRow: InternalRow, outputIndex: Int): Int
}
/**
* Compare the input index to the bound of the output index.
*/
private[execution] final case class RowBoundOrdering(offset: Int) extends BoundOrdering {
- override def compare(input: Seq[InternalRow], inputIndex: Int, outputIndex: Int): Int =
+ override def compare(
+ inputRow: InternalRow,
+ inputIndex: Int,
+ outputRow: InternalRow,
+ outputIndex: Int): Int =
inputIndex - (outputIndex + offset)
}
@@ -380,8 +428,100 @@ private[execution] final case class RangeBoundOrdering(
ordering: Ordering[InternalRow],
current: Projection,
bound: Projection) extends BoundOrdering {
- override def compare(input: Seq[InternalRow], inputIndex: Int, outputIndex: Int): Int =
- ordering.compare(current(input(inputIndex)), bound(input(outputIndex)))
+ override def compare(
+ inputRow: InternalRow,
+ inputIndex: Int,
+ outputRow: InternalRow,
+ outputIndex: Int): Int =
+ ordering.compare(current(inputRow), bound(outputRow))
+}
+
+/**
+ * The interface of row buffer for a partition
+ */
+private[execution] abstract class RowBuffer {
+
+ /** Number of rows. */
+ def size(): Int
+
+ /** Return next row in the buffer, null if no more left. */
+ def next(): InternalRow
+
+ /** Skip the next `n` rows. */
+ def skip(n: Int): Unit
+
+ /** Return a new RowBuffer that has the same rows. */
+ def copy(): RowBuffer
+}
+
+/**
+ * A row buffer based on ArrayBuffer (the number of rows is limited)
+ */
+private[execution] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) extends RowBuffer {
+
+ private[this] var cursor: Int = -1
+
+ /** Number of rows. */
+ def size(): Int = buffer.length
+
+ /** Return next row in the buffer, null if no more left. */
+ def next(): InternalRow = {
+ cursor += 1
+ if (cursor < buffer.length) {
+ buffer(cursor)
+ } else {
+ null
+ }
+ }
+
+ /** Skip the next `n` rows. */
+ def skip(n: Int): Unit = {
+ cursor += n
+ }
+
+ /** Return a new RowBuffer that has the same rows. */
+ def copy(): RowBuffer = {
+ new ArrayRowBuffer(buffer)
+ }
+}
+
+/**
+ * An external buffer of rows based on UnsafeExternalSorter
+ */
+private[execution] class ExternalRowBuffer(sorter: UnsafeExternalSorter, numFields: Int)
+ extends RowBuffer {
+
+ private[this] val iter: UnsafeSorterIterator = sorter.getIterator
+
+ private[this] val currentRow = new UnsafeRow(numFields)
+
+ /** Number of rows. */
+ def size(): Int = iter.getNumRecords()
+
+ /** Return next row in the buffer, null if no more left. */
+ def next(): InternalRow = {
+ if (iter.hasNext) {
+ iter.loadNext()
+ currentRow.pointTo(iter.getBaseObject, iter.getBaseOffset, iter.getRecordLength)
+ currentRow
+ } else {
+ null
+ }
+ }
+
+ /** Skip the next `n` rows. */
+ def skip(n: Int): Unit = {
+ var i = 0
+ while (i < n && iter.hasNext) {
+ iter.loadNext()
+ i += 1
+ }
+ }
+
+ /** Return a new RowBuffer that has the same rows. */
+ def copy(): RowBuffer = {
+ new ExternalRowBuffer(sorter, numFields)
+ }
}
/**
@@ -395,12 +535,12 @@ private[execution] abstract class WindowFunctionFrame {
*
* @param rows to calculate the frame results for.
*/
- def prepare(rows: ArrayBuffer[InternalRow]): Unit
+ def prepare(rows: RowBuffer): Unit
/**
* Write the current results to the target row.
*/
- def write(): Unit
+ def write(index: Int, current: InternalRow): Unit
}
/**
@@ -421,14 +561,11 @@ private[execution] final class OffsetWindowFunctionFrame(
offset: Int) extends WindowFunctionFrame {
/** Rows of the partition currently being processed. */
- private[this] var input: ArrayBuffer[InternalRow] = null
+ private[this] var input: RowBuffer = null
/** Index of the input row currently used for output. */
private[this] var inputIndex = 0
- /** Index of the current output row. */
- private[this] var outputIndex = 0
-
/** Row used when there is no valid input. */
private[this] val emptyRow = new GenericInternalRow(inputSchema.size)
@@ -463,22 +600,26 @@ private[execution] final class OffsetWindowFunctionFrame(
newMutableProjection(boundExpressions, Nil)().target(target)
}
- override def prepare(rows: ArrayBuffer[InternalRow]): Unit = {
+ override def prepare(rows: RowBuffer): Unit = {
input = rows
+ // drain the first few rows if offset is larger than zero
+ inputIndex = 0
+ while (inputIndex < offset) {
+ input.next()
+ inputIndex += 1
+ }
inputIndex = offset
- outputIndex = 0
}
- override def write(): Unit = {
- val size = input.size
- if (inputIndex >= 0 && inputIndex < size) {
- join(input(inputIndex), input(outputIndex))
+ override def write(index: Int, current: InternalRow): Unit = {
+ if (inputIndex >= 0 && inputIndex < input.size) {
+ val r = input.next()
+ join(r, current)
} else {
- join(emptyRow, input(outputIndex))
+ join(emptyRow, current)
}
projection(join)
inputIndex += 1
- outputIndex += 1
}
}
@@ -498,7 +639,13 @@ private[execution] final class SlidingWindowFunctionFrame(
ubound: BoundOrdering) extends WindowFunctionFrame {
/** Rows of the partition currently being processed. */
- private[this] var input: ArrayBuffer[InternalRow] = null
+ private[this] var input: RowBuffer = null
+
+ /** The next row from `input`. */
+ private[this] var nextRow: InternalRow = null
+
+ /** The rows within current sliding window. */
+ private[this] val buffer = new util.ArrayDeque[InternalRow]()
/** Index of the first input row with a value greater than the upper bound of the current
* output row. */
@@ -508,33 +655,32 @@ private[execution] final class SlidingWindowFunctionFrame(
* current output row. */
private[this] var inputLowIndex = 0
- /** Index of the row we are currently writing. */
- private[this] var outputIndex = 0
-
/** Prepare the frame for calculating a new partition. Reset all variables. */
- override def prepare(rows: ArrayBuffer[InternalRow]): Unit = {
+ override def prepare(rows: RowBuffer): Unit = {
input = rows
+ nextRow = rows.next()
inputHighIndex = 0
inputLowIndex = 0
- outputIndex = 0
+ buffer.clear()
}
/** Write the frame columns for the current row to the given target row. */
- override def write(): Unit = {
- var bufferUpdated = outputIndex == 0
+ override def write(index: Int, current: InternalRow): Unit = {
+ var bufferUpdated = index == 0
// Add all rows to the buffer for which the input row value is equal to or less than
// the output row upper bound.
- while (inputHighIndex < input.size &&
- ubound.compare(input, inputHighIndex, outputIndex) <= 0) {
+ while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
+ buffer.add(nextRow.copy())
+ nextRow = input.next()
inputHighIndex += 1
bufferUpdated = true
}
// Drop all rows from the buffer for which the input row value is smaller than
// the output row lower bound.
- while (inputLowIndex < inputHighIndex &&
- lbound.compare(input, inputLowIndex, outputIndex) < 0) {
+ while (!buffer.isEmpty && lbound.compare(buffer.peek(), inputLowIndex, current, index) < 0) {
+ buffer.remove()
inputLowIndex += 1
bufferUpdated = true
}
@@ -542,12 +688,12 @@ private[execution] final class SlidingWindowFunctionFrame(
// Only recalculate and update when the buffer changes.
if (bufferUpdated) {
processor.initialize(input.size)
- processor.update(input, inputLowIndex, inputHighIndex)
+ val iter = buffer.iterator()
+ while (iter.hasNext) {
+ processor.update(iter.next())
+ }
processor.evaluate(target)
}
-
- // Move to the next row.
- outputIndex += 1
}
}
@@ -567,13 +713,18 @@ private[execution] final class UnboundedWindowFunctionFrame(
processor: AggregateProcessor) extends WindowFunctionFrame {
/** Prepare the frame for calculating a new partition. Process all rows eagerly. */
- override def prepare(rows: ArrayBuffer[InternalRow]): Unit = {
- processor.initialize(rows.size)
- processor.update(rows, 0, rows.size)
+ override def prepare(rows: RowBuffer): Unit = {
+ val size = rows.size()
+ processor.initialize(size)
+ var i = 0
+ while (i < size) {
+ processor.update(rows.next())
+ i += 1
+ }
}
/** Write the frame columns for the current row to the given target row. */
- override def write(): Unit = {
+ override def write(index: Int, current: InternalRow): Unit = {
// Unfortunately we cannot assume that evaluation is deterministic. So we need to re-evaluate
// for each row.
processor.evaluate(target)
@@ -600,31 +751,32 @@ private[execution] final class UnboundedPrecedingWindowFunctionFrame(
ubound: BoundOrdering) extends WindowFunctionFrame {
/** Rows of the partition currently being processed. */
- private[this] var input: ArrayBuffer[InternalRow] = null
+ private[this] var input: RowBuffer = null
+
+ /** The next row from `input`. */
+ private[this] var nextRow: InternalRow = null
/** Index of the first input row with a value greater than the upper bound of the current
* output row. */
private[this] var inputIndex = 0
- /** Index of the row we are currently writing. */
- private[this] var outputIndex = 0
-
/** Prepare the frame for calculating a new partition. */
- override def prepare(rows: ArrayBuffer[InternalRow]): Unit = {
+ override def prepare(rows: RowBuffer): Unit = {
input = rows
+ nextRow = rows.next()
inputIndex = 0
- outputIndex = 0
processor.initialize(input.size)
}
/** Write the frame columns for the current row to the given target row. */
- override def write(): Unit = {
- var bufferUpdated = outputIndex == 0
+ override def write(index: Int, current: InternalRow): Unit = {
+ var bufferUpdated = index == 0
// Add all rows to the aggregates for which the input row value is equal to or less than
// the output row upper bound.
- while (inputIndex < input.size && ubound.compare(input, inputIndex, outputIndex) <= 0) {
- processor.update(input(inputIndex))
+ while (nextRow != null && ubound.compare(nextRow, inputIndex, current, index) <= 0) {
+ processor.update(nextRow)
+ nextRow = input.next()
inputIndex += 1
bufferUpdated = true
}
@@ -633,9 +785,6 @@ private[execution] final class UnboundedPrecedingWindowFunctionFrame(
if (bufferUpdated) {
processor.evaluate(target)
}
-
- // Move to the next row.
- outputIndex += 1
}
}
@@ -661,29 +810,31 @@ private[execution] final class UnboundedFollowingWindowFunctionFrame(
lbound: BoundOrdering) extends WindowFunctionFrame {
/** Rows of the partition currently being processed. */
- private[this] var input: ArrayBuffer[InternalRow] = null
+ private[this] var input: RowBuffer = null
/** Index of the first input row with a value equal to or greater than the lower bound of the
* current output row. */
private[this] var inputIndex = 0
- /** Index of the row we are currently writing. */
- private[this] var outputIndex = 0
-
/** Prepare the frame for calculating a new partition. */
- override def prepare(rows: ArrayBuffer[InternalRow]): Unit = {
+ override def prepare(rows: RowBuffer): Unit = {
input = rows
inputIndex = 0
- outputIndex = 0
}
/** Write the frame columns for the current row to the given target row. */
- override def write(): Unit = {
- var bufferUpdated = outputIndex == 0
+ override def write(index: Int, current: InternalRow): Unit = {
+ var bufferUpdated = index == 0
+
+ // Duplicate the input to have a new iterator
+ val tmp = input.copy()
// Drop all rows from the buffer for which the input row value is smaller than
// the output row lower bound.
- while (inputIndex < input.size && lbound.compare(input, inputIndex, outputIndex) < 0) {
+ tmp.skip(inputIndex)
+ var nextRow = tmp.next()
+ while (nextRow != null && lbound.compare(nextRow, inputIndex, current, index) < 0) {
+ nextRow = tmp.next()
inputIndex += 1
bufferUpdated = true
}
@@ -691,12 +842,12 @@ private[execution] final class UnboundedFollowingWindowFunctionFrame(
// Only recalculate and update when the buffer changes.
if (bufferUpdated) {
processor.initialize(input.size)
- processor.update(input, inputIndex, input.size)
+ while (nextRow != null) {
+ processor.update(nextRow)
+ nextRow = tmp.next()
+ }
processor.evaluate(target)
}
-
- // Move to the next row.
- outputIndex += 1
}
}
@@ -825,15 +976,6 @@ private[execution] final class AggregateProcessor(
}
}
- /** Bulk update the given buffer. */
- def update(input: ArrayBuffer[InternalRow], begin: Int, end: Int): Unit = {
- var i = begin
- while (i < end) {
- update(input(i))
- i += 1
- }
- }
-
/** Evaluate buffer. */
def evaluate(target: MutableRow): Unit =
evaluateProjection.target(target)(buffer)