author     Davies Liu <davies@databricks.com>   2015-06-28 08:03:58 -0700
committer  Davies Liu <davies@databricks.com>   2015-06-28 08:03:58 -0700
commit     77da5be6f11a7e9cb1d44f7fb97b93481505afe8 (patch)
tree       95badc0ee5149fb7ce126a7a3590877415e10981 /sql/hive
parent     52d128180166280af443fae84ac61386f3d6c500 (diff)
[SPARK-8610] [SQL] Separate Row and InternalRow (part 2)
Currently, we use GenericRow for both Row and InternalRow, which is confusing because it can contain Scala types as well as Catalyst types. This PR changes the code to use GenericInternalRow for InternalRow (which holds Catalyst types) and GenericRow for Row (which holds Scala types). It also fixes some incorrect uses of InternalRow and Row.

Author: Davies Liu <davies@databricks.com>

Closes #7003 from davies/internalrow and squashes the following commits:

d05866c [Davies Liu] fix test: rollback changes for pyspark
72878dd [Davies Liu] Merge branch 'master' of github.com:apache/spark into internalrow
efd0b25 [Davies Liu] fix copy of MutableRow
87b13cf [Davies Liu] fix test
d2ebd72 [Davies Liu] fix style
eb4b473 [Davies Liu] mark expensive API as final
bd4e99c [Davies Liu] Merge branch 'master' of github.com:apache/spark into internalrow
bdfb78f [Davies Liu] remove BaseMutableRow
6f99a97 [Davies Liu] fix catalyst test
defe931 [Davies Liu] remove BaseRow
288b31f [Davies Liu] Merge branch 'master' of github.com:apache/spark into internalrow
9d24350 [Davies Liu] separate Row and InternalRow (part 2)
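The distinction the commit enforces can be summarized with a short, illustrative sketch (class names and the fromSeq factory appear in this diff; the import paths and example values are assumptions about this commit's catalyst module, not taken verbatim from the patch):

    import org.apache.spark.sql.Row                                                     // external API rows: plain Scala/Java values
    import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, InternalRow}  // internal rows: Catalyst values
    import org.apache.spark.unsafe.types.UTF8String

    // A Row carries external types such as java.lang.String.
    val external: Row = Row("hello", 42)

    // An InternalRow carries Catalyst representations, e.g. UTF8String instead of String.
    val internal: InternalRow =
      new GenericInternalRow(Array[Any](UTF8String.fromString("hello"), 42))

    // InternalRow.fromSeq is the factory the patch uses in place of `new GenericRow(...)`
    // wherever the values are already in their Catalyst form.
    val viaFactory: InternalRow =
      InternalRow.fromSeq(Seq(UTF8String.fromString("hello"), 42))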
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala                      |  5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala                         |  3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala       | 14
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala  |  8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala         |  8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala             |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala      |  7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala                  | 37
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala                     | 10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala                  |  4
10 files changed, 48 insertions(+), 50 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 864c888ab0..a6b8ead577 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -336,9 +336,8 @@ private[hive] trait HiveInspectors {
// currently, hive doesn't provide the ConstantStructObjectInspector
case si: StructObjectInspector =>
val allRefs = si.getAllStructFieldRefs
- new GenericRow(
- allRefs.map(r =>
- unwrap(si.getStructFieldData(data, r), r.getFieldObjectInspector)).toArray)
+ InternalRow.fromSeq(
+ allRefs.map(r => unwrap(si.getStructFieldData(data, r), r.getFieldObjectInspector)))
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 00e61e35d4..b251a9523b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -34,6 +34,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{SerializableConfiguration, Utils}
/**
@@ -356,7 +357,7 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
(value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
case oi: HiveVarcharObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) =>
- row.setString(ordinal, oi.getPrimitiveJavaObject(value).getValue)
+ row.update(ordinal, UTF8String.fromString(oi.getPrimitiveJavaObject(value).getValue))
case oi: HiveDecimalObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) =>
row.update(ordinal, HiveShim.toCatalystDecimal(oi, value))
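For context on the TableReader hunk above: the varchar unwrapper no longer calls setString with a java.lang.String but stores a Catalyst UTF8String via update. A minimal hedged sketch of the same pattern (the null check and the use of toString in place of the Hive object-inspector call are illustrative additions, not part of the patch):

    import org.apache.spark.sql.catalyst.expressions.MutableRow
    import org.apache.spark.unsafe.types.UTF8String

    // Store a Hive string/varchar value into a MutableRow using the Catalyst
    // string representation (UTF8String) rather than MutableRow.setString.
    val setVarcharField: (Any, MutableRow, Int) => Unit =
      (value: Any, row: MutableRow, ordinal: Int) =>
        if (value == null) {
          row.setNullAt(ordinal)                                      // explicit null handling
        } else {
          row.update(ordinal, UTF8String.fromString(value.toString))  // convert before storing
        }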
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 0e4a2427a9..84358cb73c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,13 +17,11 @@
package org.apache.spark.sql.hive.execution
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.{AnalysisException, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.hive.client.{HiveTable, HiveColumn}
-import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, HiveMetastoreTypes}
+import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, MetastoreRelation}
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
/**
* Create table and insert the query result into it.
@@ -42,11 +40,11 @@ case class CreateTableAsSelect(
def database: String = tableDesc.database
def tableName: String = tableDesc.name
- override def run(sqlContext: SQLContext): Seq[InternalRow] = {
+ override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
lazy val metastoreRelation: MetastoreRelation = {
- import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextInputFormat
@@ -89,7 +87,7 @@ case class CreateTableAsSelect(
hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd
}
- Seq.empty[InternalRow]
+ Seq.empty[Row]
}
override def argString: String = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
index a89381000a..5f0ed5393d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
@@ -21,10 +21,10 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.MetastoreRelation
+import org.apache.spark.sql.{Row, SQLContext}
/**
* Implementation for "describe [extended] table".
@@ -35,7 +35,7 @@ case class DescribeHiveTableCommand(
override val output: Seq[Attribute],
isExtended: Boolean) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[InternalRow] = {
+ override def run(sqlContext: SQLContext): Seq[Row] = {
// Trying to mimic the format of Hive's output. But not exactly the same.
var results: Seq[(String, String, String)] = Nil
@@ -57,7 +57,7 @@ case class DescribeHiveTableCommand(
}
results.map { case (name, dataType, comment) =>
- InternalRow(name, dataType, comment)
+ Row(name, dataType, comment)
}
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
index 87f8e3f7fc..41b645b2c9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
@@ -17,11 +17,11 @@
package org.apache.spark.sql.hive.execution
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{Row, SQLContext}
private[hive]
case class HiveNativeCommand(sql: String) extends RunnableCommand {
@@ -29,6 +29,6 @@ case class HiveNativeCommand(sql: String) extends RunnableCommand {
override def output: Seq[AttributeReference] =
Seq(AttributeReference("result", StringType, nullable = false)())
- override def run(sqlContext: SQLContext): Seq[InternalRow] =
- sqlContext.asInstanceOf[HiveContext].runSqlHive(sql).map(InternalRow(_))
+ override def run(sqlContext: SQLContext): Seq[Row] =
+ sqlContext.asInstanceOf[HiveContext].runSqlHive(sql).map(Row(_))
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 1f5e4af2e4..f4c8c9a7e8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -123,7 +123,7 @@ case class HiveTableScan(
// Only partitioned values are needed here, since the predicate has already been bound to
// partition key attribute references.
- val row = new GenericRow(castedValues.toArray)
+ val row = InternalRow.fromSeq(castedValues)
shouldKeep.eval(row).asInstanceOf[Boolean]
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 9d8872aa47..611888055d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -129,11 +129,11 @@ case class ScriptTransformation(
val prevLine = curLine
curLine = reader.readLine()
if (!ioschema.schemaLess) {
- new GenericRow(CatalystTypeConverters.convertToCatalyst(
+ new GenericInternalRow(CatalystTypeConverters.convertToCatalyst(
prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD")))
.asInstanceOf[Array[Any]])
} else {
- new GenericRow(CatalystTypeConverters.convertToCatalyst(
+ new GenericInternalRow(CatalystTypeConverters.convertToCatalyst(
prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"), 2))
.asInstanceOf[Array[Any]])
}
@@ -167,7 +167,8 @@ case class ScriptTransformation(
outputStream.write(data)
} else {
- val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
+ val writable = inputSerde.serialize(
+ row.asInstanceOf[GenericInternalRow].values, inputSoi)
prepareWritable(writable).write(dataOutputStream)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index aad58bfa2e..71fa3e9c33 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -17,15 +17,14 @@
package org.apache.spark.sql.hive.execution
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -39,9 +38,9 @@ import org.apache.spark.util.Utils
private[hive]
case class AnalyzeTable(tableName: String) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[InternalRow] = {
+ override def run(sqlContext: SQLContext): Seq[Row] = {
sqlContext.asInstanceOf[HiveContext].analyze(tableName)
- Seq.empty[InternalRow]
+ Seq.empty[Row]
}
}
@@ -53,7 +52,7 @@ case class DropTable(
tableName: String,
ifExists: Boolean) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[InternalRow] = {
+ override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
val ifExistsClause = if (ifExists) "IF EXISTS " else ""
try {
@@ -70,7 +69,7 @@ case class DropTable(
hiveContext.invalidateTable(tableName)
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
hiveContext.catalog.unregisterTable(Seq(tableName))
- Seq.empty[InternalRow]
+ Seq.empty[Row]
}
}
@@ -83,7 +82,7 @@ case class AddJar(path: String) extends RunnableCommand {
schema.toAttributes
}
- override def run(sqlContext: SQLContext): Seq[InternalRow] = {
+ override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
val currentClassLoader = Utils.getContextOrSparkClassLoader
@@ -105,18 +104,18 @@ case class AddJar(path: String) extends RunnableCommand {
// Add jar to executors
hiveContext.sparkContext.addJar(path)
- Seq(InternalRow(0))
+ Seq(Row(0))
}
}
private[hive]
case class AddFile(path: String) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[InternalRow] = {
+ override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
hiveContext.runSqlHive(s"ADD FILE $path")
hiveContext.sparkContext.addFile(path)
- Seq.empty[InternalRow]
+ Seq.empty[Row]
}
}
@@ -129,12 +128,12 @@ case class CreateMetastoreDataSource(
allowExisting: Boolean,
managedIfNoPath: Boolean) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[InternalRow] = {
+ override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
if (hiveContext.catalog.tableExists(tableName :: Nil)) {
if (allowExisting) {
- return Seq.empty[InternalRow]
+ return Seq.empty[Row]
} else {
throw new AnalysisException(s"Table $tableName already exists.")
}
@@ -157,7 +156,7 @@ case class CreateMetastoreDataSource(
optionsWithPath,
isExternal)
- Seq.empty[InternalRow]
+ Seq.empty[Row]
}
}
@@ -170,7 +169,7 @@ case class CreateMetastoreDataSourceAsSelect(
options: Map[String, String],
query: LogicalPlan) extends RunnableCommand {
- override def run(sqlContext: SQLContext): Seq[InternalRow] = {
+ override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
var createMetastoreTable = false
var isExternal = true
@@ -194,7 +193,7 @@ case class CreateMetastoreDataSourceAsSelect(
s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
case SaveMode.Ignore =>
// Since the table already exists and the save mode is Ignore, we will just return.
- return Seq.empty[InternalRow]
+ return Seq.empty[Row]
case SaveMode.Append =>
// Check if the specified data source match the data source of the existing table.
val resolved = ResolvedDataSource(
@@ -259,6 +258,6 @@ case class CreateMetastoreDataSourceAsSelect(
// Refresh the cache of the table in the catalog.
hiveContext.refreshTable(tableName)
- Seq.empty[InternalRow]
+ Seq.empty[Row]
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 0fd7b3a91d..300f83d914 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -190,7 +190,7 @@ private[sql] class OrcRelation(
filters: Array[Filter],
inputPaths: Array[FileStatus]): RDD[Row] = {
val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
- OrcTableScan(output, this, filters, inputPaths).execute()
+ OrcTableScan(output, this, filters, inputPaths).execute().map(_.asInstanceOf[Row])
}
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
@@ -234,13 +234,13 @@ private[orc] case class OrcTableScan(
HiveShim.appendReadColumns(conf, sortedIds, sortedNames)
}
- // Transform all given raw `Writable`s into `Row`s.
+ // Transform all given raw `Writable`s into `InternalRow`s.
private def fillObject(
path: String,
conf: Configuration,
iterator: Iterator[Writable],
nonPartitionKeyAttrs: Seq[(Attribute, Int)],
- mutableRow: MutableRow): Iterator[Row] = {
+ mutableRow: MutableRow): Iterator[InternalRow] = {
val deserializer = new OrcSerde
val soi = OrcFileOperator.getObjectInspector(path, Some(conf))
val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
@@ -261,11 +261,11 @@ private[orc] case class OrcTableScan(
}
i += 1
}
- mutableRow: Row
+ mutableRow: InternalRow
}
}
- def execute(): RDD[Row] = {
+ def execute(): RDD[InternalRow] = {
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
val conf = job.getConfiguration
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
index aff0456b37..a93acb938d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
@@ -202,9 +202,9 @@ class HiveInspectorSuite extends SparkFunSuite with HiveInspectors {
val dt = StructType(dataTypes.zipWithIndex.map {
case (t, idx) => StructField(s"c_$idx", t)
})
-
+ val inspector = toInspector(dt)
checkValues(row,
- unwrap(wrap(Row.fromSeq(row), toInspector(dt)), toInspector(dt)).asInstanceOf[InternalRow])
+ unwrap(wrap(InternalRow.fromSeq(row), inspector), inspector).asInstanceOf[InternalRow])
checkValue(null, unwrap(wrap(null, toInspector(dt)), toInspector(dt)))
}