From aa494a9c2ebd59baec47beb434cd09bf3f188218 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 31 Oct 2015 21:16:09 -0700 Subject: [SPARK-11117] [SPARK-11345] [SQL] Makes all HadoopFsRelation data sources produce UnsafeRow This PR fixes two issues: 1. `PhysicalRDD.outputsUnsafeRows` is always `false` Thus a `ConvertToUnsafe` operator is often required even if the underlying data source relation does output `UnsafeRow`. 1. Internal/external row conversion for `HadoopFsRelation` is kinda messy Currently we're using `HadoopFsRelation.needConversion` and [dirty type erasure hacks][1] to indicate whether the relation outputs external row or internal row and apply external-to-internal conversion when necessary. Basically, all builtin `HadoopFsRelation` data sources, i.e. Parquet, JSON, ORC, and Text output `InternalRow`, while typical external `HadoopFsRelation` data sources, e.g. spark-avro and spark-csv, output `Row`. This PR adds a `private[sql]` interface method `HadoopFsRelation.buildInternalScan`, which by default invokes `HadoopFsRelation.buildScan` and converts `Row`s to `UnsafeRow`s (which are also `InternalRow`s). All builtin `HadoopFsRelation` data sources override this method and directly output `UnsafeRow`s. In this way, now `HadoopFsRelation` always produces `UnsafeRow`s. Thus `PhysicalRDD.outputsUnsafeRows` can be properly set by checking whether the underlying data source is a `HadoopFsRelation`. A remaining question is that, can we assume that all non-builtin `HadoopFsRelation` data sources output external rows? At least all well known ones do so. However it's possible that some users implemented their own `HadoopFsRelation` data sources that leverages `InternalRow` and thus all those unstable internal data representations. If this assumption is safe, we can deprecate `HadoopFsRelation.needConversion` and cleanup some more conversion code (like [here][2] and [here][3]). This PR supersedes #9125. Follow-ups: 1. Makes JSON and ORC data sources output `UnsafeRow` directly 1. Makes `HiveTableScan` output `UnsafeRow` directly This is related to 1 since ORC data source shares the same `Writable` unwrapping code with `HiveTableScan`. [1]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala#L353 [2]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L331-L335 [3]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L630-L669 Author: Cheng Lian Closes #9305 from liancheng/spark-11345.unsafe-hadoop-fs-relation. --- .../apache/spark/sql/hive/orc/OrcRelation.scala | 30 +++++++++------------ .../spark/sql/sources/hadoopFsRelationSuites.scala | 31 ++++++++++++++++++++++ 2 files changed, 44 insertions(+), 17 deletions(-) (limited to 'sql/hive') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index d1f30e188e..45de567039 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -19,21 +19,20 @@ package org.apache.spark.sql.hive.orc import java.util.Properties -import scala.collection.JavaConverters._ - import com.google.common.base.Objects import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit, OrcStruct} import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector -import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, StructTypeInfo} +import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils} import org.apache.hadoop.io.{NullWritable, Writable} import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.Logging +import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.rdd.{HadoopRDD, RDD} @@ -199,12 +198,13 @@ private[sql] class OrcRelation( partitionColumns) } - override def buildScan( + override private[sql] def buildInternalScan( requiredColumns: Array[String], filters: Array[Filter], - inputPaths: Array[FileStatus]): RDD[Row] = { + inputPaths: Array[FileStatus], + broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = { val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes - OrcTableScan(output, this, filters, inputPaths).execute().asInstanceOf[RDD[Row]] + OrcTableScan(output, this, filters, inputPaths).execute() } override def prepareJobForWrite(job: Job): OutputWriterFactory = { @@ -253,16 +253,17 @@ private[orc] case class OrcTableScan( path: String, conf: Configuration, iterator: Iterator[Writable], - nonPartitionKeyAttrs: Seq[(Attribute, Int)], - mutableRow: MutableRow): Iterator[InternalRow] = { + nonPartitionKeyAttrs: Seq[Attribute]): Iterator[InternalRow] = { val deserializer = new OrcSerde val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf)) + val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) + val unsafeProjection = UnsafeProjection.create(StructType.fromAttributes(nonPartitionKeyAttrs)) // SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero // rows, and thus couldn't give a proper ObjectInspector. In this case we just return an empty // partition since we know that this file is empty. maybeStructOI.map { soi => - val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { + val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.zipWithIndex.map { case (attr, ordinal) => soi.getStructFieldRef(attr.name) -> ordinal }.unzip @@ -280,7 +281,7 @@ private[orc] case class OrcTableScan( } i += 1 } - mutableRow: InternalRow + unsafeProjection(mutableRow) } }.getOrElse { Iterator.empty @@ -322,13 +323,8 @@ private[orc] case class OrcTableScan( val wrappedConf = new SerializableConfiguration(conf) rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) => - val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) - fillObject( - split.getPath.toString, - wrappedConf.value, - iterator.map(_._2), - attributes.zipWithIndex, - mutableRow) + val writableIterator = iterator.map(_._2) + fillObject(split.getPath.toString, wrappedConf.value, writableIterator, attributes) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index e3605bb3f6..100b97137c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ +import org.apache.spark.sql.execution.ConvertToUnsafe import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -687,6 +688,36 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString) } } + + test("HadoopFsRelation produces UnsafeRow") { + withTempTable("test_unsafe") { + withTempPath { dir => + val path = dir.getCanonicalPath + sqlContext.range(3).write.format(dataSourceName).save(path) + sqlContext.read + .format(dataSourceName) + .option("dataSchema", new StructType().add("id", LongType, nullable = false).json) + .load(path) + .registerTempTable("test_unsafe") + + val df = sqlContext.sql( + """SELECT COUNT(*) + |FROM test_unsafe a JOIN test_unsafe b + |WHERE a.id = b.id + """.stripMargin) + + val plan = df.queryExecution.executedPlan + + assert( + plan.collect { case plan: ConvertToUnsafe => plan }.isEmpty, + s"""Query plan shouldn't have ${classOf[ConvertToUnsafe].getSimpleName} node(s): + |$plan + """.stripMargin) + + checkAnswer(df, Row(3)) + } + } + } } // This class is used to test SPARK-8578. We should not use any custom output committer when -- cgit v1.2.3