diff options
author | Cheng Lian <lian@databricks.com> | 2016-03-30 18:21:06 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-03-30 18:21:06 -0700 |
commit | 26445c2e472bad137fd350e4089dd0ff43a42039 (patch) | |
tree | 7972c24c16fef4202224d9982edb6698ece7e589 /sql/hive | |
parent | da54abfd8730ef752eca921089bcf568773bd24a (diff) | |
download | spark-26445c2e472bad137fd350e4089dd0ff43a42039.tar.gz spark-26445c2e472bad137fd350e4089dd0ff43a42039.tar.bz2 spark-26445c2e472bad137fd350e4089dd0ff43a42039.zip |
[SPARK-14206][SQL] buildReader() implementation for CSV
## What changes were proposed in this pull request?
Major changes:
1. Implement `FileFormat.buildReader()` for the CSV data source.
1. Add an extra argument to `FileFormat.buildReader()`, `physicalSchema`, which is basically the result of `FileFormat.inferSchema` or user specified schema.
This argument is necessary because the CSV data source needs to know all the columns of the underlying files to read the file.
## How was this patch tested?
Existing tests should do the work.
Author: Cheng Lian <lian@databricks.com>
Closes #12002 from liancheng/spark-14206-csv-build-reader.
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 15 |
1 files changed, 8 insertions, 7 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 7c4a0a0c0f..43f445edcb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -126,8 +126,9 @@ private[sql] class DefaultSource override def buildReader( sqlContext: SQLContext, - partitionSchema: StructType, dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = { val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) @@ -145,15 +146,15 @@ private[sql] class DefaultSource (file: PartitionedFile) => { val conf = broadcastedConf.value.value - // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this - // case, `OrcFileOperator.readSchema` returns `None`, and we can simply return an empty - // iterator. + // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this + // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file + // using the given physical schema. Instead, we simply return an empty iterator. val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)) if (maybePhysicalSchema.isEmpty) { Iterator.empty } else { val physicalSchema = maybePhysicalSchema.get - OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema) + OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema) val orcRecordReader = { val job = Job.getInstance(conf) @@ -171,11 +172,11 @@ private[sql] class DefaultSource // Unwraps `OrcStruct`s to `UnsafeRow`s val unsafeRowIterator = OrcRelation.unwrapOrcStructs( - file.filePath, conf, dataSchema, new RecordReaderIterator[OrcStruct](orcRecordReader) + file.filePath, conf, requiredSchema, new RecordReaderIterator[OrcStruct](orcRecordReader) ) // Appends partition values - val fullOutput = dataSchema.toAttributes ++ partitionSchema.toAttributes + val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes val joinedRow = new JoinedRow() val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput) |