path: root/sql/hive
author     Reynold Xin <rxin@databricks.com>   2016-01-02 22:39:25 -0800
committer  Reynold Xin <rxin@databricks.com>   2016-01-02 22:39:25 -0800
commit     6c5bbd628aaedb6efb44c15f816fea8fb600decc (patch)
tree       77de3f7f958f3a12f12621a28ba23e379fe9afa6 /sql/hive
parent     513e3b092c4f3d58058ff64c861ea35cfec04205 (diff)
Revert "Revert "[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] always output UnsafeRow""
This reverts commit 44ee920fd49d35b421ae562ea99bcc8f2b98ced6.
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala        | 16
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala  | 15
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala |  3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala      | 31
4 files changed, 21 insertions(+), 44 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 8141136de5..1588728bdb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -132,11 +132,17 @@ case class HiveTableScan(
     }
   }
 
-  protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
-    hadoopReader.makeRDDForTable(relation.hiveQlTable)
-  } else {
-    hadoopReader.makeRDDForPartitionedTable(
-      prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+  protected override def doExecute(): RDD[InternalRow] = {
+    val rdd = if (!relation.hiveQlTable.isPartitioned) {
+      hadoopReader.makeRDDForTable(relation.hiveQlTable)
+    } else {
+      hadoopReader.makeRDDForPartitionedTable(
+        prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+    }
+    rdd.mapPartitionsInternal { iter =>
+      val proj = UnsafeProjection.create(schema)
+      iter.map(proj)
+    }
   }
 
   override def output: Seq[Attribute] = attributes
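The new doExecute above routes every scan row through an UnsafeProjection, so this operator now always emits rows in the binary UnsafeRow format (the point of SPARK-12286/12290/12294/12284). The following standalone sketch only illustrates that projection pattern; the two-column schema and the hand-built input rows are invented for the example and are not part of the patch.

// Standalone sketch of the projection pattern used in HiveTableScan.doExecute above.
// The schema and input rows are hypothetical; only the conversion step mirrors the patch.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object UnsafeProjectionSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the operator's output schema; the real code passes HiveTableScan's schema.
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)))

    // One projection, reused for every row in the partition's iterator.
    val proj = UnsafeProjection.create(schema)

    // Hand-built object-based rows standing in for what the Hadoop reader produces.
    val rows: Iterator[InternalRow] = Iterator(
      InternalRow.fromSeq(Seq(1L, UTF8String.fromString("a"))),
      InternalRow.fromSeq(Seq(2L, UTF8String.fromString("b"))))

    // Each row is copied into the UnsafeRow binary layout, matching iter.map(proj) above.
    rows.map(proj).foreach(r => println(s"${r.getLong(0)} ${r.getUTF8String(1)}"))
  }
}

Creating the projection once per partition, as the patch does inside mapPartitionsInternal, avoids paying the projection-generation cost for every row.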
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index f936cf565b..44dc68e6ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -28,18 +28,17 @@ import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.catalyst.expressions.{FromUnsafeProjection, Attribute}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.types.DataType
-import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.util.SerializableJobConf
+import org.apache.spark.{SparkException, TaskContext}
private[hive]
case class InsertIntoHiveTable(
@@ -101,15 +100,17 @@ case class InsertIntoHiveTable(
 
       writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
 
+      val proj = FromUnsafeProjection(child.schema)
       iterator.foreach { row =>
         var i = 0
+        val safeRow = proj(row)
         while (i < fieldOIs.length) {
-          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
+          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(safeRow.get(i, dataTypes(i)))
           i += 1
         }
 
         writerContainer
-          .getLocalFileWriter(row, table.schema)
+          .getLocalFileWriter(safeRow, table.schema)
           .write(serializer.serialize(outputData, standardOI))
       }
 
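Since child plans now hand this operator UnsafeRows, the writer builds a FromUnsafeProjection over child.schema and converts each row back to an object-based row before the Hive ObjectInspector wrappers read its fields. A standalone sketch of that round trip follows; the schema and values are invented, and only the pairing of UnsafeProjection with FromUnsafeProjection mirrors the patch.

// Standalone sketch: an UnsafeRow produced upstream is converted back to a "safe" row
// before field-by-field access, as InsertIntoHiveTable now does with FromUnsafeProjection.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{FromUnsafeProjection, UnsafeProjection}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object FromUnsafeSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for child.schema.
    val schema = StructType(Seq(
      StructField("key", IntegerType),
      StructField("value", StringType)))

    val toUnsafe = UnsafeProjection.create(schema)   // what the child plan effectively does
    val toSafe = FromUnsafeProjection(schema)        // what the writer builds per partition

    val row: InternalRow = InternalRow.fromSeq(Seq(42, UTF8String.fromString("hello")))

    val unsafeRow = toUnsafe(row)   // binary layout, as received by the writer
    val safeRow = toSafe(unsafeRow) // object-based row for the Hive serializer path

    println(s"${safeRow.getInt(0)} ${safeRow.getUTF8String(1)}")
  }
}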
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index a61e162f48..6ccd417819 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -213,7 +213,8 @@ case class ScriptTransformation(
 
     child.execute().mapPartitions { iter =>
       if (iter.hasNext) {
-        processIterator(iter)
+        val proj = UnsafeProjection.create(schema)
+        processIterator(iter).map(proj)
       } else {
         // If the input iterator has no rows then do not launch the external script.
         Iterator.empty
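The script transformation's output gets the same per-partition UnsafeProjection as the table scan. One caveat worth noting when reading these changes: an UnsafeProjection typically reuses a single output buffer, so streaming consumption (as above) is fine, but code that buffers projected rows should copy them first. The sketch below illustrates that caveat with an invented single-column schema; it is not part of the patch.

// Standalone sketch: why buffered output of an UnsafeProjection should be copied.
// A single projection instance rewrites the same backing buffer on every call.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object UnsafeCopySketch {
  def main(args: Array[String]): Unit = {
    val proj = UnsafeProjection.create(StructType(Seq(StructField("id", LongType))))
    val input = (1L to 3L).iterator.map(i => InternalRow.fromSeq(Seq(i)))

    // Streaming use, as in the patch, consumes each projected row before the next
    // call overwrites the buffer. Buffering requires an explicit copy per row:
    val buffered = input.map(r => proj(r).copy()).toArray
    buffered.foreach(r => println(r.getLong(0)))
  }
}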
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 665e87e3e3..efbf9988dd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -27,7 +27,6 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.ConvertToUnsafe
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -689,36 +688,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
     }
   }
-
-  test("HadoopFsRelation produces UnsafeRow") {
-    withTempTable("test_unsafe") {
-      withTempPath { dir =>
-        val path = dir.getCanonicalPath
-        sqlContext.range(3).write.format(dataSourceName).save(path)
-        sqlContext.read
-          .format(dataSourceName)
-          .option("dataSchema", new StructType().add("id", LongType, nullable = false).json)
-          .load(path)
-          .registerTempTable("test_unsafe")
-
-        val df = sqlContext.sql(
-          """SELECT COUNT(*)
-            |FROM test_unsafe a JOIN test_unsafe b
-            |WHERE a.id = b.id
-          """.stripMargin)
-
-        val plan = df.queryExecution.executedPlan
-
-        assert(
-          plan.collect { case plan: ConvertToUnsafe => plan }.isEmpty,
-          s"""Query plan shouldn't have ${classOf[ConvertToUnsafe].getSimpleName} node(s):
-            |$plan
-          """.stripMargin)
-
-        checkAnswer(df, Row(3))
-      }
-    }
-  }
 }
// This class is used to test SPARK-8578. We should not use any custom output committer when