Diffstat (limited to 'sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala  30
1 file changed, 8 insertions, 22 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 7c4a0a0c0f..21591ec093 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.{Row, SQLContext}
@@ -45,7 +44,6 @@ import org.apache.spark.sql.hive.{HiveInspectors, HiveMetastoreTypes, HiveShim}
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
-import org.apache.spark.util.collection.BitSet
private[sql] class DefaultSource
extends FileFormat with DataSourceRegister with Serializable {
@@ -111,23 +109,11 @@ private[sql] class DefaultSource
}
}
- override def buildInternalScan(
- sqlContext: SQLContext,
- dataSchema: StructType,
- requiredColumns: Array[String],
- filters: Array[Filter],
- bucketSet: Option[BitSet],
- inputFiles: Seq[FileStatus],
- broadcastedConf: Broadcast[SerializableConfiguration],
- options: Map[String, String]): RDD[InternalRow] = {
- val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
- OrcTableScan(sqlContext, output, filters, inputFiles).execute()
- }
-
override def buildReader(
sqlContext: SQLContext,
- partitionSchema: StructType,
dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
@@ -145,15 +131,15 @@ private[sql] class DefaultSource
(file: PartitionedFile) => {
val conf = broadcastedConf.value.value
- // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
- // case, `OrcFileOperator.readSchema` returns `None`, and we can simply return an empty
- // iterator.
+ // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
+ // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
+ // using the given physical schema. Instead, we simply return an empty iterator.
val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf))
if (maybePhysicalSchema.isEmpty) {
Iterator.empty
} else {
val physicalSchema = maybePhysicalSchema.get
- OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema)
+ OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
val orcRecordReader = {
val job = Job.getInstance(conf)
@@ -171,11 +157,11 @@ private[sql] class DefaultSource
// Unwraps `OrcStruct`s to `UnsafeRow`s
val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
- file.filePath, conf, dataSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
+ file.filePath, conf, requiredSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
)
// Appends partition values
- val fullOutput = dataSchema.toAttributes ++ partitionSchema.toAttributes
+ val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
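
For reference, below is a minimal plain-Scala sketch of the per-file reader pattern the patched buildReader follows: build a closure over the pruned (required) schema once, return an empty iterator when a file reports no schema (the SPARK-8501 case above), and append the file's partition values to every pruned row. SimpleFile, readPhysicalSchema, and readRows are hypothetical stand-ins for illustration only, not Spark or ORC APIs.

// Hypothetical stand-in for PartitionedFile; not a Spark API.
case class SimpleFile(path: String, partitionValues: Seq[Any])

object ReaderSketch {
  // Pretend "files": path -> (physical schema, rows keyed by column name).
  private val store: Map[String, (Seq[String], Seq[Map[String, Any]])] = Map(
    ("part-0.orc", (Seq("id", "name", "score"),
      Seq(Map("id" -> 1, "name" -> "a", "score" -> 0.5),
          Map("id" -> 2, "name" -> "b", "score" -> 0.9)))),
    ("empty.orc", (Seq.empty[String], Seq.empty[Map[String, Any]]))  // SPARK-8501-style empty file
  )

  private def readPhysicalSchema(path: String): Option[Seq[String]] =
    store.get(path).map(_._1).filter(_.nonEmpty)

  private def readRows(path: String): Seq[Map[String, Any]] =
    store.get(path).map(_._2).getOrElse(Seq.empty)

  // Analogue of buildReader: close over the required schema, return a
  // per-file function that yields pruned rows with partition values appended.
  def buildReader(requiredSchema: Seq[String]): SimpleFile => Iterator[Seq[Any]] = { file =>
    readPhysicalSchema(file.path) match {
      case None =>
        // Empty footer schema: nothing to read, so return an empty iterator.
        Iterator.empty
      case Some(_) =>
        readRows(file.path).iterator.map { row =>
          requiredSchema.map(col => row.getOrElse(col, null)) ++ file.partitionValues
        }
    }
  }

  def main(args: Array[String]): Unit = {
    val reader = buildReader(Seq("name", "score"))
    reader(SimpleFile("part-0.orc", Seq("2016-03-01"))).foreach(println)  // List(a, 0.5, 2016-03-01), List(b, 0.9, 2016-03-01)
    println(reader(SimpleFile("empty.orc", Seq("2016-03-01"))).isEmpty)   // true
  }
}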