author     Cheng Lian <lian@databricks.com>    2016-03-30 18:21:06 -0700
committer  Yin Huai <yhuai@databricks.com>     2016-03-30 18:21:06 -0700
commit     26445c2e472bad137fd350e4089dd0ff43a42039 (patch)
tree       7972c24c16fef4202224d9982edb6698ece7e589 /sql/hive/src/main
parent     da54abfd8730ef752eca921089bcf568773bd24a (diff)
download   spark-26445c2e472bad137fd350e4089dd0ff43a42039.tar.gz
           spark-26445c2e472bad137fd350e4089dd0ff43a42039.tar.bz2
           spark-26445c2e472bad137fd350e4089dd0ff43a42039.zip
[SPARK-14206][SQL] buildReader() implementation for CSV
## What changes were proposed in this pull request?

Major changes:

1. Implement `FileFormat.buildReader()` for the CSV data source.
2. Add an extra argument to `FileFormat.buildReader()`, `physicalSchema`, which is essentially the result of `FileFormat.inferSchema` or the user-specified schema. This argument is necessary because the CSV data source needs to know all the columns of the underlying files in order to read them.

## How was this patch tested?

Existing tests should cover this change.

Author: Cheng Lian <lian@databricks.com>

Closes #12002 from liancheng/spark-14206-csv-build-reader.
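In the ORC reader touched below, the schemas surface as `dataSchema`, `partitionSchema`, and `requiredSchema`. A minimal Scala sketch of the reshaped `buildReader()` contract, assuming the Spark 2.0-era `execution.datasources` package layout (the trait name here is illustrative only, not part of the patch):

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Sketch of the reader contract: a data source returns a function that maps a
// single PartitionedFile to an iterator of rows covering requiredSchema.
trait BuildReaderSketch {
  def buildReader(
      sqlContext: SQLContext,
      dataSchema: StructType,       // full schema of the files (inferred or user-specified)
      partitionSchema: StructType,  // schema of the partition directory columns
      requiredSchema: StructType,   // subset of dataSchema actually needed by the query
      filters: Seq[Filter],
      options: Map[String, String]): PartitionedFile => Iterator[InternalRow]
}
```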
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala  15
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 7c4a0a0c0f..43f445edcb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -126,8 +126,9 @@ private[sql] class DefaultSource
   override def buildReader(
       sqlContext: SQLContext,
-      partitionSchema: StructType,
       dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
     val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)

@@ -145,15 +146,15 @@ private[sql] class DefaultSource
     (file: PartitionedFile) => {
       val conf = broadcastedConf.value.value

-      // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
-      // case, `OrcFileOperator.readSchema` returns `None`, and we can simply return an empty
-      // iterator.
+      // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
+      // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
+      // using the given physical schema. Instead, we simply return an empty iterator.
       val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf))
       if (maybePhysicalSchema.isEmpty) {
         Iterator.empty
       } else {
         val physicalSchema = maybePhysicalSchema.get
-        OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema)
+        OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)

         val orcRecordReader = {
           val job = Job.getInstance(conf)
@@ -171,11 +172,11 @@ private[sql] class DefaultSource
         // Unwraps `OrcStruct`s to `UnsafeRow`s
         val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
-          file.filePath, conf, dataSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
+          file.filePath, conf, requiredSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
         )

         // Appends partition values
-        val fullOutput = dataSchema.toAttributes ++ partitionSchema.toAttributes
+        val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
         val joinedRow = new JoinedRow()
         val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
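For context on the last hunk: after unwrapping `OrcStruct`s into `UnsafeRow`s, the reader joins each data row with the file's partition values and flattens the result with a generated unsafe projection over `requiredSchema ++ partitionSchema`. A minimal sketch of that step, using a hypothetical helper name (`appendPartitionValues`) that does not appear in the patch:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.types.StructType

object AppendPartitionColumnsSketch {
  // Joins each data row with the (constant) partition values of its file and
  // flattens the pair into a single UnsafeRow via a generated projection.
  def appendPartitionValues(
      requiredSchema: StructType,
      partitionSchema: StructType,
      partitionValues: InternalRow,
      dataRows: Iterator[InternalRow]): Iterator[InternalRow] = {
    // Output attributes: required data columns followed by partition columns.
    val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
    // Identity projection over fullOutput that materializes UnsafeRows.
    val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
    val joinedRow = new JoinedRow()
    dataRows.map { dataRow =>
      appendPartitionColumns(joinedRow(dataRow, partitionValues))
    }
  }
}
```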