From 26445c2e472bad137fd350e4089dd0ff43a42039 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 30 Mar 2016 18:21:06 -0700 Subject: [SPARK-14206][SQL] buildReader() implementation for CSV ## What changes were proposed in this pull request? Major changes: 1. Implement `FileFormat.buildReader()` for the CSV data source. 2. Add an extra argument to `FileFormat.buildReader()`, `physicalSchema`, which is basically the result of `FileFormat.inferSchema` or the user-specified schema. This argument is necessary because the CSV data source needs to know all the columns of the underlying files to read them. ## How was this patch tested? Existing tests should do the work. Author: Cheng Lian Closes #12002 from liancheng/spark-14206-csv-build-reader. --- .../scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'sql/hive/src/main') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 7c4a0a0c0f..43f445edcb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -126,8 +126,9 @@ private[sql] class DefaultSource override def buildReader( sqlContext: SQLContext, - partitionSchema: StructType, dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = { val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) @@ -145,15 +146,15 @@ private[sql] class DefaultSource (file: PartitionedFile) => { val conf = broadcastedConf.value.value - // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this - // case, `OrcFileOperator.readSchema` returns `None`, and we can simply return an empty - // iterator. 
+ // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this + // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file + // using the given physical schema. Instead, we simply return an empty iterator. val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)) if (maybePhysicalSchema.isEmpty) { Iterator.empty } else { val physicalSchema = maybePhysicalSchema.get - OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema) + OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema) val orcRecordReader = { val job = Job.getInstance(conf) @@ -171,11 +172,11 @@ private[sql] class DefaultSource // Unwraps `OrcStruct`s to `UnsafeRow`s val unsafeRowIterator = OrcRelation.unwrapOrcStructs( - file.filePath, conf, dataSchema, new RecordReaderIterator[OrcStruct](orcRecordReader) + file.filePath, conf, requiredSchema, new RecordReaderIterator[OrcStruct](orcRecordReader) ) // Appends partition values - val fullOutput = dataSchema.toAttributes ++ partitionSchema.toAttributes + val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes val joinedRow = new JoinedRow() val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput) -- cgit v1.2.3