author     Herman van Hovell <hvanhovell@databricks.com>  2016-10-07 14:03:45 -0700
committer  Herman van Hovell <hvanhovell@databricks.com>  2016-10-07 14:03:45 -0700
commit     97594c29b723f372a5c4c061760015bd78d01f50 (patch)
tree       da92aa1ccd039678667376a33abb9b8481ce3824
parent     2badb58cdd7833465202197c4c52db5aa3d4c6e7 (diff)
[SPARK-17761][SQL] Remove MutableRow
## What changes were proposed in this pull request?

In practice we cannot guarantee that an `InternalRow` is immutable. This makes `MutableRow` almost redundant. This PR folds `MutableRow` into `InternalRow`.

The code below illustrates the immutability issue with `InternalRow`:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

val struct = new GenericMutableRow(1)
val row = InternalRow(struct, 1)
println(row)
// prints: [[null], 1]
struct.setInt(0, 42)
println(row)
// prints: [[42], 1]
```

This might be somewhat controversial, so feedback is appreciated.

## How was this patch tested?

Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #15333 from hvanhovell/SPARK-17761.
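For reference, here is a minimal sketch (not part of the patch itself) of how row construction and mutation look once `MutableRow` is removed; every class and setter used below appears in this diff, and the mutation methods are now declared directly on `InternalRow`:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, SpecificInternalRow}
import org.apache.spark.sql.types.{IntegerType, LongType}

// GenericInternalRow is backed by an Array[Any]; the setters it now inherits
// from InternalRow write straight into that array.
val generic: InternalRow = new GenericInternalRow(2)
generic.setInt(0, 42)
generic.setLong(1, 7L)
generic.setNullAt(0)

// SpecificInternalRow (renamed from SpecificMutableRow) keeps one typed holder
// per column, so repeated primitive updates avoid allocating boxed values.
val specific: InternalRow = new SpecificInternalRow(Seq(IntegerType, LongType))
specific.setInt(0, 1)
specific.setLong(1, 2L)
```

The design consequence is that mutability becomes an explicit part of the `InternalRow` contract rather than a capability hidden behind a separate subclass, which is what the commit message argues the immutability example already implied.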
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala | 6
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala | 6
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala | 23
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala | 4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala | 16
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala | 4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala (renamed from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala) | 5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala | 6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala | 10
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala | 6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala | 14
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala | 8
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala | 8
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala | 44
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala | 4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala | 4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala | 16
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala | 2
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala | 26
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala | 9
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala | 13
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala | 26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala | 38
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala | 72
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 34
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala | 12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala | 18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 38
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala | 6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala | 2
71 files changed, 343 insertions, 347 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
index a1e53662f0..f4a8556c71 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
@@ -18,7 +18,7 @@
package org.apache.spark.ml.linalg
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
import org.apache.spark.sql.types._
/**
@@ -46,7 +46,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
}
override def serialize(obj: Matrix): InternalRow = {
- val row = new GenericMutableRow(7)
+ val row = new GenericInternalRow(7)
obj match {
case sm: SparseMatrix =>
row.setByte(0, 0)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
index 0b9b2ff5c5..917861309c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
@@ -18,7 +18,7 @@
package org.apache.spark.ml.linalg
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
import org.apache.spark.sql.types._
/**
@@ -42,14 +42,14 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
override def serialize(obj: Vector): InternalRow = {
obj match {
case SparseVector(size, indices, values) =>
- val row = new GenericMutableRow(4)
+ val row = new GenericInternalRow(4)
row.setByte(0, 0)
row.setInt(1, size)
row.update(2, UnsafeArrayData.fromPrimitiveArray(indices))
row.update(3, UnsafeArrayData.fromPrimitiveArray(values))
row
case DenseVector(values) =>
- val row = new GenericMutableRow(4)
+ val row = new GenericInternalRow(4)
row.setByte(0, 1)
row.setNullAt(1)
row.setNullAt(2)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index 6642999a21..542a69b3ef 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -28,7 +28,7 @@ import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.spark.annotation.Since
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
import org.apache.spark.sql.types._
/**
@@ -189,7 +189,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
}
override def serialize(obj: Matrix): InternalRow = {
- val row = new GenericMutableRow(7)
+ val row = new GenericInternalRow(7)
obj match {
case sm: SparseMatrix =>
row.setByte(0, 0)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 91f065831c..fbd217af74 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -34,7 +34,7 @@ import org.apache.spark.annotation.{AlphaComponent, Since}
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.mllib.util.NumericParser
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
import org.apache.spark.sql.types._
/**
@@ -214,14 +214,14 @@ class VectorUDT extends UserDefinedType[Vector] {
override def serialize(obj: Vector): InternalRow = {
obj match {
case SparseVector(size, indices, values) =>
- val row = new GenericMutableRow(4)
+ val row = new GenericInternalRow(4)
row.setByte(0, 0)
row.setInt(1, size)
row.update(2, UnsafeArrayData.fromPrimitiveArray(indices))
row.update(3, UnsafeArrayData.fromPrimitiveArray(values))
row
case DenseVector(values) =>
- val row = new GenericMutableRow(4)
+ val row = new GenericInternalRow(4)
row.setByte(0, 1)
row.setNullAt(1)
row.setNullAt(2)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 9027652d57..c3f0abac24 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -59,7 +59,7 @@ import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
*
* Instances of `UnsafeRow` act as pointers to row data stored in this format.
*/
-public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable {
+public final class UnsafeRow extends InternalRow implements Externalizable, KryoSerializable {
//////////////////////////////////////////////////////////////////////////////
// Static methods
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
index eba95c5c8b..f498e071b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{DataType, Decimal, StructType}
/**
* An abstract class for row used internal in Spark SQL, which only contain the columns as
@@ -31,6 +31,27 @@ abstract class InternalRow extends SpecializedGetters with Serializable {
// This is only use for test and will throw a null pointer exception if the position is null.
def getString(ordinal: Int): String = getUTF8String(ordinal).toString
+ def setNullAt(i: Int): Unit
+
+ def update(i: Int, value: Any): Unit
+
+ // default implementation (slow)
+ def setBoolean(i: Int, value: Boolean): Unit = update(i, value)
+ def setByte(i: Int, value: Byte): Unit = update(i, value)
+ def setShort(i: Int, value: Short): Unit = update(i, value)
+ def setInt(i: Int, value: Int): Unit = update(i, value)
+ def setLong(i: Int, value: Long): Unit = update(i, value)
+ def setFloat(i: Int, value: Float): Unit = update(i, value)
+ def setDouble(i: Int, value: Double): Unit = update(i, value)
+
+ /**
+ * Update the decimal column at `i`.
+ *
+ * Note: In order to support update decimal with precision > 18 in UnsafeRow,
+ * CAN NOT call setNullAt() for decimal column on UnsafeRow, call setDecimal(i, null, precision).
+ */
+ def setDecimal(i: Int, value: Decimal, precision: Int) { update(i, value) }
+
/**
* Make a copy of the current [[InternalRow]] object.
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index b96b744b4f..82e1a8a7ca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -256,7 +256,7 @@ case class ExpressionEncoder[T](
private lazy val extractProjection = GenerateUnsafeProjection.generate(serializer)
@transient
- private lazy val inputRow = new GenericMutableRow(1)
+ private lazy val inputRow = new GenericInternalRow(1)
@transient
private lazy val constructProjection = GenerateSafeProjection.generate(deserializer :: Nil)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 70fff51956..1314c41651 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -403,7 +403,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case (fromField, toField) => cast(fromField.dataType, toField.dataType)
}
// TODO: Could be faster?
- val newRow = new GenericMutableRow(from.fields.length)
+ val newRow = new GenericInternalRow(from.fields.length)
buildCast[InternalRow](_, row => {
var i = 0
while (i < row.numFields) {
@@ -892,7 +892,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
val fieldsCasts = from.fields.zip(to.fields).map {
case (fromField, toField) => nullSafeCastFunction(fromField.dataType, toField.dataType, ctx)
}
- val rowClass = classOf[GenericMutableRow].getName
+ val rowClass = classOf[GenericInternalRow].getName
val result = ctx.freshName("result")
val tmpRow = ctx.freshName("tmpRow")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
index ed894f6d6e..7770684a5b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
@@ -123,6 +123,22 @@ class JoinedRow extends InternalRow {
override def anyNull: Boolean = row1.anyNull || row2.anyNull
+ override def setNullAt(i: Int): Unit = {
+ if (i < row1.numFields) {
+ row1.setNullAt(i)
+ } else {
+ row2.setNullAt(i - row1.numFields)
+ }
+ }
+
+ override def update(i: Int, value: Any): Unit = {
+ if (i < row1.numFields) {
+ row1.update(i, value)
+ } else {
+ row2.update(i - row1.numFields, value)
+ }
+ }
+
override def copy(): InternalRow = {
val copy1 = row1.copy()
val copy2 = row2.copy()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index c8d18667f7..a81fa1ce3a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -69,10 +69,10 @@ case class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mu
})
private[this] val exprArray = expressions.toArray
- private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.length)
+ private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length)
def currentValue: InternalRow = mutableRow
- override def target(row: MutableRow): MutableProjection = {
+ override def target(row: InternalRow): MutableProjection = {
mutableRow = row
this
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
index 61ca7272df..74e0b4691d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.types._
/**
* A parent class for mutable container objects that are reused when the values are changed,
- * resulting in less garbage. These values are held by a [[SpecificMutableRow]].
+ * resulting in less garbage. These values are held by a [[SpecificInternalRow]].
*
* The following code was roughly used to generate these objects:
* {{{
@@ -191,8 +191,7 @@ final class MutableAny extends MutableValue {
* based on the dataTypes of each column. The intent is to decrease garbage when modifying the
* values of primitive columns.
*/
-final class SpecificMutableRow(val values: Array[MutableValue])
- extends MutableRow with BaseGenericInternalRow {
+final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGenericInternalRow {
def this(dataTypes: Seq[DataType]) =
this(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
index 1d218da6db..83c8d400c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
@@ -155,7 +155,7 @@ case class HyperLogLogPlusPlus(
aggBufferAttributes.map(_.newInstance())
/** Fill all words with zeros. */
- override def initialize(buffer: MutableRow): Unit = {
+ override def initialize(buffer: InternalRow): Unit = {
var word = 0
while (word < numWords) {
buffer.setLong(mutableAggBufferOffset + word, 0)
@@ -168,7 +168,7 @@ case class HyperLogLogPlusPlus(
*
* Variable names in the HLL++ paper match variable names in the code.
*/
- override def update(buffer: MutableRow, input: InternalRow): Unit = {
+ override def update(buffer: InternalRow, input: InternalRow): Unit = {
val v = child.eval(input)
if (v != null) {
// Create the hashed value 'x'.
@@ -200,7 +200,7 @@ case class HyperLogLogPlusPlus(
* Merge the HLL buffers by iterating through the registers in both buffers and select the
* maximum number of leading zeros for each register.
*/
- override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
+ override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
var idx = 0
var wordOffset = 0
while (wordOffset < numWords) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
index 16c03c500a..0876060772 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
@@ -30,7 +30,7 @@ object PivotFirst {
// Currently UnsafeRow does not support the generic update method (throws
// UnsupportedOperationException), so we need to explicitly support each DataType.
- private val updateFunction: PartialFunction[DataType, (MutableRow, Int, Any) => Unit] = {
+ private val updateFunction: PartialFunction[DataType, (InternalRow, Int, Any) => Unit] = {
case DoubleType =>
(row, offset, value) => row.setDouble(offset, value.asInstanceOf[Double])
case IntegerType =>
@@ -89,9 +89,9 @@ case class PivotFirst(
val indexSize = pivotIndex.size
- private val updateRow: (MutableRow, Int, Any) => Unit = PivotFirst.updateFunction(valueDataType)
+ private val updateRow: (InternalRow, Int, Any) => Unit = PivotFirst.updateFunction(valueDataType)
- override def update(mutableAggBuffer: MutableRow, inputRow: InternalRow): Unit = {
+ override def update(mutableAggBuffer: InternalRow, inputRow: InternalRow): Unit = {
val pivotColValue = pivotColumn.eval(inputRow)
if (pivotColValue != null) {
// We ignore rows whose pivot column value is not in the list of pivot column values.
@@ -105,7 +105,7 @@ case class PivotFirst(
}
}
- override def merge(mutableAggBuffer: MutableRow, inputAggBuffer: InternalRow): Unit = {
+ override def merge(mutableAggBuffer: InternalRow, inputAggBuffer: InternalRow): Unit = {
for (i <- 0 until indexSize) {
if (!inputAggBuffer.isNullAt(inputAggBufferOffset + i)) {
val value = inputAggBuffer.get(inputAggBufferOffset + i, valueDataType)
@@ -114,7 +114,7 @@ case class PivotFirst(
}
}
- override def initialize(mutableAggBuffer: MutableRow): Unit = valueDataType match {
+ override def initialize(mutableAggBuffer: InternalRow): Unit = valueDataType match {
case d: DecimalType =>
// Per doc of setDecimal we need to do this instead of setNullAt for DecimalType.
for (i <- 0 until indexSize) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
index 78a388d206..89eb864e94 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
@@ -60,11 +60,11 @@ abstract class Collect extends ImperativeAggregate {
protected[this] val buffer: Growable[Any] with Iterable[Any]
- override def initialize(b: MutableRow): Unit = {
+ override def initialize(b: InternalRow): Unit = {
buffer.clear()
}
- override def update(b: MutableRow, input: InternalRow): Unit = {
+ override def update(b: InternalRow, input: InternalRow): Unit = {
// Do not allow null values. We follow the semantics of Hive's collect_list/collect_set here.
// See: org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMkCollectionEvaluator
val value = child.eval(input)
@@ -73,7 +73,7 @@ abstract class Collect extends ImperativeAggregate {
}
}
- override def merge(buffer: MutableRow, input: InternalRow): Unit = {
+ override def merge(buffer: InternalRow, input: InternalRow): Unit = {
sys.error("Collect cannot be used in partial aggregations.")
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index b5c0844fbf..f3fd58bc98 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -307,14 +307,14 @@ abstract class ImperativeAggregate extends AggregateFunction with CodegenFallbac
*
* Use `fieldNumber + mutableAggBufferOffset` to access fields of `mutableAggBuffer`.
*/
- def initialize(mutableAggBuffer: MutableRow): Unit
+ def initialize(mutableAggBuffer: InternalRow): Unit
/**
* Updates its aggregation buffer, located in `mutableAggBuffer`, based on the given `inputRow`.
*
* Use `fieldNumber + mutableAggBufferOffset` to access fields of `mutableAggBuffer`.
*/
- def update(mutableAggBuffer: MutableRow, inputRow: InternalRow): Unit
+ def update(mutableAggBuffer: InternalRow, inputRow: InternalRow): Unit
/**
* Combines new intermediate results from the `inputAggBuffer` with the existing intermediate
@@ -323,7 +323,7 @@ abstract class ImperativeAggregate extends AggregateFunction with CodegenFallbac
* Use `fieldNumber + mutableAggBufferOffset` to access fields of `mutableAggBuffer`.
* Use `fieldNumber + inputAggBufferOffset` to access fields of `inputAggBuffer`.
*/
- def merge(mutableAggBuffer: MutableRow, inputAggBuffer: InternalRow): Unit
+ def merge(mutableAggBuffer: InternalRow, inputAggBuffer: InternalRow): Unit
}
/**
@@ -504,16 +504,16 @@ abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
/** De-serializes the serialized format Array[Byte], and produces aggregation buffer object T */
def deserialize(storageFormat: Array[Byte]): T
- final override def initialize(buffer: MutableRow): Unit = {
+ final override def initialize(buffer: InternalRow): Unit = {
val bufferObject = createAggregationBuffer()
buffer.update(mutableAggBufferOffset, bufferObject)
}
- final override def update(buffer: MutableRow, input: InternalRow): Unit = {
+ final override def update(buffer: InternalRow, input: InternalRow): Unit = {
update(getBufferObject(buffer), input)
}
- final override def merge(buffer: MutableRow, inputBuffer: InternalRow): Unit = {
+ final override def merge(buffer: InternalRow, inputBuffer: InternalRow): Unit = {
val bufferObject = getBufferObject(buffer)
// The inputBuffer stores serialized aggregation buffer object produced by partial aggregate
val inputObject = deserialize(inputBuffer.getBinary(inputAggBufferOffset))
@@ -547,7 +547,7 @@ abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
* This is only called when doing Partial or PartialMerge mode aggregation, before the framework
* shuffle out aggregate buffers.
*/
- final def serializeAggregateBufferInPlace(buffer: MutableRow): Unit = {
+ final def serializeAggregateBufferInPlace(buffer: InternalRow): Unit = {
buffer(mutableAggBufferOffset) = serialize(getBufferObject(buffer))
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 574943d3d2..6cab50ae1b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -819,7 +819,7 @@ class CodeAndComment(val body: String, val comment: collection.Map[String, Strin
*/
abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
- protected val genericMutableRowType: String = classOf[GenericMutableRow].getName
+ protected val genericMutableRowType: String = classOf[GenericInternalRow].getName
/**
* Generates a class for a given input expression. Called when there is not cached code
@@ -889,7 +889,6 @@ object CodeGenerator extends Logging {
classOf[UnsafeArrayData].getName,
classOf[MapData].getName,
classOf[UnsafeMapData].getName,
- classOf[MutableRow].getName,
classOf[Expression].getName
))
evaluator.setExtendedClass(classOf[GeneratedClass])
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 13d61af1c9..5c4b56b0b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -24,10 +24,10 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
abstract class BaseMutableProjection extends MutableProjection
/**
- * Generates byte code that produces a [[MutableRow]] object that can update itself based on a new
+ * Generates byte code that produces a [[InternalRow]] object that can update itself based on a new
* input [[InternalRow]] for a fixed set of [[Expression Expressions]].
* It exposes a `target` method, which is used to set the row that will be updated.
- * The internal [[MutableRow]] object created internally is used only when `target` is not used.
+ * The internal [[InternalRow]] object created internally is used only when `target` is not used.
*/
object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableProjection] {
@@ -102,7 +102,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
class SpecificMutableProjection extends ${classOf[BaseMutableProjection].getName} {
private Object[] references;
- private MutableRow mutableRow;
+ private InternalRow mutableRow;
${ctx.declareMutableStates()}
public SpecificMutableProjection(Object[] references) {
@@ -113,7 +113,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
${ctx.declareAddedFunctions()}
- public ${classOf[BaseMutableProjection].getName} target(MutableRow row) {
+ public ${classOf[BaseMutableProjection].getName} target(InternalRow row) {
mutableRow = row;
return this;
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
index 1c98c9ed10..2773e1a666 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._
abstract class BaseProjection extends Projection {}
/**
- * Generates byte code that produces a [[MutableRow]] object (not an [[UnsafeRow]]) that can update
+ * Generates byte code that produces a [[InternalRow]] object (not an [[UnsafeRow]]) that can update
* itself based on a new input [[InternalRow]] for a fixed set of [[Expression Expressions]].
*/
object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection] {
@@ -164,12 +164,12 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
class SpecificSafeProjection extends ${classOf[BaseProjection].getName} {
private Object[] references;
- private MutableRow mutableRow;
+ private InternalRow mutableRow;
${ctx.declareMutableStates()}
public SpecificSafeProjection(Object[] references) {
this.references = references;
- mutableRow = (MutableRow) references[references.length - 1];
+ mutableRow = (InternalRow) references[references.length - 1];
${ctx.initMutableStates()}
}
@@ -188,7 +188,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
val c = CodeGenerator.compile(code)
- val resultRow = new SpecificMutableRow(expressions.map(_.dataType))
+ val resultRow = new SpecificInternalRow(expressions.map(_.dataType))
c.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index a6125c61e5..1510a47966 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -81,7 +81,7 @@ package object expressions {
def currentValue: InternalRow
/** Uses the given row to store the output of the projection. */
- def target(row: MutableRow): MutableProjection
+ def target(row: InternalRow): MutableProjection
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 73dceb35ac..751b821e1b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -158,33 +158,6 @@ trait BaseGenericInternalRow extends InternalRow {
}
/**
- * An extended interface to [[InternalRow]] that allows the values for each column to be updated.
- * Setting a value through a primitive function implicitly marks that column as not null.
- */
-abstract class MutableRow extends InternalRow {
- def setNullAt(i: Int): Unit
-
- def update(i: Int, value: Any): Unit
-
- // default implementation (slow)
- def setBoolean(i: Int, value: Boolean): Unit = { update(i, value) }
- def setByte(i: Int, value: Byte): Unit = { update(i, value) }
- def setShort(i: Int, value: Short): Unit = { update(i, value) }
- def setInt(i: Int, value: Int): Unit = { update(i, value) }
- def setLong(i: Int, value: Long): Unit = { update(i, value) }
- def setFloat(i: Int, value: Float): Unit = { update(i, value) }
- def setDouble(i: Int, value: Double): Unit = { update(i, value) }
-
- /**
- * Update the decimal column at `i`.
- *
- * Note: In order to support update decimal with precision > 18 in UnsafeRow,
- * CAN NOT call setNullAt() for decimal column on UnsafeRow, call setDecimal(i, null, precision).
- */
- def setDecimal(i: Int, value: Decimal, precision: Int) { update(i, value) }
-}
-
-/**
* A row implementation that uses an array of objects as the underlying storage. Note that, while
* the array is not copied, and thus could technically be mutated after creation, this is not
* allowed.
@@ -230,24 +203,9 @@ class GenericInternalRow(val values: Array[Any]) extends BaseGenericInternalRow
override def numFields: Int = values.length
- override def copy(): GenericInternalRow = this
-}
-
-class GenericMutableRow(values: Array[Any]) extends MutableRow with BaseGenericInternalRow {
- /** No-arg constructor for serialization. */
- protected def this() = this(null)
-
- def this(size: Int) = this(new Array[Any](size))
-
- override protected def genericGet(ordinal: Int) = values(ordinal)
-
- override def toSeq(fieldTypes: Seq[DataType]): Seq[Any] = values
-
- override def numFields: Int = values.length
-
override def setNullAt(i: Int): Unit = { values(i) = null}
override def update(i: Int, value: Any): Unit = { values(i) = value }
- override def copy(): InternalRow = new GenericInternalRow(values.clone())
+ override def copy(): GenericInternalRow = this
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index f80e6373d2..e476cb11a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -105,7 +105,7 @@ class JacksonParser(
}
emptyRow
} else {
- val row = new GenericMutableRow(schema.length)
+ val row = new GenericInternalRow(schema.length)
for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
require(schema(corruptIndex).dataType == StringType)
row.update(corruptIndex, UTF8String.fromString(record))
@@ -363,7 +363,7 @@ class JacksonParser(
parser: JsonParser,
schema: StructType,
fieldConverters: Seq[ValueConverter]): InternalRow = {
- val row = new GenericMutableRow(schema.length)
+ val row = new GenericInternalRow(schema.length)
while (nextUntil(parser, JsonToken.END_OBJECT)) {
schema.getFieldIndex(parser.getCurrentName) match {
case Some(index) =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index 85563ddedc..43b6afd9ad 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -23,7 +23,7 @@ import java.sql.{Date, Timestamp}
import scala.reflect.runtime.universe.typeOf
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Literal, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Literal, SpecificInternalRow}
import org.apache.spark.sql.catalyst.expressions.objects.NewInstance
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -94,7 +94,7 @@ object TestingUDT {
.add("c", DoubleType, nullable = false)
override def serialize(n: NestedStruct): Any = {
- val row = new SpecificMutableRow(sqlType.asInstanceOf[StructType].map(_.dataType))
+ val row = new SpecificInternalRow(sqlType.asInstanceOf[StructType].map(_.dataType))
row.setInt(0, n.a)
row.setLong(1, n.b)
row.setDouble(2, n.c)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 5588b44291..0cb201e4da 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -68,7 +68,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
val length = 5000
val expressions = List.fill(length)(EqualTo(Literal(1), Literal(1)))
val plan = GenerateMutableProjection.generate(expressions)
- val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+ val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq.fill(length)(true)
if (!checkResult(actual, expected)) {
@@ -91,7 +91,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
val expression = CaseWhen((1 to cases).map(generateCase(_)))
val plan = GenerateMutableProjection.generate(Seq(expression))
- val input = new GenericMutableRow(Array[Any](UTF8String.fromString(s"${clauses}:${cases}")))
+ val input = new GenericInternalRow(Array[Any](UTF8String.fromString(s"${clauses}:${cases}")))
val actual = plan(input).toSeq(Seq(expression.dataType))
assert(actual(0) == cases)
@@ -101,7 +101,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
val length = 5000
val expressions = Seq(CreateArray(List.fill(length)(EqualTo(Literal(1), Literal(1)))))
val plan = GenerateMutableProjection.generate(expressions)
- val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+ val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq(new GenericArrayData(Seq.fill(length)(true)))
if (!checkResult(actual, expected)) {
@@ -116,7 +116,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
case (expr, i) => Seq(Literal(i), expr)
}))
val plan = GenerateMutableProjection.generate(expressions)
- val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType)).map {
+ val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType)).map {
case m: ArrayBasedMapData => ArrayBasedMapData.toScalaMap(m)
}
val expected = (0 until length).map((_, true)).toMap :: Nil
@@ -130,7 +130,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
val length = 5000
val expressions = Seq(CreateStruct(List.fill(length)(EqualTo(Literal(1), Literal(1)))))
val plan = GenerateMutableProjection.generate(expressions)
- val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+ val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq(InternalRow(Seq.fill(length)(true): _*))
if (!checkResult(actual, expected)) {
@@ -145,7 +145,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
expr => Seq(Literal(expr.toString), expr)
}))
val plan = GenerateMutableProjection.generate(expressions)
- val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+ val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq(InternalRow(Seq.fill(length)(true): _*))
if (!checkResult(actual, expected)) {
@@ -158,7 +158,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
val schema = StructType(Seq.fill(length)(StructField("int", IntegerType)))
val expressions = Seq(CreateExternalRow(Seq.fill(length)(Literal(1)), schema))
val plan = GenerateMutableProjection.generate(expressions)
- val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+ val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq(Row.fromSeq(Seq.fill(length)(1)))
if (!checkResult(actual, expected)) {
@@ -174,7 +174,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
Literal.create("PST", StringType))
}
val plan = GenerateMutableProjection.generate(expressions)
- val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+ val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq.fill(length)(
DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala
index 0f1264c7c3..25a675a902 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala
@@ -45,7 +45,7 @@ class MapDataSuite extends SparkFunSuite {
// UnsafeMapData
val unsafeConverter = UnsafeProjection.create(Array[DataType](MapType(StringType, IntegerType)))
- val row = new GenericMutableRow(1)
+ val row = new GenericInternalRow(1)
def toUnsafeMap(map: ArrayBasedMapData): UnsafeMapData = {
row.update(0, map)
val unsafeRow = unsafeConverter.apply(row)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
index 90790dda75..cf3cbe2707 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
@@ -37,7 +37,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
val fieldTypes: Array[DataType] = Array(LongType, LongType, IntegerType)
val converter = UnsafeProjection.create(fieldTypes)
- val row = new SpecificMutableRow(fieldTypes)
+ val row = new SpecificInternalRow(fieldTypes)
row.setLong(0, 0)
row.setLong(1, 1)
row.setInt(2, 2)
@@ -75,7 +75,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
val fieldTypes: Array[DataType] = Array(LongType, StringType, BinaryType)
val converter = UnsafeProjection.create(fieldTypes)
- val row = new SpecificMutableRow(fieldTypes)
+ val row = new SpecificInternalRow(fieldTypes)
row.setLong(0, 0)
row.update(1, UTF8String.fromString("Hello"))
row.update(2, "World".getBytes(StandardCharsets.UTF_8))
@@ -94,7 +94,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
val fieldTypes: Array[DataType] = Array(LongType, StringType, DateType, TimestampType)
val converter = UnsafeProjection.create(fieldTypes)
- val row = new SpecificMutableRow(fieldTypes)
+ val row = new SpecificInternalRow(fieldTypes)
row.setLong(0, 0)
row.update(1, UTF8String.fromString("Hello"))
row.update(2, DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01")))
@@ -138,7 +138,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
val converter = UnsafeProjection.create(fieldTypes)
val rowWithAllNullColumns: InternalRow = {
- val r = new SpecificMutableRow(fieldTypes)
+ val r = new SpecificInternalRow(fieldTypes)
for (i <- fieldTypes.indices) {
r.setNullAt(i)
}
@@ -167,7 +167,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
// columns, then the serialized row representation should be identical to what we would get by
// creating an entirely null row via the converter
val rowWithNoNullColumns: InternalRow = {
- val r = new SpecificMutableRow(fieldTypes)
+ val r = new SpecificInternalRow(fieldTypes)
r.setNullAt(0)
r.setBoolean(1, false)
r.setByte(2, 20)
@@ -243,11 +243,11 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
test("NaN canonicalization") {
val fieldTypes: Array[DataType] = Array(FloatType, DoubleType)
- val row1 = new SpecificMutableRow(fieldTypes)
+ val row1 = new SpecificInternalRow(fieldTypes)
row1.setFloat(0, java.lang.Float.intBitsToFloat(0x7f800001))
row1.setDouble(1, java.lang.Double.longBitsToDouble(0x7ff0000000000001L))
- val row2 = new SpecificMutableRow(fieldTypes)
+ val row2 = new SpecificInternalRow(fieldTypes)
row2.setFloat(0, java.lang.Float.intBitsToFloat(0x7fffffff))
row2.setDouble(1, java.lang.Double.longBitsToDouble(0x7fffffffffffffffL))
@@ -263,7 +263,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
val converter = UnsafeProjection.create(fieldTypes)
- val row = new GenericMutableRow(fieldTypes.length)
+ val row = new GenericInternalRow(fieldTypes.length)
row.update(0, InternalRow(1))
row.update(1, InternalRow(InternalRow(2L)))
@@ -324,7 +324,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
)
val converter = UnsafeProjection.create(fieldTypes)
- val row = new GenericMutableRow(fieldTypes.length)
+ val row = new GenericInternalRow(fieldTypes.length)
row.update(0, createArray(1, 2))
row.update(1, createArray(createArray(3, 4)))
@@ -359,7 +359,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
val innerMap = createMap(5, 6)(7, 8)
val map2 = createMap(9)(innerMap)
- val row = new GenericMutableRow(fieldTypes.length)
+ val row = new GenericInternalRow(fieldTypes.length)
row.update(0, map1)
row.update(1, map2)
@@ -400,7 +400,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
)
val converter = UnsafeProjection.create(fieldTypes)
- val row = new GenericMutableRow(fieldTypes.length)
+ val row = new GenericInternalRow(fieldTypes.length)
row.update(0, InternalRow(createArray(1)))
row.update(1, createArray(InternalRow(2L)))
@@ -439,7 +439,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
)
val converter = UnsafeProjection.create(fieldTypes)
- val row = new GenericMutableRow(fieldTypes.length)
+ val row = new GenericInternalRow(fieldTypes.length)
row.update(0, InternalRow(createMap(1)(2)))
row.update(1, createMap(3)(InternalRow(4L)))
@@ -485,7 +485,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
)
val converter = UnsafeProjection.create(fieldTypes)
- val row = new GenericMutableRow(fieldTypes.length)
+ val row = new GenericInternalRow(fieldTypes.length)
row.update(0, createArray(createMap(1)(2)))
row.update(1, createMap(3)(createArray(4)))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala
index 61298a1b72..8456e24460 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedAttribu
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, BoundReference, Cast, CreateArray, DecimalLiteral, GenericMutableRow, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, BoundReference, Cast, CreateArray, DecimalLiteral, GenericInternalRow, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.{PercentileDigest, PercentileDigestSerializer}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.util.ArrayData
@@ -144,7 +144,8 @@ class ApproximatePercentileSuite extends SparkFunSuite {
.withNewInputAggBufferOffset(inputAggregationBufferOffset)
.withNewMutableAggBufferOffset(mutableAggregationBufferOffset)
- val mutableAggBuffer = new GenericMutableRow(new Array[Any](mutableAggregationBufferOffset + 1))
+ val mutableAggBuffer = new GenericInternalRow(
+ new Array[Any](mutableAggregationBufferOffset + 1))
agg.initialize(mutableAggBuffer)
val dataCount = 10
(1 to dataCount).foreach { data =>
@@ -154,7 +155,7 @@ class ApproximatePercentileSuite extends SparkFunSuite {
// Serialize the aggregation buffer
val serialized = mutableAggBuffer.getBinary(mutableAggregationBufferOffset)
- val inputAggBuffer = new GenericMutableRow(Array[Any](null, serialized))
+ val inputAggBuffer = new GenericInternalRow(Array[Any](null, serialized))
// Phase 2: final mode aggregation
// Re-initialize the aggregation buffer
@@ -311,7 +312,7 @@ class ApproximatePercentileSuite extends SparkFunSuite {
test("class ApproximatePercentile, null handling") {
val childExpression = Cast(BoundReference(0, IntegerType, nullable = true), DoubleType)
val agg = new ApproximatePercentile(childExpression, Literal(0.5D))
- val buffer = new GenericMutableRow(new Array[Any](1))
+ val buffer = new GenericInternalRow(new Array[Any](1))
agg.initialize(buffer)
// Empty aggregation buffer
assert(agg.eval(buffer) == null)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
index f5374229ca..17f6b71bb2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
@@ -22,28 +22,29 @@ import java.util.Random
import scala.collection.mutable
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, SpecificInternalRow}
import org.apache.spark.sql.types.{DataType, IntegerType}
class HyperLogLogPlusPlusSuite extends SparkFunSuite {
/** Create a HLL++ instance and an input and output buffer. */
def createEstimator(rsd: Double, dt: DataType = IntegerType):
- (HyperLogLogPlusPlus, MutableRow, MutableRow) = {
- val input = new SpecificMutableRow(Seq(dt))
+ (HyperLogLogPlusPlus, InternalRow, InternalRow) = {
+ val input = new SpecificInternalRow(Seq(dt))
val hll = new HyperLogLogPlusPlus(new BoundReference(0, dt, true), rsd)
val buffer = createBuffer(hll)
(hll, input, buffer)
}
- def createBuffer(hll: HyperLogLogPlusPlus): MutableRow = {
- val buffer = new SpecificMutableRow(hll.aggBufferAttributes.map(_.dataType))
+ def createBuffer(hll: HyperLogLogPlusPlus): InternalRow = {
+ val buffer = new SpecificInternalRow(hll.aggBufferAttributes.map(_.dataType))
hll.initialize(buffer)
buffer
}
/** Evaluate the estimate. It should be within 3*SD's of the given true rsd. */
- def evaluateEstimate(hll: HyperLogLogPlusPlus, buffer: MutableRow, cardinality: Int): Unit = {
+ def evaluateEstimate(hll: HyperLogLogPlusPlus, buffer: InternalRow, cardinality: Int): Unit = {
val estimate = hll.eval(buffer).asInstanceOf[Long].toDouble
val error = math.abs((estimate / cardinality.toDouble) - 1.0d)
assert(error < hll.trueRsd * 3.0d, "Error should be within 3 std. errors.")
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 62abc2a821..a6ce4c2edc 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -21,8 +21,7 @@ import java.util.*;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
-import org.apache.spark.sql.catalyst.expressions.MutableRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;
@@ -91,7 +90,7 @@ public final class ColumnarBatch {
* Adapter class to interop with existing components that expect internal row. A lot of
* performance is lost with this translation.
*/
- public static final class Row extends MutableRow {
+ public static final class Row extends InternalRow {
protected int rowId;
private final ColumnarBatch parent;
private final int fixedLenRowSize;
@@ -129,7 +128,7 @@ public final class ColumnarBatch {
* Revisit this. This is expensive. This is currently only used in test paths.
*/
public InternalRow copy() {
- GenericMutableRow row = new GenericMutableRow(columns.length);
+ GenericInternalRow row = new GenericInternalRow(columns.length);
for (int i = 0; i < numFields(); i++) {
if (isNullAt(i)) {
row.setNullAt(i);
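The Java adapter above now materializes copies into a `GenericInternalRow`; a hedged Scala sketch of the same field-by-field copy pattern (the helper name and the all-int schema are illustrative, not part of the patch):
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

// Hypothetical helper mirroring ColumnarBatch.Row#copy for a row of int columns:
// allocate a generic row and copy field by field, preserving nulls.
def copyIntRow(source: InternalRow, numFields: Int): InternalRow = {
  val row = new GenericInternalRow(numFields)
  var i = 0
  while (i < numFields) {
    if (source.isNullAt(i)) row.setNullAt(i) else row.setInt(i, source.getInt(i))
    i += 1
  }
  row
}

val copied = copyIntRow(InternalRow(1, 2, null), 3)
assert(copied.isNullAt(2))
```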
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 6c4248c60e..d3a2222862 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -32,7 +32,7 @@ object RDDConversions {
def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
data.mapPartitions { iterator =>
val numColumns = outputTypes.length
- val mutableRow = new GenericMutableRow(numColumns)
+ val mutableRow = new GenericInternalRow(numColumns)
val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
iterator.map { r =>
var i = 0
@@ -52,7 +52,7 @@ object RDDConversions {
def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
data.mapPartitions { iterator =>
val numColumns = outputTypes.length
- val mutableRow = new GenericMutableRow(numColumns)
+ val mutableRow = new GenericInternalRow(numColumns)
val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
iterator.map { r =>
var i = 0
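The conversion loop above reuses one row object per partition; a minimal self-contained sketch of that pattern without an RDD (the method name is hypothetical):
```scala
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}

// One GenericInternalRow is allocated up front and overwritten for every record,
// mirroring the productToRowRdd loop but driven by a plain Iterator here.
def toInternalRows(records: Iterator[Product], outputTypes: Seq[DataType]): Iterator[InternalRow] = {
  val numColumns = outputTypes.length
  val mutableRow = new GenericInternalRow(numColumns)
  val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
  records.map { r =>
    var i = 0
    while (i < numColumns) {
      mutableRow(i) = converters(i)(r.productElement(i))
      i += 1
    }
    mutableRow
  }
}

toInternalRows(Iterator((1, "a"), (2, "b")), Seq(IntegerType, StringType))
  .foreach(row => assert(!row.isNullAt(0)))
```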
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
index f335912ba2..7c11fdb979 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
@@ -153,7 +153,7 @@ abstract class AggregationIterator(
protected def generateProcessRow(
expressions: Seq[AggregateExpression],
functions: Seq[AggregateFunction],
- inputAttributes: Seq[Attribute]): (MutableRow, InternalRow) => Unit = {
+ inputAttributes: Seq[Attribute]): (InternalRow, InternalRow) => Unit = {
val joinedRow = new JoinedRow
if (expressions.nonEmpty) {
val mergeExpressions = functions.zipWithIndex.flatMap {
@@ -168,9 +168,9 @@ abstract class AggregationIterator(
case (ae: ImperativeAggregate, i) =>
expressions(i).mode match {
case Partial | Complete =>
- (buffer: MutableRow, row: InternalRow) => ae.update(buffer, row)
+ (buffer: InternalRow, row: InternalRow) => ae.update(buffer, row)
case PartialMerge | Final =>
- (buffer: MutableRow, row: InternalRow) => ae.merge(buffer, row)
+ (buffer: InternalRow, row: InternalRow) => ae.merge(buffer, row)
}
}.toArray
// This projection is used to merge buffer values for all expression-based aggregates.
@@ -178,7 +178,7 @@ abstract class AggregationIterator(
val updateProjection =
newMutableProjection(mergeExpressions, aggregationBufferSchema ++ inputAttributes)
- (currentBuffer: MutableRow, row: InternalRow) => {
+ (currentBuffer: InternalRow, row: InternalRow) => {
// Process all expression-based aggregate functions.
updateProjection.target(currentBuffer)(joinedRow(currentBuffer, row))
// Process all imperative aggregate functions.
@@ -190,11 +190,11 @@ abstract class AggregationIterator(
}
} else {
// Grouping only.
- (currentBuffer: MutableRow, row: InternalRow) => {}
+ (currentBuffer: InternalRow, row: InternalRow) => {}
}
}
- protected val processRow: (MutableRow, InternalRow) => Unit =
+ protected val processRow: (InternalRow, InternalRow) => Unit =
generateProcessRow(aggregateExpressions, aggregateFunctions, inputAttributes)
protected val groupingProjection: UnsafeProjection =
@@ -202,7 +202,7 @@ abstract class AggregationIterator(
protected val groupingAttributes = groupingExpressions.map(_.toAttribute)
// Initializing the function used to generate the output row.
- protected def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
+ protected def generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow = {
val joinedRow = new JoinedRow
val modes = aggregateExpressions.map(_.mode).distinct
val bufferAttributes = aggregateFunctions.flatMap(_.aggBufferAttributes)
@@ -211,14 +211,14 @@ abstract class AggregationIterator(
case ae: DeclarativeAggregate => ae.evaluateExpression
case agg: AggregateFunction => NoOp
}
- val aggregateResult = new SpecificMutableRow(aggregateAttributes.map(_.dataType))
+ val aggregateResult = new SpecificInternalRow(aggregateAttributes.map(_.dataType))
val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferAttributes)
expressionAggEvalProjection.target(aggregateResult)
val resultProjection =
UnsafeProjection.create(resultExpressions, groupingAttributes ++ aggregateAttributes)
- (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+ (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
// Generate results for all expression-based aggregate functions.
expressionAggEvalProjection(currentBuffer)
// Generate results for all imperative aggregate functions.
@@ -244,7 +244,7 @@ abstract class AggregationIterator(
}
}
- (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+ (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
// Serializes the generic object stored in aggregation buffer
var i = 0
while (i < typedImperativeAggregates.length) {
@@ -256,17 +256,17 @@ abstract class AggregationIterator(
} else {
// Grouping-only: we only output values based on grouping expressions.
val resultProjection = UnsafeProjection.create(resultExpressions, groupingAttributes)
- (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+ (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
resultProjection(currentGroupingKey)
}
}
}
- protected val generateOutput: (UnsafeRow, MutableRow) => UnsafeRow =
+ protected val generateOutput: (UnsafeRow, InternalRow) => UnsafeRow =
generateResultProjection()
/** Initializes buffer values for all aggregate functions. */
- protected def initializeBuffer(buffer: MutableRow): Unit = {
+ protected def initializeBuffer(buffer: InternalRow): Unit = {
expressionAggInitialProjection.target(buffer)(EmptyRow)
var i = 0
while (i < allImperativeAggregateFunctions.length) {
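To make the new `(InternalRow, InternalRow) => Unit` shape of `processRow` concrete, here is a hedged standalone sketch of such a closure for a hypothetical count-like buffer (not one of the generated projections the real iterator uses):
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, SpecificInternalRow}
import org.apache.spark.sql.types.LongType

// Both arguments are plain InternalRows now; the buffer is mutated in place.
val processRow: (InternalRow, InternalRow) => Unit =
  (buffer: InternalRow, input: InternalRow) =>
    if (!input.isNullAt(0)) buffer.setLong(0, buffer.getLong(0) + 1L)

val buffer = new SpecificInternalRow(Seq(LongType))
buffer.setLong(0, 0L)
val inputs = Seq(InternalRow(1), new GenericInternalRow(Array[Any](null)), InternalRow(3))
inputs.foreach(processRow(buffer, _))
assert(buffer.getLong(0) == 2L)
```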
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index c2b1ef0fe3..bea2dce1a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -49,11 +49,11 @@ class SortBasedAggregationIterator(
* Creates a new aggregation buffer and initializes buffer values
* for all aggregate functions.
*/
- private def newBuffer: MutableRow = {
+ private def newBuffer: InternalRow = {
val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
val bufferRowSize: Int = bufferSchema.length
- val genericMutableBuffer = new GenericMutableRow(bufferRowSize)
+ val genericMutableBuffer = new GenericInternalRow(bufferRowSize)
val useUnsafeBuffer = bufferSchema.map(_.dataType).forall(UnsafeRow.isMutable)
val buffer = if (useUnsafeBuffer) {
@@ -84,7 +84,7 @@ class SortBasedAggregationIterator(
private[this] var sortedInputHasNewGroup: Boolean = false
// The aggregation buffer used by the sort-based aggregation.
- private[this] val sortBasedAggregationBuffer: MutableRow = newBuffer
+ private[this] val sortBasedAggregationBuffer: InternalRow = newBuffer
// This safe projection is used to turn the input row into safe row. This is necessary
// because the input row may be produced by unsafe projection in child operator and all the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 4e072a92cc..2988161ee5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -118,7 +118,7 @@ class TungstenAggregationIterator(
private def createNewAggregationBuffer(): UnsafeRow = {
val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
val buffer: UnsafeRow = UnsafeProjection.create(bufferSchema.map(_.dataType))
- .apply(new GenericMutableRow(bufferSchema.length))
+ .apply(new GenericInternalRow(bufferSchema.length))
// Initialize declarative aggregates' buffer values
expressionAggInitialProjection.target(buffer)(EmptyRow)
// Initialize imperative aggregates' buffer values
@@ -127,7 +127,7 @@ class TungstenAggregationIterator(
}
// Creates a function used to generate output rows.
- override protected def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
+ override protected def generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow = {
val modes = aggregateExpressions.map(_.mode).distinct
if (modes.nonEmpty && !modes.contains(Final) && !modes.contains(Complete)) {
// Fast path for partial aggregation, UnsafeRowJoiner is usually faster than projection
@@ -137,7 +137,7 @@ class TungstenAggregationIterator(
val bufferSchema = StructType.fromAttributes(bufferAttributes)
val unsafeRowJoiner = GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema)
- (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+ (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
unsafeRowJoiner.join(currentGroupingKey, currentBuffer.asInstanceOf[UnsafeRow])
}
} else {
@@ -300,7 +300,7 @@ class TungstenAggregationIterator(
private[this] val sortBasedAggregationBuffer: UnsafeRow = createNewAggregationBuffer()
// The function used to process rows in a group
- private[this] var sortBasedProcessRow: (MutableRow, InternalRow) => Unit = null
+ private[this] var sortBasedProcessRow: (InternalRow, InternalRow) => Unit = null
// Processes rows in the current group. It will stop when it find a new group.
private def processCurrentSortedGroup(): Unit = {
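The buffer-creation change above is worth spelling out: an all-null `GenericInternalRow` is pushed through an `UnsafeProjection` to obtain the fixed-layout `UnsafeRow` buffer. A minimal sketch under an assumed two-long buffer schema:
```scala
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{DataType, LongType}

// Project an empty generic row into an UnsafeRow with the buffer's layout.
val bufferTypes: Array[DataType] = Array(LongType, LongType)
val buffer: UnsafeRow = UnsafeProjection.create(bufferTypes)
  .apply(new GenericInternalRow(bufferTypes.length))

// Fixed-width fields of an UnsafeRow remain updatable in place.
buffer.setLong(0, 0L)
buffer.setLong(1, 0L)
```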
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
index 586e1456ac..67760f334e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.aggregate
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MutableRow, _}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, _}
import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
@@ -96,18 +96,18 @@ sealed trait BufferSetterGetterUtils {
getters
}
- def createSetters(schema: StructType): Array[((MutableRow, Int, Any) => Unit)] = {
+ def createSetters(schema: StructType): Array[((InternalRow, Int, Any) => Unit)] = {
val dataTypes = schema.fields.map(_.dataType)
- val setters = new Array[(MutableRow, Int, Any) => Unit](dataTypes.length)
+ val setters = new Array[(InternalRow, Int, Any) => Unit](dataTypes.length)
var i = 0
while (i < setters.length) {
setters(i) = dataTypes(i) match {
case NullType =>
- (row: MutableRow, ordinal: Int, value: Any) => row.setNullAt(ordinal)
+ (row: InternalRow, ordinal: Int, value: Any) => row.setNullAt(ordinal)
case b: BooleanType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setBoolean(ordinal, value.asInstanceOf[Boolean])
} else {
@@ -115,7 +115,7 @@ sealed trait BufferSetterGetterUtils {
}
case ByteType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setByte(ordinal, value.asInstanceOf[Byte])
} else {
@@ -123,7 +123,7 @@ sealed trait BufferSetterGetterUtils {
}
case ShortType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setShort(ordinal, value.asInstanceOf[Short])
} else {
@@ -131,7 +131,7 @@ sealed trait BufferSetterGetterUtils {
}
case IntegerType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setInt(ordinal, value.asInstanceOf[Int])
} else {
@@ -139,7 +139,7 @@ sealed trait BufferSetterGetterUtils {
}
case LongType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setLong(ordinal, value.asInstanceOf[Long])
} else {
@@ -147,7 +147,7 @@ sealed trait BufferSetterGetterUtils {
}
case FloatType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setFloat(ordinal, value.asInstanceOf[Float])
} else {
@@ -155,7 +155,7 @@ sealed trait BufferSetterGetterUtils {
}
case DoubleType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setDouble(ordinal, value.asInstanceOf[Double])
} else {
@@ -164,13 +164,13 @@ sealed trait BufferSetterGetterUtils {
case dt: DecimalType =>
val precision = dt.precision
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
// To make it work with UnsafeRow, we cannot use setNullAt.
// Please see the comment of UnsafeRow's setDecimal.
row.setDecimal(ordinal, value.asInstanceOf[Decimal], precision)
case DateType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setInt(ordinal, value.asInstanceOf[Int])
} else {
@@ -178,7 +178,7 @@ sealed trait BufferSetterGetterUtils {
}
case TimestampType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setLong(ordinal, value.asInstanceOf[Long])
} else {
@@ -186,7 +186,7 @@ sealed trait BufferSetterGetterUtils {
}
case other =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.update(ordinal, value)
} else {
@@ -209,7 +209,7 @@ private[aggregate] class MutableAggregationBufferImpl(
toCatalystConverters: Array[Any => Any],
toScalaConverters: Array[Any => Any],
bufferOffset: Int,
- var underlyingBuffer: MutableRow)
+ var underlyingBuffer: InternalRow)
extends MutableAggregationBuffer with BufferSetterGetterUtils {
private[this] val offsets: Array[Int] = {
@@ -413,13 +413,13 @@ case class ScalaUDAF(
null)
}
- override def initialize(buffer: MutableRow): Unit = {
+ override def initialize(buffer: InternalRow): Unit = {
mutableAggregateBuffer.underlyingBuffer = buffer
udaf.initialize(mutableAggregateBuffer)
}
- override def update(buffer: MutableRow, input: InternalRow): Unit = {
+ override def update(buffer: InternalRow, input: InternalRow): Unit = {
mutableAggregateBuffer.underlyingBuffer = buffer
udaf.update(
@@ -427,7 +427,7 @@ case class ScalaUDAF(
inputToScalaConverters(inputProjection(input)).asInstanceOf[Row])
}
- override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
+ override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
mutableAggregateBuffer.underlyingBuffer = buffer1
inputAggregateBuffer.underlyingInputBuffer = buffer2
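A condensed sketch of the setter dispatch above, limited to two types plus the generic fallback (the helper name is ours, not part of `BufferSetterGetterUtils`):
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType}

// Each setter writes a Scala value into the buffer, which the UDAF machinery now
// sees as a plain InternalRow rather than a MutableRow.
def setterFor(dt: DataType): (InternalRow, Int, Any) => Unit = dt match {
  case IntegerType =>
    (row: InternalRow, ordinal: Int, value: Any) =>
      if (value != null) row.setInt(ordinal, value.asInstanceOf[Int]) else row.setNullAt(ordinal)
  case DoubleType =>
    (row: InternalRow, ordinal: Int, value: Any) =>
      if (value != null) row.setDouble(ordinal, value.asInstanceOf[Double]) else row.setNullAt(ordinal)
  case _ =>
    (row: InternalRow, ordinal: Int, value: Any) =>
      if (value != null) row.update(ordinal, value) else row.setNullAt(ordinal)
}

val buffer = new SpecificInternalRow(Seq(IntegerType, DoubleType))
setterFor(IntegerType)(buffer, 0, 7)
setterFor(DoubleType)(buffer, 1, null)
assert(buffer.getInt(0) == 7 && buffer.isNullAt(1))
```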
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
index 7cde04b626..6241b79d9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
@@ -21,15 +21,16 @@ import java.nio.{ByteBuffer, ByteOrder}
import scala.annotation.tailrec
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, UnsafeArrayData, UnsafeMapData, UnsafeRow}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeMapData, UnsafeRow}
import org.apache.spark.sql.execution.columnar.compression.CompressibleColumnAccessor
import org.apache.spark.sql.types._
/**
* An `Iterator` like trait used to extract values from columnar byte buffer. When a value is
* extracted from the buffer, instead of directly returning it, the value is set into some field of
- * a [[MutableRow]]. In this way, boxing cost can be avoided by leveraging the setter methods
- * for primitive values provided by [[MutableRow]].
+ * an [[InternalRow]]. In this way, boxing cost can be avoided by leveraging the setter methods

+ * for primitive values provided by [[InternalRow]].
*/
private[columnar] trait ColumnAccessor {
initialize()
@@ -38,7 +39,7 @@ private[columnar] trait ColumnAccessor {
def hasNext: Boolean
- def extractTo(row: MutableRow, ordinal: Int): Unit
+ def extractTo(row: InternalRow, ordinal: Int): Unit
protected def underlyingBuffer: ByteBuffer
}
@@ -52,11 +53,11 @@ private[columnar] abstract class BasicColumnAccessor[JvmType](
override def hasNext: Boolean = buffer.hasRemaining
- override def extractTo(row: MutableRow, ordinal: Int): Unit = {
+ override def extractTo(row: InternalRow, ordinal: Int): Unit = {
extractSingle(row, ordinal)
}
- def extractSingle(row: MutableRow, ordinal: Int): Unit = {
+ def extractSingle(row: InternalRow, ordinal: Int): Unit = {
columnType.extract(buffer, row, ordinal)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
index d27d8c362d..703bde2531 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
@@ -92,7 +92,7 @@ private[columnar] sealed abstract class ColumnType[JvmType] {
* `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs whenever
* possible.
*/
- def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
setField(row, ordinal, extract(buffer))
}
@@ -125,13 +125,13 @@ private[columnar] sealed abstract class ColumnType[JvmType] {
* Sets `row(ordinal)` to `field`. Subclasses should override this method to avoid boxing/unboxing
* costs whenever possible.
*/
- def setField(row: MutableRow, ordinal: Int, value: JvmType): Unit
+ def setField(row: InternalRow, ordinal: Int, value: JvmType): Unit
/**
* Copies `from(fromOrdinal)` to `to(toOrdinal)`. Subclasses should override this method to avoid
* boxing/unboxing costs whenever possible.
*/
- def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
+ def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int): Unit = {
setField(to, toOrdinal, getField(from, fromOrdinal))
}
@@ -149,7 +149,7 @@ private[columnar] object NULL extends ColumnType[Any] {
override def defaultSize: Int = 0
override def append(v: Any, buffer: ByteBuffer): Unit = {}
override def extract(buffer: ByteBuffer): Any = null
- override def setField(row: MutableRow, ordinal: Int, value: Any): Unit = row.setNullAt(ordinal)
+ override def setField(row: InternalRow, ordinal: Int, value: Any): Unit = row.setNullAt(ordinal)
override def getField(row: InternalRow, ordinal: Int): Any = null
}
@@ -177,18 +177,18 @@ private[columnar] object INT extends NativeColumnType(IntegerType, 4) {
ByteBufferHelper.getInt(buffer)
}
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
row.setInt(ordinal, ByteBufferHelper.getInt(buffer))
}
- override def setField(row: MutableRow, ordinal: Int, value: Int): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Int): Unit = {
row.setInt(ordinal, value)
}
override def getField(row: InternalRow, ordinal: Int): Int = row.getInt(ordinal)
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
to.setInt(toOrdinal, from.getInt(fromOrdinal))
}
}
@@ -206,17 +206,17 @@ private[columnar] object LONG extends NativeColumnType(LongType, 8) {
ByteBufferHelper.getLong(buffer)
}
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
row.setLong(ordinal, ByteBufferHelper.getLong(buffer))
}
- override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Long): Unit = {
row.setLong(ordinal, value)
}
override def getField(row: InternalRow, ordinal: Int): Long = row.getLong(ordinal)
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
to.setLong(toOrdinal, from.getLong(fromOrdinal))
}
}
@@ -234,17 +234,17 @@ private[columnar] object FLOAT extends NativeColumnType(FloatType, 4) {
ByteBufferHelper.getFloat(buffer)
}
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
row.setFloat(ordinal, ByteBufferHelper.getFloat(buffer))
}
- override def setField(row: MutableRow, ordinal: Int, value: Float): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Float): Unit = {
row.setFloat(ordinal, value)
}
override def getField(row: InternalRow, ordinal: Int): Float = row.getFloat(ordinal)
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
to.setFloat(toOrdinal, from.getFloat(fromOrdinal))
}
}
@@ -262,17 +262,17 @@ private[columnar] object DOUBLE extends NativeColumnType(DoubleType, 8) {
ByteBufferHelper.getDouble(buffer)
}
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
row.setDouble(ordinal, ByteBufferHelper.getDouble(buffer))
}
- override def setField(row: MutableRow, ordinal: Int, value: Double): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Double): Unit = {
row.setDouble(ordinal, value)
}
override def getField(row: InternalRow, ordinal: Int): Double = row.getDouble(ordinal)
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
to.setDouble(toOrdinal, from.getDouble(fromOrdinal))
}
}
@@ -288,17 +288,17 @@ private[columnar] object BOOLEAN extends NativeColumnType(BooleanType, 1) {
override def extract(buffer: ByteBuffer): Boolean = buffer.get() == 1
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
row.setBoolean(ordinal, buffer.get() == 1)
}
- override def setField(row: MutableRow, ordinal: Int, value: Boolean): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Boolean): Unit = {
row.setBoolean(ordinal, value)
}
override def getField(row: InternalRow, ordinal: Int): Boolean = row.getBoolean(ordinal)
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
to.setBoolean(toOrdinal, from.getBoolean(fromOrdinal))
}
}
@@ -316,17 +316,17 @@ private[columnar] object BYTE extends NativeColumnType(ByteType, 1) {
buffer.get()
}
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
row.setByte(ordinal, buffer.get())
}
- override def setField(row: MutableRow, ordinal: Int, value: Byte): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Byte): Unit = {
row.setByte(ordinal, value)
}
override def getField(row: InternalRow, ordinal: Int): Byte = row.getByte(ordinal)
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
to.setByte(toOrdinal, from.getByte(fromOrdinal))
}
}
@@ -344,17 +344,17 @@ private[columnar] object SHORT extends NativeColumnType(ShortType, 2) {
buffer.getShort()
}
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
row.setShort(ordinal, buffer.getShort())
}
- override def setField(row: MutableRow, ordinal: Int, value: Short): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Short): Unit = {
row.setShort(ordinal, value)
}
override def getField(row: InternalRow, ordinal: Int): Short = row.getShort(ordinal)
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
to.setShort(toOrdinal, from.getShort(fromOrdinal))
}
}
@@ -366,7 +366,7 @@ private[columnar] object SHORT extends NativeColumnType(ShortType, 2) {
private[columnar] trait DirectCopyColumnType[JvmType] extends ColumnType[JvmType] {
// copy the bytes from ByteBuffer to UnsafeRow
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
if (row.isInstanceOf[MutableUnsafeRow]) {
val numBytes = buffer.getInt
val cursor = buffer.position()
@@ -407,7 +407,7 @@ private[columnar] object STRING
UTF8String.fromBytes(buffer.array(), buffer.arrayOffset() + cursor, length)
}
- override def setField(row: MutableRow, ordinal: Int, value: UTF8String): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: UTF8String): Unit = {
if (row.isInstanceOf[MutableUnsafeRow]) {
row.asInstanceOf[MutableUnsafeRow].writer.write(ordinal, value)
} else {
@@ -419,7 +419,7 @@ private[columnar] object STRING
row.getUTF8String(ordinal)
}
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
setField(to, toOrdinal, getField(from, fromOrdinal))
}
@@ -433,7 +433,7 @@ private[columnar] case class COMPACT_DECIMAL(precision: Int, scale: Int)
Decimal(ByteBufferHelper.getLong(buffer), precision, scale)
}
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
if (row.isInstanceOf[MutableUnsafeRow]) {
// copy it as Long
row.setLong(ordinal, ByteBufferHelper.getLong(buffer))
@@ -459,11 +459,11 @@ private[columnar] case class COMPACT_DECIMAL(precision: Int, scale: Int)
row.getDecimal(ordinal, precision, scale)
}
- override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Decimal): Unit = {
row.setDecimal(ordinal, value, precision)
}
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
setField(to, toOrdinal, getField(from, fromOrdinal))
}
}
@@ -497,7 +497,7 @@ private[columnar] object BINARY extends ByteArrayColumnType[Array[Byte]](16) {
def dataType: DataType = BinaryType
- override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Array[Byte]): Unit = {
row.update(ordinal, value)
}
@@ -522,7 +522,7 @@ private[columnar] case class LARGE_DECIMAL(precision: Int, scale: Int)
row.getDecimal(ordinal, precision, scale)
}
- override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Decimal): Unit = {
row.setDecimal(ordinal, value, precision)
}
@@ -553,7 +553,7 @@ private[columnar] case class STRUCT(dataType: StructType)
override def defaultSize: Int = 20
- override def setField(row: MutableRow, ordinal: Int, value: UnsafeRow): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: UnsafeRow): Unit = {
row.update(ordinal, value)
}
@@ -591,7 +591,7 @@ private[columnar] case class ARRAY(dataType: ArrayType)
override def defaultSize: Int = 28
- override def setField(row: MutableRow, ordinal: Int, value: UnsafeArrayData): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: UnsafeArrayData): Unit = {
row.update(ordinal, value)
}
@@ -630,7 +630,7 @@ private[columnar] case class MAP(dataType: MapType)
override def defaultSize: Int = 68
- override def setField(row: MutableRow, ordinal: Int, value: UnsafeMapData): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: UnsafeMapData): Unit = {
row.update(ordinal, value)
}
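The point of keeping typed `setField`/`extract` overloads is to skip per-value boxing; a self-contained sketch of the INT extraction path against a plain `ByteBuffer` (the function name is illustrative):
```scala
import java.nio.ByteBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.IntegerType

// Read a primitive from the buffer and write it through InternalRow.setInt,
// so no java.lang.Integer is allocated per value.
def extractInt(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit =
  row.setInt(ordinal, buffer.getInt())

val buf = ByteBuffer.allocate(8)
buf.putInt(7).putInt(11)
buf.flip()
val row = new SpecificInternalRow(Seq(IntegerType, IntegerType))
extractInt(buf, row, 0)
extractInt(buf, row, 1)
assert(row.getInt(0) == 7 && row.getInt(1) == 11)
```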
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 96bd338f09..14024d6c10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -36,8 +36,7 @@ abstract class ColumnarIterator extends Iterator[InternalRow] {
*
* WARNING: These setter MUST be called in increasing order of ordinals.
*/
-class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(null) {
-
+class MutableUnsafeRow(val writer: UnsafeRowWriter) extends BaseGenericInternalRow {
override def isNullAt(i: Int): Boolean = writer.isNullAt(i)
override def setNullAt(i: Int): Unit = writer.setNullAt(i)
@@ -55,6 +54,9 @@ class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(nu
override def update(i: Int, v: Any): Unit = throw new UnsupportedOperationException
// all other methods inherited from GenericMutableRow are not need
+ override protected def genericGet(ordinal: Int): Any = throw new UnsupportedOperationException
+ override def numFields: Int = throw new UnsupportedOperationException
+ override def copy(): InternalRow = throw new UnsupportedOperationException
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
index 2465633162..2f09757aa3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
import java.nio.{ByteBuffer, ByteOrder}
-import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.catalyst.InternalRow
private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
private var nullsBuffer: ByteBuffer = _
@@ -39,7 +39,7 @@ private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
super.initialize()
}
- abstract override def extractTo(row: MutableRow, ordinal: Int): Unit = {
+ abstract override def extractTo(row: InternalRow, ordinal: Int): Unit = {
if (pos == nextNullIndex) {
seenNulls += 1
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
index 6579b5068e..e1d13ad0e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.columnar.compression
-import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.columnar.{ColumnAccessor, NativeColumnAccessor}
import org.apache.spark.sql.types.AtomicType
@@ -33,7 +33,7 @@ private[columnar] trait CompressibleColumnAccessor[T <: AtomicType] extends Colu
abstract override def hasNext: Boolean = super.hasNext || decoder.hasNext
- override def extractSingle(row: MutableRow, ordinal: Int): Unit = {
+ override def extractSingle(row: InternalRow, ordinal: Int): Unit = {
decoder.next(row, ordinal)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
index b90d00b15b..6e4f1c5b80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.columnar.compression
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.execution.columnar.{ColumnType, NativeColumnType}
import org.apache.spark.sql.types.AtomicType
@@ -39,7 +38,7 @@ private[columnar] trait Encoder[T <: AtomicType] {
}
private[columnar] trait Decoder[T <: AtomicType] {
- def next(row: MutableRow, ordinal: Int): Unit
+ def next(row: InternalRow, ordinal: Int): Unit
def hasNext: Boolean
}
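Since the `Decoder` trait is `private[columnar]`, here is an analogous standalone sketch (not the trait itself) of the decode contract the patch changes: `next` writes the decoded value straight into an `InternalRow`.
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.IntegerType

// Replays (value, runLength) pairs, writing each decoded int into the target row.
class RunLengthIntDecoder(runs: Seq[(Int, Int)]) {
  private val iter = runs.iterator.flatMap { case (v, n) => Iterator.fill(n)(v) }
  def hasNext: Boolean = iter.hasNext
  def next(row: InternalRow, ordinal: Int): Unit = row.setInt(ordinal, iter.next())
}

val decoder = new RunLengthIntDecoder(Seq((7, 3), (9, 2)))
val row = new SpecificInternalRow(Seq(IntegerType))
while (decoder.hasNext) decoder.next(row, 0)
assert(row.getInt(0) == 9) // last decoded value of the final run
```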
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
index 941f03b745..ee99c90a75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.types._
@@ -56,7 +56,7 @@ private[columnar] case object PassThrough extends CompressionScheme {
class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T])
extends compression.Decoder[T] {
- override def next(row: MutableRow, ordinal: Int): Unit = {
+ override def next(row: InternalRow, ordinal: Int): Unit = {
columnType.extract(buffer, row, ordinal)
}
@@ -86,7 +86,7 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
private var _compressedSize = 0
// Using `MutableRow` to store the last value to avoid boxing/unboxing cost.
- private val lastValue = new SpecificMutableRow(Seq(columnType.dataType))
+ private val lastValue = new SpecificInternalRow(Seq(columnType.dataType))
private var lastRun = 0
override def uncompressedSize: Int = _uncompressedSize
@@ -117,9 +117,9 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
to.putInt(RunLengthEncoding.typeId)
if (from.hasRemaining) {
- val currentValue = new SpecificMutableRow(Seq(columnType.dataType))
+ val currentValue = new SpecificInternalRow(Seq(columnType.dataType))
var currentRun = 1
- val value = new SpecificMutableRow(Seq(columnType.dataType))
+ val value = new SpecificInternalRow(Seq(columnType.dataType))
columnType.extract(from, currentValue, 0)
@@ -156,7 +156,7 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
private var valueCount = 0
private var currentValue: T#InternalType = _
- override def next(row: MutableRow, ordinal: Int): Unit = {
+ override def next(row: InternalRow, ordinal: Int): Unit = {
if (valueCount == run) {
currentValue = columnType.extract(buffer)
run = ByteBufferHelper.getInt(buffer)
@@ -273,7 +273,7 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme {
Array.fill[Any](elementNum)(columnType.extract(buffer).asInstanceOf[Any])
}
- override def next(row: MutableRow, ordinal: Int): Unit = {
+ override def next(row: InternalRow, ordinal: Int): Unit = {
columnType.setField(row, ordinal, dictionary(buffer.getShort()).asInstanceOf[T#InternalType])
}
@@ -356,7 +356,7 @@ private[columnar] case object BooleanBitSet extends CompressionScheme {
private var visited: Int = 0
- override def next(row: MutableRow, ordinal: Int): Unit = {
+ override def next(row: InternalRow, ordinal: Int): Unit = {
val bit = visited % BITS_PER_LONG
visited += 1
@@ -443,7 +443,7 @@ private[columnar] case object IntDelta extends CompressionScheme {
override def hasNext: Boolean = buffer.hasRemaining
- override def next(row: MutableRow, ordinal: Int): Unit = {
+ override def next(row: InternalRow, ordinal: Int): Unit = {
val delta = buffer.get()
prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getInt(buffer)
row.setInt(ordinal, prev)
@@ -523,7 +523,7 @@ private[columnar] case object LongDelta extends CompressionScheme {
override def hasNext: Boolean = buffer.hasRemaining
- override def next(row: MutableRow, ordinal: Int): Unit = {
+ override def next(row: InternalRow, ordinal: Int): Unit = {
val delta = buffer.get()
prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getLong(buffer)
row.setLong(ordinal, prev)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 693b4c4d0e..6f9ed50a02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -273,7 +273,7 @@ object DataSourceStrategy extends Strategy with Logging {
// Get the bucket ID based on the bucketing values.
// Restriction: Bucket pruning works iff the bucketing column has one and only one column.
def getBucketId(bucketColumn: Attribute, numBuckets: Int, value: Any): Int = {
- val mutableRow = new SpecificMutableRow(Seq(bucketColumn.dataType))
+ val mutableRow = new SpecificInternalRow(Seq(bucketColumn.dataType))
mutableRow(0) = Cast(Literal(value), bucketColumn.dataType).eval(null)
val bucketIdGeneration = UnsafeProjection.create(
HashPartitioning(bucketColumn :: Nil, numBuckets).partitionIdExpression :: Nil,
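The bucket-pruning change only swaps the row class; a sketch of its first step, placing the pruning value (cast to the bucket column's type) into a one-field row that the partition-id projection then consumes (the `LongType` column is an assumption for illustration):
```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal, SpecificInternalRow}
import org.apache.spark.sql.types.LongType

// Cast the literal pruning value to the bucket column's type and store it in the row.
val mutableRow = new SpecificInternalRow(Seq(LongType))
mutableRow(0) = Cast(Literal(42), LongType).eval(null)
assert(mutableRow.getLong(0) == 42L)
```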
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 33b170bc31..55cb26d651 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile, WriterContainer}
import org.apache.spark.sql.types._
@@ -88,7 +88,7 @@ object CSVRelation extends Logging {
case (field, index) => safeRequiredIndices(safeRequiredFields.indexOf(field)) = index
}
val requiredSize = requiredFields.length
- val row = new GenericMutableRow(requiredSize)
+ val row = new GenericInternalRow(requiredSize)
(tokens: Array[String], numMalformedRows) => {
if (params.dropMalformed && schemaFields.length != tokens.length) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 66f2bada2e..47549637b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._
@@ -283,7 +283,7 @@ object JdbcUtils extends Logging {
new NextIterator[InternalRow] {
private[this] val rs = resultSet
private[this] val getters: Array[JDBCValueGetter] = makeGetters(schema)
- private[this] val mutableRow = new SpecificMutableRow(schema.fields.map(x => x.dataType))
+ private[this] val mutableRow = new SpecificInternalRow(schema.fields.map(x => x.dataType))
override protected def close(): Unit = {
try {
@@ -314,22 +314,22 @@ object JdbcUtils extends Logging {
// A `JDBCValueGetter` is responsible for getting a value from `ResultSet` into a field
// for `MutableRow`. The last argument `Int` means the index for the value to be set in
// the row and also used for the value in `ResultSet`.
- private type JDBCValueGetter = (ResultSet, MutableRow, Int) => Unit
+ private type JDBCValueGetter = (ResultSet, InternalRow, Int) => Unit
/**
* Creates `JDBCValueGetter`s according to [[StructType]], which can set
- * each value from `ResultSet` to each field of [[MutableRow]] correctly.
+ * each value from `ResultSet` to each field of [[InternalRow]] correctly.
*/
private def makeGetters(schema: StructType): Array[JDBCValueGetter] =
schema.fields.map(sf => makeGetter(sf.dataType, sf.metadata))
private def makeGetter(dt: DataType, metadata: Metadata): JDBCValueGetter = dt match {
case BooleanType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
row.setBoolean(pos, rs.getBoolean(pos + 1))
case DateType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
// DateTimeUtils.fromJavaDate does not handle null value, so we need to check it.
val dateVal = rs.getDate(pos + 1)
if (dateVal != null) {
@@ -347,25 +347,25 @@ object JdbcUtils extends Logging {
// retrieve it, you will get wrong result 199.99.
// So it is needed to set precision and scale for Decimal based on JDBC metadata.
case DecimalType.Fixed(p, s) =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
val decimal =
nullSafeConvert[java.math.BigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
row.update(pos, decimal)
case DoubleType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
row.setDouble(pos, rs.getDouble(pos + 1))
case FloatType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
row.setFloat(pos, rs.getFloat(pos + 1))
case IntegerType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
row.setInt(pos, rs.getInt(pos + 1))
case LongType if metadata.contains("binarylong") =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
val bytes = rs.getBytes(pos + 1)
var ans = 0L
var j = 0
@@ -376,20 +376,20 @@ object JdbcUtils extends Logging {
row.setLong(pos, ans)
case LongType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
row.setLong(pos, rs.getLong(pos + 1))
case ShortType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
row.setShort(pos, rs.getShort(pos + 1))
case StringType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
// TODO(davies): use getBytes for better performance, if the encoding is UTF-8
row.update(pos, UTF8String.fromString(rs.getString(pos + 1)))
case TimestampType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
val t = rs.getTimestamp(pos + 1)
if (t != null) {
row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
@@ -398,7 +398,7 @@ object JdbcUtils extends Logging {
}
case BinaryType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
row.update(pos, rs.getBytes(pos + 1))
case ArrayType(et, _) =>
@@ -437,7 +437,7 @@ object JdbcUtils extends Logging {
case _ => (array: Object) => array.asInstanceOf[Array[Any]]
}
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
val array = nullSafeConvert[Object](
rs.getArray(pos + 1).getArray,
array => new GenericArrayData(elementConversion.apply(array)))
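A trimmed standalone sketch of `makeGetter` for two JDBC types, showing the new `(ResultSet, InternalRow, Int) => Unit` getter shape (definitions only; running it would need a live `ResultSet`):
```scala
import java.sql.ResultSet
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

object JdbcGetterSketch {
  type JDBCValueGetter = (ResultSet, InternalRow, Int) => Unit

  def makeGetter(dt: DataType): JDBCValueGetter = dt match {
    case IntegerType =>
      (rs: ResultSet, row: InternalRow, pos: Int) => row.setInt(pos, rs.getInt(pos + 1))
    case StringType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        row.update(pos, UTF8String.fromString(rs.getString(pos + 1)))
    case _ =>
      (rs: ResultSet, row: InternalRow, pos: Int) => row.update(pos, rs.getObject(pos + 1))
  }
}
```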
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 9ffc2b5dd8..33dcf2f3fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -40,7 +40,7 @@ import org.apache.spark.unsafe.types.UTF8String
/**
* A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
* corresponding parent container. For example, a converter for a `StructType` field may set
- * converted values to a [[MutableRow]]; or a converter for array elements may append converted
+ * converted values to an [[InternalRow]]; or a converter for array elements may append converted
* values to an [[ArrayBuffer]].
*/
private[parquet] trait ParentContainerUpdater {
@@ -155,7 +155,7 @@ private[parquet] class ParquetRowConverter(
* Updater used together with field converters within a [[ParquetRowConverter]]. It propagates
* converted filed values to the `ordinal`-th cell in `currentRow`.
*/
- private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater {
+ private final class RowUpdater(row: InternalRow, ordinal: Int) extends ParentContainerUpdater {
override def set(value: Any): Unit = row(ordinal) = value
override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value)
override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
@@ -166,7 +166,7 @@ private[parquet] class ParquetRowConverter(
override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
}
- private val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
+ private val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
private val unsafeProjection = UnsafeProjection.create(catalystType)
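A standalone sketch of the `RowUpdater` idea (a hypothetical class, since the real one is nested and private): a tiny adapter that funnels converted values into one fixed ordinal of the current mutable `InternalRow`.
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.{DoubleType, IntegerType}

// Each converter gets an updater bound to (row, ordinal) and only ever writes there.
class RowUpdaterSketch(row: InternalRow, ordinal: Int) {
  def set(value: Any): Unit = row(ordinal) = value
  def setInt(value: Int): Unit = row.setInt(ordinal, value)
  def setDouble(value: Double): Unit = row.setDouble(ordinal, value)
}

val currentRow = new SpecificInternalRow(Seq(IntegerType, DoubleType))
new RowUpdaterSketch(currentRow, 0).setInt(1)
new RowUpdaterSketch(currentRow, 1).setDouble(2.5)
assert(currentRow.getInt(0) == 1 && currentRow.getDouble(1) == 2.5)
```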
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
index 43cdce7de8..bfe7e3dea4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
@@ -119,7 +119,7 @@ case class BroadcastNestedLoopJoinExec(
streamed.execute().mapPartitionsInternal { streamedIter =>
val buildRows = relation.value
val joinedRow = new JoinedRow
- val nulls = new GenericMutableRow(broadcast.output.size)
+ val nulls = new GenericInternalRow(broadcast.output.size)
// Returns an iterator to avoid copy the rows.
new Iterator[InternalRow] {
@@ -205,14 +205,14 @@ case class BroadcastNestedLoopJoinExec(
val joinedRow = new JoinedRow
if (condition.isDefined) {
- val resultRow = new GenericMutableRow(Array[Any](null))
+ val resultRow = new GenericInternalRow(Array[Any](null))
streamedIter.map { row =>
val result = buildRows.exists(r => boundCondition(joinedRow(row, r)))
resultRow.setBoolean(0, result)
joinedRow(row, resultRow)
}
} else {
- val resultRow = new GenericMutableRow(Array[Any](buildRows.nonEmpty))
+ val resultRow = new GenericInternalRow(Array[Any](buildRows.nonEmpty))
streamedIter.map { row =>
joinedRow(row, resultRow)
}
@@ -293,7 +293,7 @@ case class BroadcastNestedLoopJoinExec(
}
val notMatchedBroadcastRows: Seq[InternalRow] = {
- val nulls = new GenericMutableRow(streamed.output.size)
+ val nulls = new GenericInternalRow(streamed.output.size)
val buf: CompactBuffer[InternalRow] = new CompactBuffer()
val joinedRow = new JoinedRow
joinedRow.withLeft(nulls)
@@ -311,7 +311,7 @@ case class BroadcastNestedLoopJoinExec(
val matchedStreamRows = streamRdd.mapPartitionsInternal { streamedIter =>
val buildRows = relation.value
val joinedRow = new JoinedRow
- val nulls = new GenericMutableRow(broadcast.output.size)
+ val nulls = new GenericInternalRow(broadcast.output.size)
streamedIter.flatMap { streamedRow =>
var i = 0
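The outer-join padding pattern touched above, as a hedged standalone sketch: one all-null `GenericInternalRow` stands in for the missing broadcast side and is reused for every unmatched streamed row (the three-column broadcast output is an assumption).
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}

val nulls = new GenericInternalRow(3) // assumed broadcast-side arity
val joinedRow = new JoinedRow

def padUnmatched(streamedRow: InternalRow): InternalRow = joinedRow(streamedRow, nulls)

val padded = padUnmatched(InternalRow(1))
assert(!padded.isNullAt(0) && padded.isNullAt(1) && padded.isNullAt(3))
```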
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index fb6bfa7b27..8ddac19bf1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -192,7 +192,7 @@ trait HashJoin {
streamIter: Iterator[InternalRow],
hashedRelation: HashedRelation): Iterator[InternalRow] = {
val joinKeys = streamSideKeyGenerator()
- val result = new GenericMutableRow(Array[Any](null))
+ val result = new GenericInternalRow(Array[Any](null))
val joinedRow = new JoinedRow
streamIter.map { current =>
val key = joinKeys(current)
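And the existence-join variant of the same trick, sketched standalone: a single shared one-field row (like `result` above) carries the per-row "found a match" flag and is appended to each streamed row via `JoinedRow`.
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}

val resultRow = new GenericInternalRow(Array[Any](null))
val joinedRow = new JoinedRow

def tagMatch(streamRow: InternalRow, matched: Boolean): InternalRow = {
  resultRow.setBoolean(0, matched) // mutate the shared flag row in place
  joinedRow(streamRow, resultRow)
}

assert(tagMatch(InternalRow(1, 2), matched = true).getBoolean(2))
```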
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 81b3e1d224..ecf7cf289f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -275,7 +275,7 @@ case class SortMergeJoinExec(
case j: ExistenceJoin =>
new RowIterator {
private[this] var currentLeftRow: InternalRow = _
- private[this] val result: MutableRow = new GenericMutableRow(Array[Any](null))
+ private[this] val result: InternalRow = new GenericInternalRow(Array[Any](null))
private[this] val smjScanner = new SortMergeJoinScanner(
createLeftKeyGenerator(),
createRightKeyGenerator(),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index c7e267152b..2acc5110e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -141,7 +141,7 @@ object ObjectOperator {
def serializeObjectToRow(serializer: Seq[Expression]): Any => UnsafeRow = {
val proj = GenerateUnsafeProjection.generate(serializer)
val objType = serializer.head.collect { case b: BoundReference => b.dataType }.head
- val objRow = new SpecificMutableRow(objType :: Nil)
+ val objRow = new SpecificInternalRow(objType :: Nil)
(o: Any) => {
objRow(0) = o
proj(objRow)
@@ -149,7 +149,7 @@ object ObjectOperator {
}
def wrapObjectToRow(objType: DataType): Any => InternalRow = {
- val outputRow = new SpecificMutableRow(objType :: Nil)
+ val outputRow = new SpecificInternalRow(objType :: Nil)
(o: Any) => {
outputRow(0) = o
outputRow
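`wrapObjectToRow` above is small enough to restate as a runnable sketch; the `ObjectType(classOf[String])` argument is just an illustrative choice.
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.{DataType, ObjectType}

// A reused one-field row that carries an arbitrary object into expression evaluation.
def wrapObjectToRow(objType: DataType): Any => InternalRow = {
  val outputRow = new SpecificInternalRow(objType :: Nil)
  (o: Any) => { outputRow(0) = o; outputRow }
}

val wrap = wrapObjectToRow(ObjectType(classOf[String]))
val row = wrap("payload")
assert(!row.isNullAt(0)) // the single slot now holds the wrapped object
```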
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index f9d20ad090..dcaf2c76d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -147,7 +147,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
.compute(inputIterator, context.partitionId(), context)
val unpickle = new Unpickler
- val mutableRow = new GenericMutableRow(1)
+ val mutableRow = new GenericInternalRow(1)
val joined = new JoinedRow
val resultType = if (udfs.length == 1) {
udfs.head.dataType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index 822f49ecab..c02b154987 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.stat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
-import org.apache.spark.sql.catalyst.expressions.{Cast, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{Cast, GenericInternalRow}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.util.QuantileSummaries
import org.apache.spark.sql.functions._
@@ -186,7 +186,7 @@ object StatFunctions extends Logging {
require(columnSize < 1e4, s"The number of distinct values for $col2, can't " +
s"exceed 1e4. Currently $columnSize")
val table = counts.groupBy(_.get(0)).map { case (col1Item, rows) =>
- val countsRow = new GenericMutableRow(columnSize + 1)
+ val countsRow = new GenericInternalRow(columnSize + 1)
rows.foreach { (row: Row) =>
// row.get(0) is column 1
// row.get(1) is column 2
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
index d3a46d020d..c9f5d3b3d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
@@ -123,7 +123,7 @@ private[window] final class AggregateProcessor(
private[this] val join = new JoinedRow
private[this] val numImperatives = imperatives.length
- private[this] val buffer = new SpecificMutableRow(bufferSchema.toSeq.map(_.dataType))
+ private[this] val buffer = new SpecificInternalRow(bufferSchema.toSeq.map(_.dataType))
initialProjection.target(buffer)
updateProjection.target(buffer)
@@ -154,6 +154,6 @@ private[window] final class AggregateProcessor(
}
/** Evaluate buffer. */
- def evaluate(target: MutableRow): Unit =
+ def evaluate(target: InternalRow): Unit =
evaluateProjection.target(target)(buffer)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 7a6a30f120..1dd281ebf1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -204,7 +204,7 @@ case class WindowExec(
val factory = key match {
// Offset Frame
case ("OFFSET", RowFrame, Some(offset), Some(h)) if offset == h =>
- target: MutableRow =>
+ target: InternalRow =>
new OffsetWindowFunctionFrame(
target,
ordinal,
@@ -217,7 +217,7 @@ case class WindowExec(
// Growing Frame.
case ("AGGREGATE", frameType, None, Some(high)) =>
- target: MutableRow => {
+ target: InternalRow => {
new UnboundedPrecedingWindowFunctionFrame(
target,
processor,
@@ -226,7 +226,7 @@ case class WindowExec(
// Shrinking Frame.
case ("AGGREGATE", frameType, Some(low), None) =>
- target: MutableRow => {
+ target: InternalRow => {
new UnboundedFollowingWindowFunctionFrame(
target,
processor,
@@ -235,7 +235,7 @@ case class WindowExec(
// Moving Frame.
case ("AGGREGATE", frameType, Some(low), Some(high)) =>
- target: MutableRow => {
+ target: InternalRow => {
new SlidingWindowFunctionFrame(
target,
processor,
@@ -245,7 +245,7 @@ case class WindowExec(
// Entire Partition Frame.
case ("AGGREGATE", frameType, None, None) =>
- target: MutableRow => {
+ target: InternalRow => {
new UnboundedWindowFunctionFrame(target, processor)
}
}
@@ -312,7 +312,7 @@ case class WindowExec(
val inputFields = child.output.length
var sorter: UnsafeExternalSorter = null
var rowBuffer: RowBuffer = null
- val windowFunctionResult = new SpecificMutableRow(expressions.map(_.dataType))
+ val windowFunctionResult = new SpecificInternalRow(expressions.map(_.dataType))
val frames = factories.map(_(windowFunctionResult))
val numFrames = frames.length
private[this] def fetchNextPartition() {
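The frame factories in `WindowExec` are plain functions from the shared result row to a frame bound to it; only the parameter type changes here. A toy sketch of that shape (the "frames" below are hypothetical stand-ins, not the real `WindowFunctionFrame` classes):
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.{IntegerType, LongType}

// Shared row that every frame writes its window-function result into.
val windowFunctionResult = new SpecificInternalRow(Seq(IntegerType, LongType))

// Each factory is handed the target row once and returns a frame bound to it.
val factories: Seq[InternalRow => () => Unit] = Seq(
  target => () => target.setInt(0, 7),     // stand-in for an offset frame
  target => () => target.setLong(1, 42L)   // stand-in for an aggregate frame
)

val frames = factories.map(_(windowFunctionResult))
frames.foreach(_.apply())                  // fill one row's worth of results
```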
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
index 2ab9faab7a..70efc0f78d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
@@ -56,7 +56,7 @@ private[window] abstract class WindowFunctionFrame {
* @param offset by which rows get moved within a partition.
*/
private[window] final class OffsetWindowFunctionFrame(
- target: MutableRow,
+ target: InternalRow,
ordinal: Int,
expressions: Array[OffsetWindowFunction],
inputSchema: Seq[Attribute],
@@ -136,7 +136,7 @@ private[window] final class OffsetWindowFunctionFrame(
* @param ubound comparator used to identify the upper bound of an output row.
*/
private[window] final class SlidingWindowFunctionFrame(
- target: MutableRow,
+ target: InternalRow,
processor: AggregateProcessor,
lbound: BoundOrdering,
ubound: BoundOrdering)
@@ -217,7 +217,7 @@ private[window] final class SlidingWindowFunctionFrame(
* @param processor to calculate the row values with.
*/
private[window] final class UnboundedWindowFunctionFrame(
- target: MutableRow,
+ target: InternalRow,
processor: AggregateProcessor)
extends WindowFunctionFrame {
@@ -255,7 +255,7 @@ private[window] final class UnboundedWindowFunctionFrame(
* @param ubound comparator used to identify the upper bound of an output row.
*/
private[window] final class UnboundedPrecedingWindowFunctionFrame(
- target: MutableRow,
+ target: InternalRow,
processor: AggregateProcessor,
ubound: BoundOrdering)
extends WindowFunctionFrame {
@@ -317,7 +317,7 @@ private[window] final class UnboundedPrecedingWindowFunctionFrame(
* @param lbound comparator used to identify the lower bound of an output row.
*/
private[window] final class UnboundedFollowingWindowFunctionFrame(
- target: MutableRow,
+ target: InternalRow,
processor: AggregateProcessor,
lbound: BoundOrdering)
extends WindowFunctionFrame {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
index 34936b38fb..7516be315d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, SpecificInternalRow}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -27,7 +27,7 @@ class RowSuite extends SparkFunSuite with SharedSQLContext {
import testImplicits._
test("create row") {
- val expected = new GenericMutableRow(4)
+ val expected = new GenericInternalRow(4)
expected.setInt(0, 2147483647)
expected.update(1, UTF8String.fromString("this is a string"))
expected.setBoolean(2, false)
@@ -49,7 +49,7 @@ class RowSuite extends SparkFunSuite with SharedSQLContext {
}
test("SpecificMutableRow.update with null") {
- val row = new SpecificMutableRow(Seq(IntegerType))
+ val row = new SpecificInternalRow(Seq(IntegerType))
row(0) = null
assert(row.isNullAt(0))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
index b5eb16b6f6..ffa26f1f82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
import org.apache.spark.sql.TypedImperativeAggregateSuite.TypedMax
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericMutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericInternalRow, SpecificInternalRow}
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
import org.apache.spark.sql.execution.aggregate.SortAggregateExec
import org.apache.spark.sql.expressions.Window
@@ -64,7 +64,7 @@ class TypedImperativeAggregateSuite extends QueryTest with SharedSQLContext {
assert(agg.eval(mergeBuffer) == data.map(_._1).max)
// Tests low level eval(row: InternalRow) API.
- val row = new GenericMutableRow(Array(mergeBuffer): Array[Any])
+ val row = new GenericInternalRow(Array(mergeBuffer): Array[Any])
// Evaluates directly on a row consisting of the aggregation buffer object.
assert(agg.eval(row) == data.map(_._1).max)
@@ -73,7 +73,7 @@ class TypedImperativeAggregateSuite extends QueryTest with SharedSQLContext {
test("supports SpecificMutableRow as mutable row") {
val aggregationBufferSchema = Seq(IntegerType, LongType, BinaryType, IntegerType)
val aggBufferOffset = 2
- val buffer = new SpecificMutableRow(aggregationBufferSchema)
+ val buffer = new SpecificInternalRow(aggregationBufferSchema)
val agg = new TypedMax(BoundReference(ordinal = 1, dataType = IntegerType, nullable = false))
.withNewMutableAggBufferOffset(aggBufferOffset)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
index 805b566728..8bf9f521e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
import org.apache.spark.sql.types._
@@ -54,7 +54,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
expected: Int): Unit = {
assertResult(expected, s"Wrong actualSize for $columnType") {
- val row = new GenericMutableRow(1)
+ val row = new GenericInternalRow(1)
row.update(0, CatalystTypeConverters.convertToCatalyst(value))
val proj = UnsafeProjection.create(Array[DataType](columnType.dataType))
columnType.actualSize(proj(row), 0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
index 1529313dfb..686c8fa6f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
@@ -21,14 +21,14 @@ import scala.collection.immutable.HashSet
import scala.util.Random
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
import org.apache.spark.sql.types.{AtomicType, Decimal}
import org.apache.spark.unsafe.types.UTF8String
object ColumnarTestUtils {
- def makeNullRow(length: Int): GenericMutableRow = {
- val row = new GenericMutableRow(length)
+ def makeNullRow(length: Int): GenericInternalRow = {
+ val row = new GenericInternalRow(length)
(0 until length).foreach(row.setNullAt)
row
}
@@ -86,7 +86,7 @@ object ColumnarTestUtils {
tail: ColumnType[_]*): InternalRow = makeRandomRow(Seq(head) ++ tail)
def makeRandomRow(columnTypes: Seq[ColumnType[_]]): InternalRow = {
- val row = new GenericMutableRow(columnTypes.length)
+ val row = new GenericInternalRow(columnTypes.length)
makeRandomValues(columnTypes).zipWithIndex.foreach { case (value, index) =>
row(index) = value
}
@@ -95,11 +95,11 @@ object ColumnarTestUtils {
def makeUniqueValuesAndSingleValueRows[T <: AtomicType](
columnType: NativeColumnType[T],
- count: Int): (Seq[T#InternalType], Seq[GenericMutableRow]) = {
+ count: Int): (Seq[T#InternalType], Seq[GenericInternalRow]) = {
val values = makeUniqueRandomValues(columnType, count)
val rows = values.map { value =>
- val row = new GenericMutableRow(1)
+ val row = new GenericInternalRow(1)
row(0) = value
row
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
index dc22d3e8e4..8f4ca3cea7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.types._
class TestNullableColumnAccessor[JvmType](
@@ -72,7 +72,7 @@ class NullableColumnAccessorSuite extends SparkFunSuite {
}
val accessor = TestNullableColumnAccessor(builder.build(), columnType)
- val row = new GenericMutableRow(1)
+ val row = new GenericInternalRow(1)
val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType)
(0 until 4).foreach { _ =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
index cdd4551d64..b2b6e92e9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.types._
class TestNullableColumnBuilder[JvmType](columnType: ColumnType[JvmType])
@@ -94,7 +94,7 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
(1 to 7 by 2).foreach(assertResult(_, "Wrong null position")(buffer.getInt()))
// For non-null values
- val actual = new GenericMutableRow(new Array[Any](1))
+ val actual = new GenericInternalRow(new Array[Any](1))
(0 until 4).foreach { _ =>
columnType.extract(buffer, actual, 0)
assert(converter(actual.get(0, dataType)) === converter(randomRow.get(0, dataType)),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
index f67e9c7dae..d01bf911e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar.compression
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.columnar.{BOOLEAN, NoopColumnStats}
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
@@ -72,7 +72,7 @@ class BooleanBitSetSuite extends SparkFunSuite {
buffer.rewind().position(headerSize + 4)
val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
- val mutableRow = new GenericMutableRow(1)
+ val mutableRow = new GenericInternalRow(1)
if (values.nonEmpty) {
values.foreach {
assert(decoder.hasNext)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
index babf944e6a..9005ec93e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
import org.apache.commons.lang3.RandomStringUtils
import org.apache.commons.math3.distribution.LogNormalDistribution
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.columnar.{BOOLEAN, INT, LONG, NativeColumnType, SHORT, STRING}
import org.apache.spark.sql.types.AtomicType
import org.apache.spark.util.Benchmark
@@ -111,7 +111,7 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
input.rewind()
benchmark.addCase(label)({ i: Int =>
- val rowBuf = new GenericMutableRow(1)
+ val rowBuf = new GenericInternalRow(1)
for (n <- 0L until iters) {
compressedBuf.rewind.position(4)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
index 830ca0294e..67139b13d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.columnar.compression
import java.nio.ByteBuffer
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
import org.apache.spark.sql.types.AtomicType
@@ -97,7 +97,7 @@ class DictionaryEncodingSuite extends SparkFunSuite {
buffer.rewind().position(headerSize + 4)
val decoder = DictionaryEncoding.decoder(buffer, columnType)
- val mutableRow = new GenericMutableRow(1)
+ val mutableRow = new GenericInternalRow(1)
if (inputSeq.nonEmpty) {
inputSeq.foreach { i =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
index a530e27074..411d31fa0e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.columnar.compression
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
import org.apache.spark.sql.types.IntegralType
@@ -48,7 +48,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
}
input.foreach { value =>
- val row = new GenericMutableRow(1)
+ val row = new GenericInternalRow(1)
columnType.setField(row, 0, value)
builder.appendFrom(row, 0)
}
@@ -95,7 +95,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
buffer.rewind().position(headerSize + 4)
val decoder = scheme.decoder(buffer, columnType)
- val mutableRow = new GenericMutableRow(1)
+ val mutableRow = new GenericInternalRow(1)
if (input.nonEmpty) {
input.foreach{
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
index 95642e93ae..dffa9b364e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.columnar.compression
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
import org.apache.spark.sql.types.AtomicType
@@ -80,7 +80,7 @@ class RunLengthEncodingSuite extends SparkFunSuite {
buffer.rewind().position(headerSize + 4)
val decoder = RunLengthEncoding.decoder(buffer, columnType)
- val mutableRow = new GenericMutableRow(1)
+ val mutableRow = new GenericInternalRow(1)
if (inputSeq.nonEmpty) {
inputSeq.foreach { i =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 3161a630af..580eade4b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -38,7 +38,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -716,7 +716,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
dataTypes.zip(constantValues).foreach { case (dt, v) =>
val schema = StructType(StructField("pcol", dt) :: Nil)
val vectorizedReader = new VectorizedParquetRecordReader
- val partitionValues = new GenericMutableRow(Array(v))
+ val partitionValues = new GenericInternalRow(Array(v))
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 9dd8d9f804..4c4a7d86f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
import org.apache.spark.sql.internal.SQLConf
@@ -719,7 +719,7 @@ object TestingUDT {
.add("c", DoubleType, nullable = false)
override def serialize(n: NestedStruct): Any = {
- val row = new SpecificMutableRow(sqlType.asInstanceOf[StructType].map(_.dataType))
+ val row = new SpecificInternalRow(sqlType.asInstanceOf[StructType].map(_.dataType))
row.setInt(0, n.a)
row.setLong(1, n.b)
row.setDouble(2, n.c)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index fe34caa0a3..1625116803 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -688,25 +688,25 @@ private[hive] trait HiveInspectors {
* @return A function that performs in-place updating of a MutableRow.
* Use the overloaded ObjectInspector version for assignments.
*/
- def unwrapperFor(field: HiveStructField): (Any, MutableRow, Int) => Unit =
+ def unwrapperFor(field: HiveStructField): (Any, InternalRow, Int) => Unit =
field.getFieldObjectInspector match {
case oi: BooleanObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
case oi: ByteObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
case oi: ShortObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
case oi: IntObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
case oi: LongObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
case oi: FloatObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
case oi: DoubleObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
case oi =>
val unwrapper = unwrapperFor(oi)
- (value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrapper(value)
+ (value: Any, row: InternalRow, ordinal: Int) => row(ordinal) = unwrapper(value)
}
def wrap(a: Any, oi: ObjectInspector, dataType: DataType): AnyRef = {
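The unwrappers built here keep the same shape, `(Any, InternalRow, Int) => Unit`: a value extracted through a Hive object inspector is written straight into the row at the given ordinal. A simplified sketch of one such unwrapper, with a plain cast standing in for the `ObjectInspector` call (so the extraction step is illustrative only):
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.DoubleType

// Post-rename unwrapper shape: mutate the row that is handed in.
val unwrapDouble: (Any, InternalRow, Int) => Unit =
  (value, row, ordinal) => row.setDouble(ordinal, value.asInstanceOf[Double])

val row = new SpecificInternalRow(Seq(DoubleType))
unwrapDouble(3.14, row, 0)
assert(row.getDouble(0) == 3.14)
```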
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index ec7e53efc8..2a54163a04 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -120,7 +120,7 @@ class HadoopTableReader(
val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
val attrsWithIndex = attributes.zipWithIndex
- val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
+ val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
val hconf = broadcastedHadoopConf.value.value
@@ -215,7 +215,7 @@ class HadoopTableReader(
val tableDesc = relation.tableDesc
val broadcastedHiveConf = _broadcastedHadoopConf
val localDeserializer = partDeserializer
- val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
+ val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
// Splits all attributes into two groups, partition key attributes and those that are not.
// Attached indices indicate the position of each attribute in the output schema.
@@ -224,7 +224,7 @@ class HadoopTableReader(
relation.partitionKeys.contains(attr)
}
- def fillPartitionKeys(rawPartValues: Array[String], row: MutableRow): Unit = {
+ def fillPartitionKeys(rawPartValues: Array[String], row: InternalRow): Unit = {
partitionKeyAttrs.foreach { case (attr, ordinal) =>
val partOrdinal = relation.partitionKeys.indexOf(attr)
row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null)
@@ -360,7 +360,7 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
iterator: Iterator[Writable],
rawDeser: Deserializer,
nonPartitionKeyAttrs: Seq[(Attribute, Int)],
- mutableRow: MutableRow,
+ mutableRow: InternalRow,
tableDeser: Deserializer): Iterator[InternalRow] = {
val soi = if (rawDeser.getObjectInspector.equals(tableDeser.getObjectInspector)) {
@@ -381,43 +381,43 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
* Builds specific unwrappers ahead of time according to object inspector
* types to avoid pattern matching and branching costs per row.
*/
- val unwrappers: Seq[(Any, MutableRow, Int) => Unit] = fieldRefs.map {
+ val unwrappers: Seq[(Any, InternalRow, Int) => Unit] = fieldRefs.map {
_.getFieldObjectInspector match {
case oi: BooleanObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
case oi: ByteObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
case oi: ShortObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
case oi: IntObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
case oi: LongObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
case oi: FloatObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
case oi: DoubleObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
case oi: HiveVarcharObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) =>
+ (value: Any, row: InternalRow, ordinal: Int) =>
row.update(ordinal, UTF8String.fromString(oi.getPrimitiveJavaObject(value).getValue))
case oi: HiveCharObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) =>
+ (value: Any, row: InternalRow, ordinal: Int) =>
row.update(ordinal, UTF8String.fromString(oi.getPrimitiveJavaObject(value).getValue))
case oi: HiveDecimalObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) =>
+ (value: Any, row: InternalRow, ordinal: Int) =>
row.update(ordinal, HiveShim.toCatalystDecimal(oi, value))
case oi: TimestampObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) =>
+ (value: Any, row: InternalRow, ordinal: Int) =>
row.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(oi.getPrimitiveJavaObject(value)))
case oi: DateObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) =>
+ (value: Any, row: InternalRow, ordinal: Int) =>
row.setInt(ordinal, DateTimeUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
case oi: BinaryObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) =>
+ (value: Any, row: InternalRow, ordinal: Int) =>
row.update(ordinal, oi.getPrimitiveJavaObject(value))
case oi =>
val unwrapper = unwrapperFor(oi)
- (value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrapper(value)
+ (value: Any, row: InternalRow, ordinal: Int) => row(ordinal) = unwrapper(value)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index c553c03a9b..1025b8f70d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -124,7 +124,7 @@ case class ScriptTransformation(
} else {
null
}
- val mutableRow = new SpecificMutableRow(output.map(_.dataType))
+ val mutableRow = new SpecificInternalRow(output.map(_.dataType))
@transient
lazy val unwrappers = outputSoi.getAllStructFieldRefs.asScala.map(unwrapperFor)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index d54913518b..42033080dc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -329,17 +329,17 @@ private[hive] case class HiveUDAFFunction(
// buffer for it.
override def aggBufferSchema: StructType = StructType(Nil)
- override def update(_buffer: MutableRow, input: InternalRow): Unit = {
+ override def update(_buffer: InternalRow, input: InternalRow): Unit = {
val inputs = inputProjection(input)
function.iterate(buffer, wrap(inputs, wrappers, cached, inputDataTypes))
}
- override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
+ override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
throw new UnsupportedOperationException(
"Hive UDAF doesn't support partial aggregate")
}
- override def initialize(_buffer: MutableRow): Unit = {
+ override def initialize(_buffer: InternalRow): Unit = {
buffer = function.getNewAggregationBuffer
}
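After the patch an imperative aggregate's buffer is simply an `InternalRow` that `initialize`/`update`/`merge` mutate in place (here `HiveUDAFFunction` keeps its own Hive-side buffer and leaves the row untouched). A hypothetical, much-simplified "max" aggregate showing the buffer contract; the real `update` takes the input as an `InternalRow` as well:
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.LongType

// Hypothetical single-column max kept in an InternalRow buffer.
def initialize(buffer: InternalRow): Unit = buffer.setLong(0, Long.MinValue)
def update(buffer: InternalRow, input: Long): Unit =
  if (input > buffer.getLong(0)) buffer.setLong(0, input)

val buffer = new SpecificInternalRow(Seq(LongType))
initialize(buffer)
Seq(3L, 9L, 1L).foreach(update(buffer, _))
assert(buffer.getLong(0) == 9L)
```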
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 15b72d8d21..e94f49ea81 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -281,7 +281,7 @@ private[orc] object OrcRelation extends HiveInspectors {
maybeStructOI: Option[StructObjectInspector],
iterator: Iterator[Writable]): Iterator[InternalRow] = {
val deserializer = new OrcSerde
- val mutableRow = new SpecificMutableRow(dataSchema.map(_.dataType))
+ val mutableRow = new SpecificInternalRow(dataSchema.map(_.dataType))
val unsafeProjection = UnsafeProjection.create(dataSchema)
def unwrap(oi: StructObjectInspector): Iterator[InternalRow] = {