author    | Reynold Xin <rxin@databricks.com> | 2016-01-01 19:23:06 -0800
committer | Reynold Xin <rxin@databricks.com> | 2016-01-01 19:23:06 -0800
commit    | 44ee920fd49d35b421ae562ea99bcc8f2b98ced6 (patch)
tree      | fe94b7a91dda2ef27d7be6507baa83a339050846 /sql/hive
parent    | 0da7bd50ddf0fb9e0e8aeadb9c7fb3edf6f0ee6e (diff)
download  | spark-44ee920fd49d35b421ae562ea99bcc8f2b98ced6.tar.gz
          | spark-44ee920fd49d35b421ae562ea99bcc8f2b98ced6.tar.bz2
          | spark-44ee920fd49d35b421ae562ea99bcc8f2b98ced6.zip
Revert "[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] always output UnsafeRow"
This reverts commit 0da7bd50ddf0fb9e0e8aeadb9c7fb3edf6f0ee6e.
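For context, the reverted change made the Hive operators below emit UnsafeRow directly by wrapping their output in an UnsafeProjection; this revert restores generic InternalRow output and leaves row-format conversion to the planner. A minimal sketch of the removed wrapping pattern, against the Spark 1.6-era Catalyst API (the helper name toUnsafe is illustrative, and the public mapPartitions stands in for Spark's internal mapPartitionsInternal):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.StructType

// Re-encode every row of an RDD into the UnsafeRow binary format, mirroring
// the wrapping this revert removes from HiveTableScan and ScriptTransformation.
def toUnsafe(rdd: RDD[InternalRow], schema: StructType): RDD[InternalRow] =
  rdd.mapPartitions { iter =>
    // One projection per partition: UnsafeProjection is stateful and reuses an
    // internal buffer, so consumers that retain rows must copy them first.
    val proj = UnsafeProjection.create(schema)
    iter.map(proj)
  }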
Diffstat (limited to 'sql/hive')

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala        | 16 +++++-----------
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala  | 15 +++++++--------
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala |  3 +--
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala      | 31 +++++++++++++++++++++++++++++++

4 files changed, 44 insertions, 21 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 1588728bdb..8141136de5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -132,17 +132,11 @@ case class HiveTableScan(
     }
   }
 
-  protected override def doExecute(): RDD[InternalRow] = {
-    val rdd = if (!relation.hiveQlTable.isPartitioned) {
-      hadoopReader.makeRDDForTable(relation.hiveQlTable)
-    } else {
-      hadoopReader.makeRDDForPartitionedTable(
-        prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
-    }
-    rdd.mapPartitionsInternal { iter =>
-      val proj = UnsafeProjection.create(schema)
-      iter.map(proj)
-    }
+  protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
+    hadoopReader.makeRDDForTable(relation.hiveQlTable)
+  } else {
+    hadoopReader.makeRDDForPartitionedTable(
+      prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
   }
 
   override def output: Seq[Attribute] = attributes
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 44dc68e6ba..f936cf565b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -28,17 +28,18 @@ import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
+import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{FromUnsafeProjection, Attribute}
-import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.SerializableJobConf
 import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.util.SerializableJobConf
 
 private[hive]
 case class InsertIntoHiveTable(
@@ -100,17 +101,15 @@ case class InsertIntoHiveTable(
       writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
 
-      val proj = FromUnsafeProjection(child.schema)
       iterator.foreach { row =>
         var i = 0
-        val safeRow = proj(row)
         while (i < fieldOIs.length) {
-          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(safeRow.get(i, dataTypes(i)))
+          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
           i += 1
         }
         writerContainer
-          .getLocalFileWriter(safeRow, table.schema)
+          .getLocalFileWriter(row, table.schema)
           .write(serializer.serialize(outputData, standardOI))
       }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 6ccd417819..a61e162f48 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -213,8 +213,7 @@ case class ScriptTransformation(
     child.execute().mapPartitions { iter =>
       if (iter.hasNext) {
-        val proj = UnsafeProjection.create(schema)
-        processIterator(iter).map(proj)
+        processIterator(iter)
       } else {
         // If the input iterator has no rows then do not launch the external script.
         Iterator.empty
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index efbf9988dd..665e87e3e3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.ConvertToUnsafe
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -688,6 +689,36 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
     }
   }
+
+  test("HadoopFsRelation produces UnsafeRow") {
+    withTempTable("test_unsafe") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        sqlContext.range(3).write.format(dataSourceName).save(path)
+        sqlContext.read
+          .format(dataSourceName)
+          .option("dataSchema", new StructType().add("id", LongType, nullable = false).json)
+          .load(path)
+          .registerTempTable("test_unsafe")
+
+        val df = sqlContext.sql(
+          """SELECT COUNT(*)
+            |FROM test_unsafe a JOIN test_unsafe b
+            |WHERE a.id = b.id
+          """.stripMargin)
+
+        val plan = df.queryExecution.executedPlan
+
+        assert(
+          plan.collect { case plan: ConvertToUnsafe => plan }.isEmpty,
+          s"""Query plan shouldn't have ${classOf[ConvertToUnsafe].getSimpleName} node(s):
+             |$plan
+           """.stripMargin)
+
+        checkAnswer(df, Row(3))
+      }
+    }
+  }
 }
 
 // This class is used to test SPARK-8578. We should not use any custom output committer when
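The InsertIntoHiveTable hunk above removes the opposite conversion: FromUnsafeProjection turned incoming UnsafeRows back into object-backed rows before their fields were handed to Hive's ObjectInspector wrappers. A hedged sketch of that direction, under the same Spark 1.6 API assumptions (the helper name toSafe is illustrative, not part of the Spark codebase):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.FromUnsafeProjection
import org.apache.spark.sql.types.StructType

// Decode UnsafeRows back into object-backed rows, mirroring the projection
// this revert deletes from InsertIntoHiveTable.
def toSafe(iter: Iterator[InternalRow], schema: StructType): Iterator[InternalRow] = {
  val proj = FromUnsafeProjection(schema)
  iter.map(proj)
}

With per-operator conversion gone, the planner's row-format rules again insert ConvertToUnsafe/ConvertToSafe nodes where formats mismatch; the restored test above checks that HadoopFsRelation scans already emit UnsafeRow, so no ConvertToUnsafe node should appear above them.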