path: root/sql/hive/src
author     Davies Liu <davies@databricks.com>   2015-06-12 23:06:31 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-06-12 23:06:31 -0700
commit     d46f8e5d4b5c1278e0fae3ad133b2229ac01b197 (patch)
tree       7ec124d4ac2ff365a9675d89113cbfee1e8abad8 /sql/hive/src
parent     6e9c3ff1ecaf12a0126d83f27f5a4153ae420a34 (diff)
download   spark-d46f8e5d4b5c1278e0fae3ad133b2229ac01b197.tar.gz
           spark-d46f8e5d4b5c1278e0fae3ad133b2229ac01b197.tar.bz2
           spark-d46f8e5d4b5c1278e0fae3ad133b2229ac01b197.zip
[SPARK-7186] [SQL] Decouple internal Row from external Row
Currently, we use o.a.s.sql.Row both internally and externally. The external interface is wider than what the internal code needs because it is designed to facilitate end-user programming. This design has proven to be very error prone and cumbersome for internal Row implementations. As a first step, we create an InternalRow interface in the catalyst module, which is identical to the current Row interface, and we switch all internal operators/expressions to use InternalRow instead. When we need to expose Row, we convert the InternalRow implementation into Row for users. All public APIs (for example, the data source APIs) use Row, which is converted to/from InternalRow by CatalystTypeConverters. All internal data sources (JSON, Parquet, JDBC, Hive) use InternalRow for better performance and cast it to Row in buildScan() (without changing the public API). When creating a PhysicalRDD, we cast them back to InternalRow.

cc rxin marmbrus JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #6792 from davies/internal_row and squashes the following commits:

f2abd13 [Davies Liu] fix scalastyle
a7e025c [Davies Liu] move InternalRow into catalyst
30db8ba [Davies Liu] Merge branch 'master' of github.com:apache/spark into internal_row
7cbced8 [Davies Liu] separate Row and InternalRow
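To make the boundary described above concrete, here is a minimal, self-contained Scala sketch of the pattern: internal operators work with InternalRow, the public API hands out Row, and conversion happens only at the boundary. The types and helpers below are hypothetical stand-ins, not the actual Spark classes touched by this commit (the real o.a.s.sql.Row, catalyst InternalRow, and CatalystTypeConverters have much richer interfaces).

// Hypothetical, simplified stand-ins for the internal/external row split
// described in the commit message; illustrative only.
object RowBoundarySketch {

  // External, user-facing row: designed for end-user convenience.
  trait Row {
    def get(i: Int): Any
    def length: Int
    def toSeq: Seq[Any] = (0 until length).map(get)
  }

  // Internal row used by operators/expressions: same shape, no user-facing sugar.
  trait InternalRow {
    def get(i: Int): Any
    def length: Int
    def toSeq: Seq[Any] = (0 until length).map(get)
  }

  private case class GenericRow(values: Seq[Any]) extends Row {
    def get(i: Int): Any = values(i)
    def length: Int = values.length
  }

  private case class GenericInternalRow(values: Seq[Any]) extends InternalRow {
    def get(i: Int): Any = values(i)
    def length: Int = values.length
  }

  // Conversion happens only at the public-API boundary (a stand-in for the
  // per-data-type conversions CatalystTypeConverters performs in Spark).
  def toExternal(row: InternalRow): Row = GenericRow(row.toSeq)
  def toInternal(row: Row): InternalRow = GenericInternalRow(row.toSeq)

  def main(args: Array[String]): Unit = {
    val internal: InternalRow = GenericInternalRow(Seq(1, "a")) // produced by an operator
    val external: Row = toExternal(internal)                    // handed to the user
    val roundTrip: InternalRow = toInternal(external)           // fed back into execution
    assert(roundTrip.toSeq == internal.toSeq)
  }
}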
Diffstat (limited to 'sql/hive/src')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala                     |  8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala               |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                     |  6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala                        | 18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala      |  6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala | 12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala        | 68
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala            |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala      | 16
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala     | 10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala                 | 32
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala                           | 24
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala                 |  9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala                   |  3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala     |  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala                  |  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala                      |  4
17 files changed, 111 insertions, 113 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index fd01a8722b..d4f1ae8ee0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -366,7 +366,7 @@ private[hive] trait HiveInspectors {
(o: Any) => {
if (o != null) {
val struct = soi.create()
- (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row].toSeq).zipped.foreach {
+ (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[InternalRow].toSeq).zipped.foreach {
(field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
}
struct
@@ -474,7 +474,7 @@ private[hive] trait HiveInspectors {
}
case x: SettableStructObjectInspector =>
val fieldRefs = x.getAllStructFieldRefs
- val row = a.asInstanceOf[Row]
+ val row = a.asInstanceOf[InternalRow]
// 1. create the pojo (most likely) object
val result = x.create()
var i = 0
@@ -490,7 +490,7 @@ private[hive] trait HiveInspectors {
result
case x: StructObjectInspector =>
val fieldRefs = x.getAllStructFieldRefs
- val row = a.asInstanceOf[Row]
+ val row = a.asInstanceOf[InternalRow]
val result = new java.util.ArrayList[AnyRef](fieldRefs.length)
var i = 0
while (i < fieldRefs.length) {
@@ -517,7 +517,7 @@ private[hive] trait HiveInspectors {
}
def wrap(
- row: Row,
+ row: InternalRow,
inspectors: Seq[ObjectInspector],
cache: Array[AnyRef]): Array[AnyRef] = {
var i = 0
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 5a4651a887..619ef63223 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -302,7 +302,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val partitionColumnDataTypes = partitionSchema.map(_.dataType)
val partitions = metastoreRelation.hiveQlPartitions.map { p =>
val location = p.getLocation
- val values = Row.fromSeq(p.getValues.zip(partitionColumnDataTypes).map {
+ val values = InternalRow.fromSeq(p.getValues.zip(partitionColumnDataTypes).map {
case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
})
ParquetPartition(values, location)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index c6b6510645..452b7f0bcc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
-import org.apache.spark.sql.catalyst.expressions.{Row, _}
+import org.apache.spark.sql.catalyst.expressions.{InternalRow, _}
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -137,7 +137,7 @@ private[hive] trait HiveStrategies {
val partitionLocations = partitions.map(_.getLocation)
if (partitionLocations.isEmpty) {
- PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil
+ PhysicalRDD(plan.output, sparkContext.emptyRDD[InternalRow]) :: Nil
} else {
hiveContext
.read.parquet(partitionLocations: _*)
@@ -165,7 +165,7 @@ private[hive] trait HiveStrategies {
// TODO: Remove this hack for Spark 1.3.
case iae: java.lang.IllegalArgumentException
if iae.getMessage.contains("Can not create a Path from an empty string") =>
- PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil
+ PhysicalRDD(plan.output, sparkContext.emptyRDD[InternalRow]) :: Nil
}
case _ => Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index d3c82d8c2e..485810320f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -41,9 +41,9 @@ import org.apache.spark.util.Utils
* A trait for subclasses that handle table scans.
*/
private[hive] sealed trait TableReader {
- def makeRDDForTable(hiveTable: HiveTable): RDD[Row]
+ def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow]
- def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[Row]
+ def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[InternalRow]
}
@@ -74,7 +74,7 @@ class HadoopTableReader(
private val _broadcastedHiveConf =
sc.sparkContext.broadcast(new SerializableWritable(hiveExtraConf))
- override def makeRDDForTable(hiveTable: HiveTable): RDD[Row] =
+ override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
makeRDDForTable(
hiveTable,
Class.forName(
@@ -94,7 +94,7 @@ class HadoopTableReader(
def makeRDDForTable(
hiveTable: HiveTable,
deserializerClass: Class[_ <: Deserializer],
- filterOpt: Option[PathFilter]): RDD[Row] = {
+ filterOpt: Option[PathFilter]): RDD[InternalRow] = {
assert(!hiveTable.isPartitioned, """makeRDDForTable() cannot be called on a partitioned table,
since input formats may differ across partitions. Use makeRDDForTablePartitions() instead.""")
@@ -125,7 +125,7 @@ class HadoopTableReader(
deserializedHadoopRDD
}
- override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[Row] = {
+ override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[InternalRow] = {
val partitionToDeserializer = partitions.map(part =>
(part, part.getDeserializer.getClass.asInstanceOf[Class[Deserializer]])).toMap
makeRDDForPartitionedTable(partitionToDeserializer, filterOpt = None)
@@ -144,7 +144,7 @@ class HadoopTableReader(
def makeRDDForPartitionedTable(
partitionToDeserializer: Map[HivePartition,
Class[_ <: Deserializer]],
- filterOpt: Option[PathFilter]): RDD[Row] = {
+ filterOpt: Option[PathFilter]): RDD[InternalRow] = {
// SPARK-5068:get FileStatus and do the filtering locally when the path is not exists
def verifyPartitionPath(
@@ -243,7 +243,7 @@ class HadoopTableReader(
// Even if we don't use any partitions, we still need an empty RDD
if (hivePartitionRDDs.size == 0) {
- new EmptyRDD[Row](sc.sparkContext)
+ new EmptyRDD[InternalRow](sc.sparkContext)
} else {
new UnionRDD(hivePartitionRDDs(0).context, hivePartitionRDDs)
}
@@ -319,7 +319,7 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
rawDeser: Deserializer,
nonPartitionKeyAttrs: Seq[(Attribute, Int)],
mutableRow: MutableRow,
- tableDeser: Deserializer): Iterator[Row] = {
+ tableDeser: Deserializer): Iterator[InternalRow] = {
val soi = if (rawDeser.getObjectInspector.equals(tableDeser.getObjectInspector)) {
rawDeser.getObjectInspector.asInstanceOf[StructObjectInspector]
@@ -391,7 +391,7 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
i += 1
}
- mutableRow: Row
+ mutableRow: InternalRow
}
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 87c36a8b61..0e4a2427a9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.{AnalysisException, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.expressions.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.client.{HiveTable, HiveColumn}
@@ -42,7 +42,7 @@ case class CreateTableAsSelect(
def database: String = tableDesc.database
def tableName: String = tableDesc.name
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sqlContext: SQLContext): Seq[InternalRow] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
lazy val metastoreRelation: MetastoreRelation = {
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
@@ -89,7 +89,7 @@ case class CreateTableAsSelect(
hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd
}
- Seq.empty[Row]
+ Seq.empty[InternalRow]
}
override def argString: String = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
index 6fce69b58b..a89381000a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
@@ -21,12 +21,10 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
-import org.apache.spark.sql.execution.{SparkPlan, RunnableCommand}
-import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
-import org.apache.spark.sql.hive.HiveShim
import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow}
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.hive.MetastoreRelation
/**
* Implementation for "describe [extended] table".
@@ -37,7 +35,7 @@ case class DescribeHiveTableCommand(
override val output: Seq[Attribute],
isExtended: Boolean) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sqlContext: SQLContext): Seq[InternalRow] = {
// Trying to mimic the format of Hive's output. But not exactly the same.
var results: Seq[(String, String, String)] = Nil
@@ -59,7 +57,7 @@ case class DescribeHiveTableCommand(
}
results.map { case (name, dataType, comment) =>
- Row(name, dataType, comment)
+ InternalRow(name, dataType, comment)
}
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
index 60a9bb630d..87f8e3f7fc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
@@ -1,34 +1,34 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
-import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.types.StringType
-
-private[hive]
-case class HiveNativeCommand(sql: String) extends RunnableCommand {
-
- override def output: Seq[AttributeReference] =
- Seq(AttributeReference("result", StringType, nullable = false)())
-
- override def run(sqlContext: SQLContext): Seq[Row] =
- sqlContext.asInstanceOf[HiveContext].runSqlHive(sql).map(Row(_))
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, InternalRow}
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.types.StringType
+
+private[hive]
+case class HiveNativeCommand(sql: String) extends RunnableCommand {
+
+ override def output: Seq[AttributeReference] =
+ Seq(AttributeReference("result", StringType, nullable = false)())
+
+ override def run(sqlContext: SQLContext): Seq[InternalRow] =
+ sqlContext.asInstanceOf[HiveContext].runSqlHive(sql).map(InternalRow(_))
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 11ee550314..1f5e4af2e4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -129,7 +129,7 @@ case class HiveTableScan(
}
}
- protected override def doExecute(): RDD[Row] = if (!relation.hiveQlTable.isPartitioned) {
+ protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
hadoopReader.makeRDDForTable(relation.hiveQlTable)
} else {
hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index eeb472602b..1d306c5d10 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -30,7 +30,8 @@ import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow}
import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive._
@@ -60,7 +61,7 @@ case class InsertIntoHiveTable(
def output: Seq[Attribute] = child.output
def saveAsHiveFile(
- rdd: RDD[Row],
+ rdd: RDD[InternalRow],
valueClass: Class[_],
fileSinkConf: FileSinkDesc,
conf: SerializableWritable[JobConf],
@@ -82,7 +83,7 @@ case class InsertIntoHiveTable(
writerContainer.commitJob()
// Note that this function is executed on executor side
- def writeToFile(context: TaskContext, iterator: Iterator[Row]): Unit = {
+ def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
val serializer = newSerializer(fileSinkConf.getTableInfo)
val standardOI = ObjectInspectorUtils
.getStandardObjectInspector(
@@ -119,7 +120,7 @@ case class InsertIntoHiveTable(
*
* Note: this is run once and then kept to avoid double insertions.
*/
- protected[sql] lazy val sideEffectResult: Seq[Row] = {
+ protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
// Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
@@ -250,12 +251,13 @@ case class InsertIntoHiveTable(
// however for now we return an empty list to simplify compatibility checks with hive, which
// does not return anything for insert operations.
// TODO: implement hive compatibility as rules.
- Seq.empty[Row]
+ Seq.empty[InternalRow]
}
- override def executeCollect(): Array[Row] = sideEffectResult.toArray
+ override def executeCollect(): Array[Row] =
+ sideEffectResult.toArray
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 28792db768..9d8872aa47 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -55,7 +55,7 @@ case class ScriptTransformation(
override def otherCopyArgs: Seq[HiveContext] = sc :: Nil
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitions { iter =>
val cmd = List("/bin/bash", "-c", script)
val builder = new ProcessBuilder(cmd)
@@ -72,8 +72,8 @@ case class ScriptTransformation(
val (outputSerde, outputSoi) = ioschema.initOutputSerDe(output)
- val iterator: Iterator[Row] = new Iterator[Row] with HiveInspectors {
- var cacheRow: Row = null
+ val iterator: Iterator[InternalRow] = new Iterator[InternalRow] with HiveInspectors {
+ var cacheRow: InternalRow = null
var curLine: String = null
var eof: Boolean = false
@@ -90,7 +90,7 @@ case class ScriptTransformation(
}
}
- def deserialize(): Row = {
+ def deserialize(): InternalRow = {
if (cacheRow != null) return cacheRow
val mutableRow = new SpecificMutableRow(output.map(_.dataType))
@@ -120,7 +120,7 @@ case class ScriptTransformation(
}
}
- override def next(): Row = {
+ override def next(): InternalRow = {
if (!hasNext) {
throw new NoSuchElementException
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 0ba94d7b7c..195e5752c3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.HiveContext
@@ -39,9 +39,9 @@ import org.apache.spark.util.Utils
private[hive]
case class AnalyzeTable(tableName: String) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sqlContext: SQLContext): Seq[InternalRow] = {
sqlContext.asInstanceOf[HiveContext].analyze(tableName)
- Seq.empty[Row]
+ Seq.empty[InternalRow]
}
}
@@ -53,7 +53,7 @@ case class DropTable(
tableName: String,
ifExists: Boolean) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sqlContext: SQLContext): Seq[InternalRow] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
val ifExistsClause = if (ifExists) "IF EXISTS " else ""
try {
@@ -70,7 +70,7 @@ case class DropTable(
hiveContext.invalidateTable(tableName)
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
hiveContext.catalog.unregisterTable(Seq(tableName))
- Seq.empty[Row]
+ Seq.empty[InternalRow]
}
}
@@ -83,7 +83,7 @@ case class AddJar(path: String) extends RunnableCommand {
schema.toAttributes
}
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sqlContext: SQLContext): Seq[InternalRow] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
val currentClassLoader = Utils.getContextOrSparkClassLoader
@@ -99,18 +99,18 @@ case class AddJar(path: String) extends RunnableCommand {
// Add jar to executors
hiveContext.sparkContext.addJar(path)
- Seq(Row(0))
+ Seq(InternalRow(0))
}
}
private[hive]
case class AddFile(path: String) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sqlContext: SQLContext): Seq[InternalRow] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
hiveContext.runSqlHive(s"ADD FILE $path")
hiveContext.sparkContext.addFile(path)
- Seq.empty[Row]
+ Seq.empty[InternalRow]
}
}
@@ -123,12 +123,12 @@ case class CreateMetastoreDataSource(
allowExisting: Boolean,
managedIfNoPath: Boolean) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sqlContext: SQLContext): Seq[InternalRow] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
if (hiveContext.catalog.tableExists(tableName :: Nil)) {
if (allowExisting) {
- return Seq.empty[Row]
+ return Seq.empty[InternalRow]
} else {
throw new AnalysisException(s"Table $tableName already exists.")
}
@@ -151,7 +151,7 @@ case class CreateMetastoreDataSource(
optionsWithPath,
isExternal)
- Seq.empty[Row]
+ Seq.empty[InternalRow]
}
}
@@ -164,7 +164,7 @@ case class CreateMetastoreDataSourceAsSelect(
options: Map[String, String],
query: LogicalPlan) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sqlContext: SQLContext): Seq[InternalRow] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
var createMetastoreTable = false
var isExternal = true
@@ -188,7 +188,7 @@ case class CreateMetastoreDataSourceAsSelect(
s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
case SaveMode.Ignore =>
// Since the table already exists and the save mode is Ignore, we will just return.
- return Seq.empty[Row]
+ return Seq.empty[InternalRow]
case SaveMode.Append =>
// Check if the specified data source match the data source of the existing table.
val resolved = ResolvedDataSource(
@@ -230,7 +230,7 @@ case class CreateMetastoreDataSourceAsSelect(
val data = DataFrame(hiveContext, query)
val df = existingSchema match {
// If we are inserting into an existing table, just use the existing schema.
- case Some(schema) => sqlContext.createDataFrame(data.queryExecution.toRdd, schema)
+ case Some(schema) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, schema)
case None => data
}
@@ -253,6 +253,6 @@ case class CreateMetastoreDataSourceAsSelect(
// Refresh the cache of the table in the catalog.
hiveContext.refreshTable(tableName)
- Seq.empty[Row]
+ Seq.empty[InternalRow]
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index a46ee9da90..c40dd4e4b9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -121,7 +121,7 @@ private[hive] case class HiveSimpleUdf(funcWrapper: HiveFunctionWrapper, childre
protected lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length)
// TODO: Finish input output types.
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
unwrap(
FunctionRegistry.invoke(method, function, conversionHelper
.convertIfNecessary(wrap(children.map(c => c.eval(input)), arguments, cached): _*): _*),
@@ -178,7 +178,7 @@ private[hive] case class HiveGenericUdf(funcWrapper: HiveFunctionWrapper, childr
lazy val dataType: DataType = inspectorToDataType(returnInspector)
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
returnInspector // Make sure initialized.
var i = 0
@@ -345,7 +345,7 @@ private[hive] case class HiveWindowFunction(
def nullable: Boolean = true
- override def eval(input: Row): Any =
+ override def eval(input: InternalRow): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
@transient
@@ -369,7 +369,7 @@ private[hive] case class HiveWindowFunction(
evaluator.reset(hiveEvaluatorBuffer)
}
- override def prepareInputParameters(input: Row): AnyRef = {
+ override def prepareInputParameters(input: InternalRow): AnyRef = {
wrap(inputProjection(input), inputInspectors, new Array[AnyRef](children.length))
}
// Add input parameters for a single row.
@@ -512,7 +512,7 @@ private[hive] case class HiveGenericUdtf(
field => (inspectorToDataType(field.getFieldObjectInspector), true)
}
- override def eval(input: Row): TraversableOnce[Row] = {
+ override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
outputInspector // Make sure initialized.
val inputProjection = new InterpretedProjection(children)
@@ -522,23 +522,23 @@ private[hive] case class HiveGenericUdtf(
}
protected class UDTFCollector extends Collector {
- var collected = new ArrayBuffer[Row]
+ var collected = new ArrayBuffer[InternalRow]
override def collect(input: java.lang.Object) {
// We need to clone the input here because implementations of
// GenericUDTF reuse the same object. Luckily they are always an array, so
// it is easy to clone.
- collected += unwrap(input, outputInspector).asInstanceOf[Row]
+ collected += unwrap(input, outputInspector).asInstanceOf[InternalRow]
}
- def collectRows(): Seq[Row] = {
+ def collectRows(): Seq[InternalRow] = {
val toCollect = collected
- collected = new ArrayBuffer[Row]
+ collected = new ArrayBuffer[InternalRow]
toCollect
}
}
- override def terminate(): TraversableOnce[Row] = {
+ override def terminate(): TraversableOnce[InternalRow] = {
outputInspector // Make sure initialized.
function.close()
collector.collectRows()
@@ -578,7 +578,7 @@ private[hive] case class HiveUdafFunction(
private val buffer =
function.getNewAggregationBuffer
- override def eval(input: Row): Any = unwrap(function.evaluate(buffer), returnInspector)
+ override def eval(input: InternalRow): Any = unwrap(function.evaluate(buffer), returnInspector)
@transient
val inputProjection = new InterpretedProjection(exprs)
@@ -586,7 +586,7 @@ private[hive] case class HiveUdafFunction(
@transient
protected lazy val cached = new Array[AnyRef](exprs.length)
- def update(input: Row): Unit = {
+ def update(input: InternalRow): Unit = {
val inputs = inputProjection(input)
function.iterate(buffer, wrap(inputs, inspectors, cached))
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
index df137e7b2b..aff0456b37 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
@@ -28,8 +28,9 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
import org.apache.hadoop.io.LongWritable
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{Literal, Row}
+import org.apache.spark.sql.catalyst.expressions.{Literal, InternalRow}
import org.apache.spark.sql.types._
+import org.apache.spark.sql.Row
class HiveInspectorSuite extends SparkFunSuite with HiveInspectors {
test("Test wrap SettableStructObjectInspector") {
@@ -45,7 +46,7 @@ class HiveInspectorSuite extends SparkFunSuite with HiveInspectors {
classOf[UDAFPercentile.State],
ObjectInspectorOptions.JAVA).asInstanceOf[StructObjectInspector]
- val a = unwrap(state, soi).asInstanceOf[Row]
+ val a = unwrap(state, soi).asInstanceOf[InternalRow]
val b = wrap(a, soi).asInstanceOf[UDAFPercentile.State]
val sfCounts = soi.getStructFieldRef("counts")
@@ -127,7 +128,7 @@ class HiveInspectorSuite extends SparkFunSuite with HiveInspectors {
}
}
- def checkValues(row1: Seq[Any], row2: Row): Unit = {
+ def checkValues(row1: Seq[Any], row2: InternalRow): Unit = {
row1.zip(row2.toSeq).foreach { case (r1, r2) =>
checkValue(r1, r2)
}
@@ -203,7 +204,7 @@ class HiveInspectorSuite extends SparkFunSuite with HiveInspectors {
})
checkValues(row,
- unwrap(wrap(Row.fromSeq(row), toInspector(dt)), toInspector(dt)).asInstanceOf[Row])
+ unwrap(wrap(Row.fromSeq(row), toInspector(dt)), toInspector(dt)).asInstanceOf[InternalRow])
checkValue(null, unwrap(wrap(null, toInspector(dt)), toInspector(dt)))
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index 5a5ea10e3c..a0d80dc39c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -17,10 +17,9 @@
package org.apache.spark.sql.hive
-import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.parquet.ParquetTest
-import org.apache.spark.sql.{QueryTest, SQLConf}
+import org.apache.spark.sql.{QueryTest, Row, SQLConf}
case class Cases(lower: String, UPPER: String)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
index 0e63d84e98..8707f9f936 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
@@ -21,7 +21,7 @@ import java.io.File
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.expressions.InternalRow
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index b384fb39f3..267d22c6b5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.expressions.InternalRow
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index e62ac909cb..3864349cdb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,8 +21,6 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive._
@@ -30,7 +28,7 @@ import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SQLConf, SaveMode}
import org.apache.spark.util.Utils
// The data where the partitioning key exists only in the directory structure.