Diffstat (limited to 'sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala  30
1 file changed, 8 insertions(+), 22 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 7c4a0a0c0f..21591ec093 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.sql.{Row, SQLContext}
@@ -45,7 +44,6 @@ import org.apache.spark.sql.hive.{HiveInspectors, HiveMetastoreTypes, HiveShim}
 import org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
-import org.apache.spark.util.collection.BitSet
 
 private[sql] class DefaultSource
   extends FileFormat with DataSourceRegister with Serializable {
@@ -111,23 +109,11 @@ private[sql] class DefaultSource
     }
   }
 
-  override def buildInternalScan(
-      sqlContext: SQLContext,
-      dataSchema: StructType,
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      bucketSet: Option[BitSet],
-      inputFiles: Seq[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration],
-      options: Map[String, String]): RDD[InternalRow] = {
-    val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
-    OrcTableScan(sqlContext, output, filters, inputFiles).execute()
-  }
-
   override def buildReader(
       sqlContext: SQLContext,
-      partitionSchema: StructType,
       dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
     val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
@@ -145,15 +131,15 @@ private[sql] class DefaultSource
     (file: PartitionedFile) => {
       val conf = broadcastedConf.value.value
 
-      // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
-      // case, `OrcFileOperator.readSchema` returns `None`, and we can simply return an empty
-      // iterator.
+      // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
+      // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
+      // using the given physical schema. Instead, we simply return an empty iterator.
       val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf))
       if (maybePhysicalSchema.isEmpty) {
         Iterator.empty
       } else {
         val physicalSchema = maybePhysicalSchema.get
-        OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema)
+        OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
 
         val orcRecordReader = {
           val job = Job.getInstance(conf)
@@ -171,11 +157,11 @@ private[sql] class DefaultSource
 
         // Unwraps `OrcStruct`s to `UnsafeRow`s
         val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
-          file.filePath, conf, dataSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
+          file.filePath, conf, requiredSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
         )
 
         // Appends partition values
-        val fullOutput = dataSchema.toAttributes ++ partitionSchema.toAttributes
+        val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
         val joinedRow = new JoinedRow()
         val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
 
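The net effect of the change is that the per-file reader is now configured from the pruned `requiredSchema` the planner passes in, rather than the full `dataSchema`, so only the columns a query actually references are decoded. As a rough standalone sketch (not part of this commit; the object, schemas, and values below are invented for illustration), the pruning that `OrcRelation.setRequiredColumns` performs amounts to resolving the required field names to their ordinals in the file's physical schema:

// Standalone sketch of the pruning idea behind setRequiredColumns: resolve
// the names in the pruned requiredSchema to ordinals in the file's physical
// schema, which is what the ORC input format needs to skip unused columns.
// All names and schemas here are hypothetical, not Spark APIs.
object RequiredColumnsSketch {
  def requiredOrdinals(physicalSchema: Seq[String], requiredSchema: Seq[String]): Seq[Int] =
    requiredSchema.flatMap { name =>
      val i = physicalSchema.indexOf(name)
      if (i >= 0) Some(i) else None // drop names missing from this file's footer
    }

  def main(args: Array[String]): Unit = {
    val physical = Seq("id", "name", "age", "city") // columns stored in the ORC file
    val required = Seq("name", "city")              // columns the query actually reads
    // Under the removed buildInternalScan path the scan was built from the full
    // dataSchema; with requiredSchema only ordinals 1 and 3 are materialized.
    println(requiredOrdinals(physical, required))   // List(1, 3)
  }
}

This also explains the switch from `dataSchema` to `requiredSchema` in `unwrapOrcStructs` and in `fullOutput`: the unwrapped rows contain only the pruned columns, so the projection that appends partition values must be generated from that same pruned schema.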