author     Davies Liu <davies@databricks.com>  2015-06-28 08:03:58 -0700
committer  Davies Liu <davies@databricks.com>  2015-06-28 08:03:58 -0700
commit     77da5be6f11a7e9cb1d44f7fb97b93481505afe8 (patch)
tree       95badc0ee5149fb7ce126a7a3590877415e10981 /sql/core
parent     52d128180166280af443fae84ac61386f3d6c500 (diff)
[SPARK-8610] [SQL] Separate Row and InternalRow (part 2)
Currently, we use GenericRow for both Row and InternalRow, which is confusing because it can contain either Scala types or Catalyst types. This PR switches to GenericInternalRow for InternalRow (Catalyst types) and GenericRow for Row (Scala types). It also fixes some incorrect uses of InternalRow and Row.

Author: Davies Liu <davies@databricks.com>

Closes #7003 from davies/internalrow and squashes the following commits:

d05866c [Davies Liu] fix test: rollback changes for pyspark
72878dd [Davies Liu] Merge branch 'master' of github.com:apache/spark into internalrow
efd0b25 [Davies Liu] fix copy of MutableRow
87b13cf [Davies Liu] fix test
d2ebd72 [Davies Liu] fix style
eb4b473 [Davies Liu] mark expensive API as final
bd4e99c [Davies Liu] Merge branch 'master' of github.com:apache/spark into internalrow
bdfb78f [Davies Liu] remove BaseMutableRow
6f99a97 [Davies Liu] fix catalyst test
defe931 [Davies Liu] remove BaseRow
288b31f [Davies Liu] Merge branch 'master' of github.com:apache/spark into internalrow
9d24350 [Davies Liu] separate Row and InternalRow (part 2)
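For illustration only (not part of this patch): a minimal sketch of the distinction the commit describes, assuming a 1.5-era Spark build with spark-catalyst on the classpath. The object name RowVsInternalRowSketch is hypothetical; the classes and calls (Row, GenericInternalRow, UTF8String.fromString, getInt) are the ones exercised in the diff below.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.unsafe.types.UTF8String

    // Hypothetical example, not taken from the patch.
    object RowVsInternalRowSketch {
      def main(args: Array[String]): Unit = {
        // External row (public API): holds Scala/Java types such as String and Int.
        val external: Row = Row("spark", 1)
        println(external.getString(0))   // java.lang.String "spark"

        // Internal row (Catalyst): holds Catalyst types, e.g. UTF8String instead of String.
        val internal: InternalRow = new GenericInternalRow(
          Array[Any](UTF8String.fromString("spark"), 1))
        println(internal.getInt(1))      // primitive access on the internal representation
      }
    }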
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                          | 24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala                 | 70
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala  |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala        | 21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala       |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala           |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala       |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala                |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala        |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala                        |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala            |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala                    |  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala        |  7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala                |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala              |  4
15 files changed, 85 insertions(+), 81 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 5708df82de..8ed44ee141 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _}
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
/**
@@ -377,10 +378,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
val row = new SpecificMutableRow(dataType :: Nil)
iter.map { v =>
row.setInt(0, v)
- row: Row
+ row: InternalRow
}
}
- DataFrameHolder(self.createDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)))
+ DataFrameHolder(
+ self.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)))
}
/**
@@ -393,10 +395,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
val row = new SpecificMutableRow(dataType :: Nil)
iter.map { v =>
row.setLong(0, v)
- row: Row
+ row: InternalRow
}
}
- DataFrameHolder(self.createDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)))
+ DataFrameHolder(
+ self.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)))
}
/**
@@ -408,11 +411,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
val rows = data.mapPartitions { iter =>
val row = new SpecificMutableRow(dataType :: Nil)
iter.map { v =>
- row.setString(0, v)
- row: Row
+ row.update(0, UTF8String.fromString(v))
+ row: InternalRow
}
}
- DataFrameHolder(self.createDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)))
+ DataFrameHolder(
+ self.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)))
}
}
@@ -559,9 +563,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
(e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
}
iter.map { row =>
- new GenericRow(
+ new GenericInternalRow(
methodsToConverts.map { case (e, convert) => convert(e.invoke(row)) }.toArray[Any]
- ) : InternalRow
+ ): InternalRow
}
}
DataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
@@ -1065,7 +1069,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
val rowRdd = convertedRdd.mapPartitions { iter =>
- iter.map { m => new GenericRow(m): InternalRow}
+ iter.map { m => new GenericInternalRow(m): InternalRow}
}
DataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index 8e21020917..8bf2151e4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import scala.reflect.runtime.universe.TypeTag
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.sql.types._
@@ -63,7 +63,7 @@ private[sql] sealed abstract class ColumnType[T <: DataType, JvmType](
* Appends `row(ordinal)` of type T into the given ByteBuffer. Subclasses should override this
* method to avoid boxing/unboxing costs whenever possible.
*/
- def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
+ def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
append(getField(row, ordinal), buffer)
}
@@ -71,13 +71,13 @@ private[sql] sealed abstract class ColumnType[T <: DataType, JvmType](
* Returns the size of the value `row(ordinal)`. This is used to calculate the size of variable
* length types such as byte arrays and strings.
*/
- def actualSize(row: Row, ordinal: Int): Int = defaultSize
+ def actualSize(row: InternalRow, ordinal: Int): Int = defaultSize
/**
* Returns `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs
* whenever possible.
*/
- def getField(row: Row, ordinal: Int): JvmType
+ def getField(row: InternalRow, ordinal: Int): JvmType
/**
* Sets `row(ordinal)` to `field`. Subclasses should override this method to avoid boxing/unboxing
@@ -89,7 +89,7 @@ private[sql] sealed abstract class ColumnType[T <: DataType, JvmType](
* Copies `from(fromOrdinal)` to `to(toOrdinal)`. Subclasses should override this method to avoid
* boxing/unboxing costs whenever possible.
*/
- def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
+ def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
to(toOrdinal) = from(fromOrdinal)
}
@@ -118,7 +118,7 @@ private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
buffer.putInt(v)
}
- override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
+ override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
buffer.putInt(row.getInt(ordinal))
}
@@ -134,9 +134,9 @@ private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
row.setInt(ordinal, value)
}
- override def getField(row: Row, ordinal: Int): Int = row.getInt(ordinal)
+ override def getField(row: InternalRow, ordinal: Int): Int = row.getInt(ordinal)
- override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.setInt(toOrdinal, from.getInt(fromOrdinal))
}
}
@@ -146,7 +146,7 @@ private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
buffer.putLong(v)
}
- override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
+ override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
buffer.putLong(row.getLong(ordinal))
}
@@ -162,9 +162,9 @@ private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
row.setLong(ordinal, value)
}
- override def getField(row: Row, ordinal: Int): Long = row.getLong(ordinal)
+ override def getField(row: InternalRow, ordinal: Int): Long = row.getLong(ordinal)
- override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.setLong(toOrdinal, from.getLong(fromOrdinal))
}
}
@@ -174,7 +174,7 @@ private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
buffer.putFloat(v)
}
- override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
+ override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
buffer.putFloat(row.getFloat(ordinal))
}
@@ -190,9 +190,9 @@ private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
row.setFloat(ordinal, value)
}
- override def getField(row: Row, ordinal: Int): Float = row.getFloat(ordinal)
+ override def getField(row: InternalRow, ordinal: Int): Float = row.getFloat(ordinal)
- override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.setFloat(toOrdinal, from.getFloat(fromOrdinal))
}
}
@@ -202,7 +202,7 @@ private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
buffer.putDouble(v)
}
- override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
+ override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
buffer.putDouble(row.getDouble(ordinal))
}
@@ -218,9 +218,9 @@ private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
row.setDouble(ordinal, value)
}
- override def getField(row: Row, ordinal: Int): Double = row.getDouble(ordinal)
+ override def getField(row: InternalRow, ordinal: Int): Double = row.getDouble(ordinal)
- override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.setDouble(toOrdinal, from.getDouble(fromOrdinal))
}
}
@@ -230,7 +230,7 @@ private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
buffer.put(if (v) 1: Byte else 0: Byte)
}
- override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
+ override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
buffer.put(if (row.getBoolean(ordinal)) 1: Byte else 0: Byte)
}
@@ -244,9 +244,9 @@ private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
row.setBoolean(ordinal, value)
}
- override def getField(row: Row, ordinal: Int): Boolean = row.getBoolean(ordinal)
+ override def getField(row: InternalRow, ordinal: Int): Boolean = row.getBoolean(ordinal)
- override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.setBoolean(toOrdinal, from.getBoolean(fromOrdinal))
}
}
@@ -256,7 +256,7 @@ private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
buffer.put(v)
}
- override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
+ override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
buffer.put(row.getByte(ordinal))
}
@@ -272,9 +272,9 @@ private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
row.setByte(ordinal, value)
}
- override def getField(row: Row, ordinal: Int): Byte = row.getByte(ordinal)
+ override def getField(row: InternalRow, ordinal: Int): Byte = row.getByte(ordinal)
- override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.setByte(toOrdinal, from.getByte(fromOrdinal))
}
}
@@ -284,7 +284,7 @@ private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
buffer.putShort(v)
}
- override def append(row: Row, ordinal: Int, buffer: ByteBuffer): Unit = {
+ override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = {
buffer.putShort(row.getShort(ordinal))
}
@@ -300,15 +300,15 @@ private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
row.setShort(ordinal, value)
}
- override def getField(row: Row, ordinal: Int): Short = row.getShort(ordinal)
+ override def getField(row: InternalRow, ordinal: Int): Short = row.getShort(ordinal)
- override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.setShort(toOrdinal, from.getShort(fromOrdinal))
}
}
private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
- override def actualSize(row: Row, ordinal: Int): Int = {
+ override def actualSize(row: InternalRow, ordinal: Int): Int = {
row.getString(ordinal).getBytes("utf-8").length + 4
}
@@ -328,11 +328,11 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
row.update(ordinal, value)
}
- override def getField(row: Row, ordinal: Int): UTF8String = {
+ override def getField(row: InternalRow, ordinal: Int): UTF8String = {
row(ordinal).asInstanceOf[UTF8String]
}
- override def copyField(from: Row, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
to.update(toOrdinal, from(fromOrdinal))
}
}
@@ -346,7 +346,7 @@ private[sql] object DATE extends NativeColumnType(DateType, 8, 4) {
buffer.putInt(v)
}
- override def getField(row: Row, ordinal: Int): Int = {
+ override def getField(row: InternalRow, ordinal: Int): Int = {
row(ordinal).asInstanceOf[Int]
}
@@ -364,7 +364,7 @@ private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 8) {
buffer.putLong(v)
}
- override def getField(row: Row, ordinal: Int): Long = {
+ override def getField(row: InternalRow, ordinal: Int): Long = {
row(ordinal).asInstanceOf[Long]
}
@@ -387,7 +387,7 @@ private[sql] case class FIXED_DECIMAL(precision: Int, scale: Int)
buffer.putLong(v.toUnscaledLong)
}
- override def getField(row: Row, ordinal: Int): Decimal = {
+ override def getField(row: InternalRow, ordinal: Int): Decimal = {
row(ordinal).asInstanceOf[Decimal]
}
@@ -405,7 +405,7 @@ private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
defaultSize: Int)
extends ColumnType[T, Array[Byte]](typeId, defaultSize) {
- override def actualSize(row: Row, ordinal: Int): Int = {
+ override def actualSize(row: InternalRow, ordinal: Int): Int = {
getField(row, ordinal).length + 4
}
@@ -426,7 +426,7 @@ private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](11, 16)
row(ordinal) = value
}
- override def getField(row: Row, ordinal: Int): Array[Byte] = {
+ override def getField(row: InternalRow, ordinal: Int): Array[Byte] = {
row(ordinal).asInstanceOf[Array[Byte]]
}
}
@@ -439,7 +439,7 @@ private[sql] object GENERIC extends ByteArrayColumnType[DataType](12, 16) {
row(ordinal) = SparkSqlSerializer.deserialize[Any](value)
}
- override def getField(row: Row, ordinal: Int): Array[Byte] = {
+ override def getField(row: InternalRow, ordinal: Int): Array[Byte] = {
SparkSqlSerializer.serialize(row(ordinal))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 761f427b8c..cb1fd4947f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -146,7 +146,8 @@ private[sql] case class InMemoryRelation(
rowCount += 1
}
- val stats = InternalRow.merge(columnBuilders.map(_.columnStats.collectedStatistics) : _*)
+ val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
+ .flatMap(_.toSeq))
batchStats += stats
CachedBatch(columnBuilders.map(_.build().array()), stats)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index eea15aff5d..b19ad4f1c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -20,22 +20,20 @@ package org.apache.spark.sql.execution
import java.nio.ByteBuffer
import java.util.{HashMap => JavaHashMap}
-import org.apache.spark.sql.types.Decimal
-
import scala.reflect.ClassTag
import com.clearspring.analytics.stream.cardinality.HyperLogLog
import com.esotericsoftware.kryo.io.{Input, Output}
-import com.esotericsoftware.kryo.{Serializer, Kryo}
+import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.twitter.chill.ResourcePool
-import org.apache.spark.{SparkEnv, SparkConf}
-import org.apache.spark.serializer.{SerializerInstance, KryoSerializer}
-import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.util.collection.OpenHashSet
-import org.apache.spark.util.MutablePair
-
+import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{IntegerHashSet, LongHashSet}
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.util.MutablePair
+import org.apache.spark.util.collection.OpenHashSet
+import org.apache.spark.{SparkConf, SparkEnv}
private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
override def newKryo(): Kryo = {
@@ -43,6 +41,7 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co
kryo.setRegistrationRequired(false)
kryo.register(classOf[MutablePair[_, _]])
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
+ kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericInternalRow])
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
kryo.register(classOf[com.clearspring.analytics.stream.cardinality.HyperLogLog],
new HyperLogLogSerializer)
@@ -139,7 +138,7 @@ private[sql] class OpenHashSetSerializer extends Serializer[OpenHashSet[_]] {
val iterator = hs.iterator
while(iterator.hasNext) {
val row = iterator.next()
- rowSerializer.write(kryo, output, row.asInstanceOf[GenericRow].values)
+ rowSerializer.write(kryo, output, row.asInstanceOf[GenericInternalRow].values)
}
}
@@ -150,7 +149,7 @@ private[sql] class OpenHashSetSerializer extends Serializer[OpenHashSet[_]] {
var i = 0
while (i < numItems) {
val row =
- new GenericRow(rowSerializer.read(
+ new GenericInternalRow(rowSerializer.read(
kryo,
input,
classOf[Array[Any]].asInstanceOf[Class[Any]]).asInstanceOf[Array[Any]])
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 15b6936acd..74a22353b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -26,7 +26,8 @@ import scala.reflect.ClassTag
import org.apache.spark.Logging
import org.apache.spark.serializer._
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -329,7 +330,7 @@ private[sql] object SparkSqlSerializer2 {
*/
def createDeserializationFunction(
schema: Array[DataType],
- in: DataInputStream): (MutableRow) => Row = {
+ in: DataInputStream): (MutableRow) => InternalRow = {
if (schema == null) {
(mutableRow: MutableRow) => null
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 21912cf249..5daf86d817 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -210,8 +210,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}
- protected lazy val singleRowRdd =
- sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): InternalRow), 1)
+ protected lazy val singleRowRdd = sparkContext.parallelize(Seq(InternalRow()), 1)
object TakeOrderedAndProject extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index bce0e8d70a..e41538ec1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -71,8 +71,8 @@ case class HashOuterJoin(
@transient private[this] lazy val DUMMY_LIST = Seq[InternalRow](null)
@transient private[this] lazy val EMPTY_LIST = Seq.empty[InternalRow]
- @transient private[this] lazy val leftNullRow = new GenericRow(left.output.length)
- @transient private[this] lazy val rightNullRow = new GenericRow(right.output.length)
+ @transient private[this] lazy val leftNullRow = new GenericInternalRow(left.output.length)
+ @transient private[this] lazy val rightNullRow = new GenericInternalRow(right.output.length)
@transient private[this] lazy val boundCondition =
condition.map(
newPredicate(_, left.output ++ right.output)).getOrElse((row: InternalRow) => true)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index f9c3fe92c2..036f5d253e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -183,9 +183,9 @@ object EvaluatePython {
}.toMap
case (c, StructType(fields)) if c.getClass.isArray =>
- new GenericRow(c.asInstanceOf[Array[_]].zip(fields).map {
+ new GenericInternalRow(c.asInstanceOf[Array[_]].zip(fields).map {
case (e, f) => fromJava(e, f.dataType)
- }): Row
+ })
case (c: java.util.Calendar, DateType) =>
DateTimeUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index 252c611d02..042e2c9cbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Cast}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
private[sql] object StatFunctions extends Logging {
@@ -123,7 +124,7 @@ private[sql] object StatFunctions extends Logging {
countsRow.setLong(distinctCol2.get(row.get(1)).get + 1, row.getLong(2))
}
// the value of col1 is the first value, the rest are the counts
- countsRow.setString(0, col1Item.toString)
+ countsRow.update(0, UTF8String.fromString(col1Item.toString))
countsRow
}.toSeq
val headerNames = distinctCol2.map(r => StructField(r._1.toString, LongType)).toSeq
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index 8b4276b2c3..30c5f4ca3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -417,7 +417,7 @@ private[sql] class JDBCRDD(
case IntegerConversion => mutableRow.setInt(i, rs.getInt(pos))
case LongConversion => mutableRow.setLong(i, rs.getLong(pos))
// TODO(davies): use getBytes for better performance, if the encoding is UTF-8
- case StringConversion => mutableRow.setString(i, rs.getString(pos))
+ case StringConversion => mutableRow.update(i, UTF8String.fromString(rs.getString(pos)))
case TimestampConversion =>
val t = rs.getTimestamp(pos)
if (t != null) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index cf7aa44e4c..ae7cbf0624 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -318,7 +318,7 @@ private[parquet] class CatalystGroupConverter(
// Note: this will ever only be called in the root converter when the record has been
// fully processed. Therefore it will be difficult to use mutable rows instead, since
// any non-root converter never would be sure when it would be safe to re-use the buffer.
- new GenericRow(current.toArray)
+ new GenericInternalRow(current.toArray)
}
override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
@@ -342,8 +342,8 @@ private[parquet] class CatalystGroupConverter(
override def end(): Unit = {
if (!isRootConverter) {
assert(current != null) // there should be no empty groups
- buffer.append(new GenericRow(current.toArray))
- parent.updateField(index, new GenericRow(buffer.toArray.asInstanceOf[Array[Any]]))
+ buffer.append(new GenericInternalRow(current.toArray))
+ parent.updateField(index, new GenericInternalRow(buffer.toArray.asInstanceOf[Array[Any]]))
}
}
}
@@ -788,7 +788,7 @@ private[parquet] class CatalystStructConverter(
// here we need to make sure to use StructScalaType
// Note: we need to actually make a copy of the array since we
// may be in a nested field
- parent.updateField(index, new GenericRow(current.toArray))
+ parent.updateField(index, new GenericInternalRow(current.toArray))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index dbb369cf45..54c8eeb41a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -44,7 +44,7 @@ private[sql] case class InsertIntoDataSource(
overwrite: Boolean)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[InternalRow] = {
+ override def run(sqlContext: SQLContext): Seq[Row] = {
val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
val data = DataFrame(sqlContext, query)
// Apply the schema of the existing table to the new data.
@@ -54,7 +54,7 @@ private[sql] case class InsertIntoDataSource(
// Invalidate the cache.
sqlContext.cacheManager.invalidateCache(logicalRelation)
- Seq.empty[InternalRow]
+ Seq.empty[Row]
}
}
@@ -86,7 +86,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
mode: SaveMode)
extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[InternalRow] = {
+ override def run(sqlContext: SQLContext): Seq[Row] = {
require(
relation.paths.length == 1,
s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
index ece3d6fdf2..4cb5ba2f0d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql
import java.sql.{Date, Timestamp}
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions._
case class ReflectData(
stringField: String,
@@ -128,16 +127,16 @@ class ScalaReflectionRelationSuite extends SparkFunSuite {
Seq(data).toDF().registerTempTable("reflectComplexData")
assert(ctx.sql("SELECT * FROM reflectComplexData").collect().head ===
- new GenericRow(Array[Any](
+ Row(
Seq(1, 2, 3),
Seq(1, 2, null),
Map(1 -> 10L, 2 -> 20L),
Map(1 -> 10L, 2 -> 20L, 3 -> null),
- new GenericRow(Array[Any](
+ Row(
Seq(10, 20, 30),
Seq(10, 20, null),
Map(10 -> 100L, 20 -> 200L),
Map(10 -> 100L, 20 -> 200L, 30 -> null),
- new GenericRow(Array[Any](null, "abc")))))))
+ Row(null, "abc"))))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
index 5fc53f7012..54e1efb6e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
@@ -62,7 +62,7 @@ case class SimpleDDLScan(from: Int, to: Int, table: String)(@transient val sqlCo
override def buildScan(): RDD[Row] = {
sqlContext.sparkContext.parallelize(from to to).map { e =>
- InternalRow(UTF8String.fromString(s"people$e"), e * 2)
+ InternalRow(UTF8String.fromString(s"people$e"), e * 2): Row
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index de0ed0c042..2c916f3322 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -90,8 +90,8 @@ case class AllDataTypesScan(
Seq(Map(UTF8String.fromString(s"str_$i") -> InternalRow(i.toLong))),
Map(i -> UTF8String.fromString(i.toString)),
Map(Map(UTF8String.fromString(s"str_$i") -> i.toFloat) -> InternalRow(i.toLong)),
- Row(i, UTF8String.fromString(i.toString)),
- Row(Seq(UTF8String.fromString(s"str_$i"), UTF8String.fromString(s"str_${i + 1}")),
+ InternalRow(i, UTF8String.fromString(i.toString)),
+ InternalRow(Seq(UTF8String.fromString(s"str_$i"), UTF8String.fromString(s"str_${i + 1}")),
InternalRow(Seq(DateTimeUtils.fromJavaDate(new Date(1970, 1, i + 1))))))
}
}