diff options
author | Cheng Lian <lian@databricks.com> | 2016-05-04 14:16:57 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2016-05-04 14:16:57 +0800 |
commit | bc3760d405cc8c3ffcd957b188afa8b7e3b1f824 (patch) | |
tree | aa6fae43f4eb0e9e88a0f2574bb2fa619954f98a /sql/hive | |
parent | 695f0e9195209c75bfc62fc70bfc6d7d9f1047b3 (diff) | |
download | spark-bc3760d405cc8c3ffcd957b188afa8b7e3b1f824.tar.gz spark-bc3760d405cc8c3ffcd957b188afa8b7e3b1f824.tar.bz2 spark-bc3760d405cc8c3ffcd957b188afa8b7e3b1f824.zip |
[SPARK-14237][SQL] De-duplicate partition value appending logic in various buildReader() implementations
## What changes were proposed in this pull request?
Currently, various `FileFormat` data sources share approximately the same code for partition value appending. This PR tries to eliminate this duplication.
A new method `buildReaderWithPartitionValues()` is added to `FileFormat` with a default implementation that appends partition values to `InternalRow`s produced by the reader function returned by `buildReader()`.
Special data sources like Parquet, which implements partition value appending inside `buildReader()` because of the vectorized reader, and the Text data source, which doesn't support partitioning, override `buildReaderWithPartitionValues()` and simply delegate to `buildReader()`.
This PR brings two benefits:
1. Apparently, it de-duplicates partition value appending logic
2. Now the reader function returned by `buildReader()` is only required to produce `InternalRow`s rather than `UnsafeRow`s if the data source doesn't override `buildReaderWithPartitionValues()`.
Because the safe-to-unsafe conversion is also performed while appending partition values. This makes 3rd-party data sources (e.g. spark-avro) easier to implement since they no longer need to access private APIs involving `UnsafeRow`.
## How was this patch tested?
Existing tests should do the work.
Author: Cheng Lian <lian@databricks.com>
Closes #12866 from liancheng/spark-14237-simplify-partition-values-appending.
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 11 |
1 files changed, 1 insertions, 10 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index d6a847f3ba..89d258e844 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -157,20 +157,11 @@ private[sql] class DefaultSource } // Unwraps `OrcStruct`s to `UnsafeRow`s - val unsafeRowIterator = OrcRelation.unwrapOrcStructs( + OrcRelation.unwrapOrcStructs( conf, requiredSchema, Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]), new RecordReaderIterator[OrcStruct](orcRecordReader)) - - // Appends partition values - val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes - val joinedRow = new JoinedRow() - val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput) - - unsafeRowIterator.map { dataRow => - appendPartitionColumns(joinedRow(dataRow, file.partitionValues)) - } } } } |