diff options
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 30 |
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 31 |
2 files changed, 44 insertions, 17 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index d1f30e188e..45de567039 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -19,21 +19,20 @@ package org.apache.spark.sql.hive.orc import java.util.Properties -import scala.collection.JavaConverters._ - import com.google.common.base.Objects import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit, OrcStruct} import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector -import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, StructTypeInfo} +import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils} import org.apache.hadoop.io.{NullWritable, Writable} import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.Logging +import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.rdd.{HadoopRDD, RDD} @@ -199,12 +198,13 @@ private[sql] class OrcRelation( partitionColumns) } - override def buildScan( + override private[sql] def buildInternalScan( requiredColumns: Array[String], filters: Array[Filter], - inputPaths: Array[FileStatus]): RDD[Row] = { + inputPaths: Array[FileStatus], + broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = { val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes - OrcTableScan(output, 
this, filters, inputPaths).execute().asInstanceOf[RDD[Row]] + OrcTableScan(output, this, filters, inputPaths).execute() } override def prepareJobForWrite(job: Job): OutputWriterFactory = { @@ -253,16 +253,17 @@ private[orc] case class OrcTableScan( path: String, conf: Configuration, iterator: Iterator[Writable], - nonPartitionKeyAttrs: Seq[(Attribute, Int)], - mutableRow: MutableRow): Iterator[InternalRow] = { + nonPartitionKeyAttrs: Seq[Attribute]): Iterator[InternalRow] = { val deserializer = new OrcSerde val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf)) + val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) + val unsafeProjection = UnsafeProjection.create(StructType.fromAttributes(nonPartitionKeyAttrs)) // SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero // rows, and thus couldn't give a proper ObjectInspector. In this case we just return an empty // partition since we know that this file is empty. maybeStructOI.map { soi => - val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { + val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.zipWithIndex.map { case (attr, ordinal) => soi.getStructFieldRef(attr.name) -> ordinal }.unzip @@ -280,7 +281,7 @@ private[orc] case class OrcTableScan( } i += 1 } - mutableRow: InternalRow + unsafeProjection(mutableRow) } }.getOrElse { Iterator.empty @@ -322,13 +323,8 @@ private[orc] case class OrcTableScan( val wrappedConf = new SerializableConfiguration(conf) rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) => - val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) - fillObject( - split.getPath.toString, - wrappedConf.value, - iterator.map(_._2), - attributes.zipWithIndex, - mutableRow) + val writableIterator = iterator.map(_._2) + fillObject(split.getPath.toString, wrappedConf.value, writableIterator, attributes) } } } diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index e3605bb3f6..100b97137c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ +import org.apache.spark.sql.execution.ConvertToUnsafe import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -687,6 +688,36 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString) } } + + test("HadoopFsRelation produces UnsafeRow") { + withTempTable("test_unsafe") { + withTempPath { dir => + val path = dir.getCanonicalPath + sqlContext.range(3).write.format(dataSourceName).save(path) + sqlContext.read + .format(dataSourceName) + .option("dataSchema", new StructType().add("id", LongType, nullable = false).json) + .load(path) + .registerTempTable("test_unsafe") + + val df = sqlContext.sql( + """SELECT COUNT(*) + |FROM test_unsafe a JOIN test_unsafe b + |WHERE a.id = b.id + """.stripMargin) + + val plan = df.queryExecution.executedPlan + + assert( + plan.collect { case plan: ConvertToUnsafe => plan }.isEmpty, + s"""Query plan shouldn't have ${classOf[ConvertToUnsafe].getSimpleName} node(s): + |$plan + """.stripMargin) + + checkAnswer(df, Row(3)) + } + } + } } // This class is used to test SPARK-8578. We should not use any custom output committer when |