aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala494
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala2
7 files changed, 8 insertions, 498 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index dbda05a792..6023a2c564 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -44,7 +44,7 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
new GenericInternalRow(outputArray)
}
- override def toString: String = s"Row => [${exprArray.mkString(",")}]"
+ override def toString(): String = s"Row => [${exprArray.mkString(",")}]"
}
/**
@@ -58,7 +58,7 @@ case class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mu
this(expressions.map(BindReferences.bindReference(_, inputSchema)))
private[this] val exprArray = expressions.toArray
- private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.size)
+ private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.length)
def currentValue: InternalRow = mutableRow
override def target(row: MutableRow): MutableProjection = {
@@ -237,493 +237,3 @@ class JoinedRow extends InternalRow {
}
}
}
-
-/**
- * JIT HACK: Replace with macros
- * The `JoinedRow` class is used in many performance critical situation. Unfortunately, since there
- * are multiple different types of `Rows` that could be stored as `row1` and `row2` most of the
- * calls in the critical path are polymorphic. By creating special versions of this class that are
- * used in only a single location of the code, we increase the chance that only a single type of
- * Row will be referenced, increasing the opportunity for the JIT to play tricks. This sounds
- * crazy but in benchmarks it had noticeable effects.
- */
-class JoinedRow2 extends InternalRow {
- private[this] var row1: InternalRow = _
- private[this] var row2: InternalRow = _
-
- def this(left: InternalRow, right: InternalRow) = {
- this()
- row1 = left
- row2 = right
- }
-
- /** Updates this JoinedRow to used point at two new base rows. Returns itself. */
- def apply(r1: InternalRow, r2: InternalRow): InternalRow = {
- row1 = r1
- row2 = r2
- this
- }
-
- /** Updates this JoinedRow by updating its left base row. Returns itself. */
- def withLeft(newLeft: InternalRow): InternalRow = {
- row1 = newLeft
- this
- }
-
- /** Updates this JoinedRow by updating its right base row. Returns itself. */
- def withRight(newRight: InternalRow): InternalRow = {
- row2 = newRight
- this
- }
-
- override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq
-
- override def numFields: Int = row1.numFields + row2.numFields
-
- override def getUTF8String(i: Int): UTF8String = {
- if (i < row1.numFields) row1.getUTF8String(i) else row2.getUTF8String(i - row1.numFields)
- }
-
- override def getBinary(i: Int): Array[Byte] = {
- if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - row1.numFields)
- }
-
- override def get(i: Int): Any =
- if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields)
-
- override def isNullAt(i: Int): Boolean =
- if (i < row1.numFields) row1.isNullAt(i) else row2.isNullAt(i - row1.numFields)
-
- override def getInt(i: Int): Int =
- if (i < row1.numFields) row1.getInt(i) else row2.getInt(i - row1.numFields)
-
- override def getLong(i: Int): Long =
- if (i < row1.numFields) row1.getLong(i) else row2.getLong(i - row1.numFields)
-
- override def getDouble(i: Int): Double =
- if (i < row1.numFields) row1.getDouble(i) else row2.getDouble(i - row1.numFields)
-
- override def getBoolean(i: Int): Boolean =
- if (i < row1.numFields) row1.getBoolean(i) else row2.getBoolean(i - row1.numFields)
-
- override def getShort(i: Int): Short =
- if (i < row1.numFields) row1.getShort(i) else row2.getShort(i - row1.numFields)
-
- override def getByte(i: Int): Byte =
- if (i < row1.numFields) row1.getByte(i) else row2.getByte(i - row1.numFields)
-
- override def getFloat(i: Int): Float =
- if (i < row1.numFields) row1.getFloat(i) else row2.getFloat(i - row1.numFields)
-
- override def copy(): InternalRow = {
- val totalSize = row1.numFields + row2.numFields
- val copiedValues = new Array[Any](totalSize)
- var i = 0
- while(i < totalSize) {
- copiedValues(i) = get(i)
- i += 1
- }
- new GenericInternalRow(copiedValues)
- }
-
- override def toString: String = {
- // Make sure toString never throws NullPointerException.
- if ((row1 eq null) && (row2 eq null)) {
- "[ empty row ]"
- } else if (row1 eq null) {
- row2.mkString("[", ",", "]")
- } else if (row2 eq null) {
- row1.mkString("[", ",", "]")
- } else {
- mkString("[", ",", "]")
- }
- }
-}
-
-/**
- * JIT HACK: Replace with macros
- */
-class JoinedRow3 extends InternalRow {
- private[this] var row1: InternalRow = _
- private[this] var row2: InternalRow = _
-
- def this(left: InternalRow, right: InternalRow) = {
- this()
- row1 = left
- row2 = right
- }
-
- /** Updates this JoinedRow to used point at two new base rows. Returns itself. */
- def apply(r1: InternalRow, r2: InternalRow): InternalRow = {
- row1 = r1
- row2 = r2
- this
- }
-
- /** Updates this JoinedRow by updating its left base row. Returns itself. */
- def withLeft(newLeft: InternalRow): InternalRow = {
- row1 = newLeft
- this
- }
-
- /** Updates this JoinedRow by updating its right base row. Returns itself. */
- def withRight(newRight: InternalRow): InternalRow = {
- row2 = newRight
- this
- }
-
- override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq
-
- override def numFields: Int = row1.numFields + row2.numFields
-
- override def getUTF8String(i: Int): UTF8String = {
- if (i < row1.numFields) row1.getUTF8String(i) else row2.getUTF8String(i - row1.numFields)
- }
-
- override def getBinary(i: Int): Array[Byte] = {
- if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - row1.numFields)
- }
-
-
- override def get(i: Int): Any =
- if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields)
-
- override def isNullAt(i: Int): Boolean =
- if (i < row1.numFields) row1.isNullAt(i) else row2.isNullAt(i - row1.numFields)
-
- override def getInt(i: Int): Int =
- if (i < row1.numFields) row1.getInt(i) else row2.getInt(i - row1.numFields)
-
- override def getLong(i: Int): Long =
- if (i < row1.numFields) row1.getLong(i) else row2.getLong(i - row1.numFields)
-
- override def getDouble(i: Int): Double =
- if (i < row1.numFields) row1.getDouble(i) else row2.getDouble(i - row1.numFields)
-
- override def getBoolean(i: Int): Boolean =
- if (i < row1.numFields) row1.getBoolean(i) else row2.getBoolean(i - row1.numFields)
-
- override def getShort(i: Int): Short =
- if (i < row1.numFields) row1.getShort(i) else row2.getShort(i - row1.numFields)
-
- override def getByte(i: Int): Byte =
- if (i < row1.numFields) row1.getByte(i) else row2.getByte(i - row1.numFields)
-
- override def getFloat(i: Int): Float =
- if (i < row1.numFields) row1.getFloat(i) else row2.getFloat(i - row1.numFields)
-
- override def copy(): InternalRow = {
- val totalSize = row1.numFields + row2.numFields
- val copiedValues = new Array[Any](totalSize)
- var i = 0
- while(i < totalSize) {
- copiedValues(i) = get(i)
- i += 1
- }
- new GenericInternalRow(copiedValues)
- }
-
- override def toString: String = {
- // Make sure toString never throws NullPointerException.
- if ((row1 eq null) && (row2 eq null)) {
- "[ empty row ]"
- } else if (row1 eq null) {
- row2.mkString("[", ",", "]")
- } else if (row2 eq null) {
- row1.mkString("[", ",", "]")
- } else {
- mkString("[", ",", "]")
- }
- }
-}
-
-/**
- * JIT HACK: Replace with macros
- */
-class JoinedRow4 extends InternalRow {
- private[this] var row1: InternalRow = _
- private[this] var row2: InternalRow = _
-
- def this(left: InternalRow, right: InternalRow) = {
- this()
- row1 = left
- row2 = right
- }
-
- /** Updates this JoinedRow to used point at two new base rows. Returns itself. */
- def apply(r1: InternalRow, r2: InternalRow): InternalRow = {
- row1 = r1
- row2 = r2
- this
- }
-
- /** Updates this JoinedRow by updating its left base row. Returns itself. */
- def withLeft(newLeft: InternalRow): InternalRow = {
- row1 = newLeft
- this
- }
-
- /** Updates this JoinedRow by updating its right base row. Returns itself. */
- def withRight(newRight: InternalRow): InternalRow = {
- row2 = newRight
- this
- }
-
- override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq
-
- override def numFields: Int = row1.numFields + row2.numFields
-
- override def getUTF8String(i: Int): UTF8String = {
- if (i < row1.numFields) row1.getUTF8String(i) else row2.getUTF8String(i - row1.numFields)
- }
-
- override def getBinary(i: Int): Array[Byte] = {
- if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - row1.numFields)
- }
-
-
- override def get(i: Int): Any =
- if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields)
-
- override def isNullAt(i: Int): Boolean =
- if (i < row1.numFields) row1.isNullAt(i) else row2.isNullAt(i - row1.numFields)
-
- override def getInt(i: Int): Int =
- if (i < row1.numFields) row1.getInt(i) else row2.getInt(i - row1.numFields)
-
- override def getLong(i: Int): Long =
- if (i < row1.numFields) row1.getLong(i) else row2.getLong(i - row1.numFields)
-
- override def getDouble(i: Int): Double =
- if (i < row1.numFields) row1.getDouble(i) else row2.getDouble(i - row1.numFields)
-
- override def getBoolean(i: Int): Boolean =
- if (i < row1.numFields) row1.getBoolean(i) else row2.getBoolean(i - row1.numFields)
-
- override def getShort(i: Int): Short =
- if (i < row1.numFields) row1.getShort(i) else row2.getShort(i - row1.numFields)
-
- override def getByte(i: Int): Byte =
- if (i < row1.numFields) row1.getByte(i) else row2.getByte(i - row1.numFields)
-
- override def getFloat(i: Int): Float =
- if (i < row1.numFields) row1.getFloat(i) else row2.getFloat(i - row1.numFields)
-
- override def copy(): InternalRow = {
- val totalSize = row1.numFields + row2.numFields
- val copiedValues = new Array[Any](totalSize)
- var i = 0
- while(i < totalSize) {
- copiedValues(i) = get(i)
- i += 1
- }
- new GenericInternalRow(copiedValues)
- }
-
- override def toString: String = {
- // Make sure toString never throws NullPointerException.
- if ((row1 eq null) && (row2 eq null)) {
- "[ empty row ]"
- } else if (row1 eq null) {
- row2.mkString("[", ",", "]")
- } else if (row2 eq null) {
- row1.mkString("[", ",", "]")
- } else {
- mkString("[", ",", "]")
- }
- }
-}
-
-/**
- * JIT HACK: Replace with macros
- */
-class JoinedRow5 extends InternalRow {
- private[this] var row1: InternalRow = _
- private[this] var row2: InternalRow = _
-
- def this(left: InternalRow, right: InternalRow) = {
- this()
- row1 = left
- row2 = right
- }
-
- /** Updates this JoinedRow to used point at two new base rows. Returns itself. */
- def apply(r1: InternalRow, r2: InternalRow): InternalRow = {
- row1 = r1
- row2 = r2
- this
- }
-
- /** Updates this JoinedRow by updating its left base row. Returns itself. */
- def withLeft(newLeft: InternalRow): InternalRow = {
- row1 = newLeft
- this
- }
-
- /** Updates this JoinedRow by updating its right base row. Returns itself. */
- def withRight(newRight: InternalRow): InternalRow = {
- row2 = newRight
- this
- }
-
- override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq
-
- override def numFields: Int = row1.numFields + row2.numFields
-
- override def getUTF8String(i: Int): UTF8String = {
- if (i < row1.numFields) row1.getUTF8String(i) else row2.getUTF8String(i - row1.numFields)
- }
-
- override def getBinary(i: Int): Array[Byte] = {
- if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - row1.numFields)
- }
-
-
- override def get(i: Int): Any =
- if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields)
-
- override def isNullAt(i: Int): Boolean =
- if (i < row1.numFields) row1.isNullAt(i) else row2.isNullAt(i - row1.numFields)
-
- override def getInt(i: Int): Int =
- if (i < row1.numFields) row1.getInt(i) else row2.getInt(i - row1.numFields)
-
- override def getLong(i: Int): Long =
- if (i < row1.numFields) row1.getLong(i) else row2.getLong(i - row1.numFields)
-
- override def getDouble(i: Int): Double =
- if (i < row1.numFields) row1.getDouble(i) else row2.getDouble(i - row1.numFields)
-
- override def getBoolean(i: Int): Boolean =
- if (i < row1.numFields) row1.getBoolean(i) else row2.getBoolean(i - row1.numFields)
-
- override def getShort(i: Int): Short =
- if (i < row1.numFields) row1.getShort(i) else row2.getShort(i - row1.numFields)
-
- override def getByte(i: Int): Byte =
- if (i < row1.numFields) row1.getByte(i) else row2.getByte(i - row1.numFields)
-
- override def getFloat(i: Int): Float =
- if (i < row1.numFields) row1.getFloat(i) else row2.getFloat(i - row1.numFields)
-
- override def copy(): InternalRow = {
- val totalSize = row1.numFields + row2.numFields
- val copiedValues = new Array[Any](totalSize)
- var i = 0
- while(i < totalSize) {
- copiedValues(i) = get(i)
- i += 1
- }
- new GenericInternalRow(copiedValues)
- }
-
- override def toString: String = {
- // Make sure toString never throws NullPointerException.
- if ((row1 eq null) && (row2 eq null)) {
- "[ empty row ]"
- } else if (row1 eq null) {
- row2.mkString("[", ",", "]")
- } else if (row2 eq null) {
- row1.mkString("[", ",", "]")
- } else {
- mkString("[", ",", "]")
- }
- }
-}
-
-/**
- * JIT HACK: Replace with macros
- */
-class JoinedRow6 extends InternalRow {
- private[this] var row1: InternalRow = _
- private[this] var row2: InternalRow = _
-
- def this(left: InternalRow, right: InternalRow) = {
- this()
- row1 = left
- row2 = right
- }
-
- /** Updates this JoinedRow to used point at two new base rows. Returns itself. */
- def apply(r1: InternalRow, r2: InternalRow): InternalRow = {
- row1 = r1
- row2 = r2
- this
- }
-
- /** Updates this JoinedRow by updating its left base row. Returns itself. */
- def withLeft(newLeft: InternalRow): InternalRow = {
- row1 = newLeft
- this
- }
-
- /** Updates this JoinedRow by updating its right base row. Returns itself. */
- def withRight(newRight: InternalRow): InternalRow = {
- row2 = newRight
- this
- }
-
- override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq
-
- override def numFields: Int = row1.numFields + row2.numFields
-
- override def getUTF8String(i: Int): UTF8String = {
- if (i < row1.numFields) row1.getUTF8String(i) else row2.getUTF8String(i - row1.numFields)
- }
-
- override def getBinary(i: Int): Array[Byte] = {
- if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - row1.numFields)
- }
-
-
- override def get(i: Int): Any =
- if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields)
-
- override def isNullAt(i: Int): Boolean =
- if (i < row1.numFields) row1.isNullAt(i) else row2.isNullAt(i - row1.numFields)
-
- override def getInt(i: Int): Int =
- if (i < row1.numFields) row1.getInt(i) else row2.getInt(i - row1.numFields)
-
- override def getLong(i: Int): Long =
- if (i < row1.numFields) row1.getLong(i) else row2.getLong(i - row1.numFields)
-
- override def getDouble(i: Int): Double =
- if (i < row1.numFields) row1.getDouble(i) else row2.getDouble(i - row1.numFields)
-
- override def getBoolean(i: Int): Boolean =
- if (i < row1.numFields) row1.getBoolean(i) else row2.getBoolean(i - row1.numFields)
-
- override def getShort(i: Int): Short =
- if (i < row1.numFields) row1.getShort(i) else row2.getShort(i - row1.numFields)
-
- override def getByte(i: Int): Byte =
- if (i < row1.numFields) row1.getByte(i) else row2.getByte(i - row1.numFields)
-
- override def getFloat(i: Int): Float =
- if (i < row1.numFields) row1.getFloat(i) else row2.getFloat(i - row1.numFields)
-
- override def copy(): InternalRow = {
- val totalSize = row1.numFields + row2.numFields
- val copiedValues = new Array[Any](totalSize)
- var i = 0
- while(i < totalSize) {
- copiedValues(i) = get(i)
- i += 1
- }
- new GenericInternalRow(copiedValues)
- }
-
- override def toString: String = {
- // Make sure toString never throws NullPointerException.
- if ((row1 eq null) && (row2 eq null)) {
- "[ empty row ]"
- } else if (row1 eq null) {
- row2.mkString("[", ",", "]")
- } else if (row2 eq null) {
- row1.mkString("[", ",", "]")
- } else {
- mkString("[", ",", "]")
- }
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index c2c945321d..e8c6a0f8f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -172,7 +172,7 @@ case class Aggregate(
private[this] val resultProjection =
new InterpretedMutableProjection(
resultExpressions, computedSchema ++ namedGroups.map(_._2))
- private[this] val joinedRow = new JoinedRow4
+ private[this] val joinedRow = new JoinedRow
override final def hasNext: Boolean = hashTableIter.hasNext
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 5ed158b3d2..5ad4691a5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -269,7 +269,7 @@ case class GeneratedAggregate(
namedGroups.map(_._2) ++ computationSchema)
log.info(s"Result Projection: ${resultExpressions.mkString(",")}")
- val joinedRow = new JoinedRow3
+ val joinedRow = new JoinedRow
if (!iter.hasNext) {
// This is an empty input, so return early so that we do not allocate data structures
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index de04132eb1..91c8a02e2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -298,7 +298,7 @@ case class Window(
var rowsSize = 0
override final def hasNext: Boolean = rowIndex < rowsSize || nextRowAvailable
- val join = new JoinedRow6
+ val join = new JoinedRow
val windowFunctionResult = new GenericMutableRow(unboundExpressions.size)
override final def next(): InternalRow = {
// Load the next partition if we need to.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala
index b8e95a5a2a..1b89edafa8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala
@@ -106,7 +106,7 @@ private[sql] abstract class SortAggregationIterator(
new GenericMutableRow(size)
}
- protected val joinedRow = new JoinedRow4
+ protected val joinedRow = new JoinedRow
protected val placeholderExpressions = Seq.fill(initialBufferOffset)(NoOp)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index ae34409bcf..46ab5b0d1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -69,7 +69,7 @@ trait HashJoin {
private[this] var currentMatchPosition: Int = -1
// Mutable per row objects.
- private[this] val joinRow = new JoinedRow2
+ private[this] val joinRow = new JoinedRow
private[this] val resultProjection: Projection = {
if (supportUnsafe) {
UnsafeProjection.create(self.schema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 981447eaca..bb18b5403f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -66,7 +66,7 @@ case class SortMergeJoin(
leftResults.zipPartitions(rightResults) { (leftIter, rightIter) =>
new Iterator[InternalRow] {
// Mutable per row objects.
- private[this] val joinRow = new JoinedRow5
+ private[this] val joinRow = new JoinedRow
private[this] var leftElement: InternalRow = _
private[this] var rightElement: InternalRow = _
private[this] var leftKey: InternalRow = _