aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2016-05-04 14:16:57 +0800
committerCheng Lian <lian@databricks.com>2016-05-04 14:16:57 +0800
commitbc3760d405cc8c3ffcd957b188afa8b7e3b1f824 (patch)
treeaa6fae43f4eb0e9e88a0f2574bb2fa619954f98a /sql/hive
parent695f0e9195209c75bfc62fc70bfc6d7d9f1047b3 (diff)
downloadspark-bc3760d405cc8c3ffcd957b188afa8b7e3b1f824.tar.gz
spark-bc3760d405cc8c3ffcd957b188afa8b7e3b1f824.tar.bz2
spark-bc3760d405cc8c3ffcd957b188afa8b7e3b1f824.zip
[SPARK-14237][SQL] De-duplicate partition value appending logic in various buildReader() implementations
## What changes were proposed in this pull request? Currently, various `FileFormat` data sources share approximately the same code for partition value appending. This PR tries to eliminate this duplication. A new method `buildReaderWithPartitionValues()` is added to `FileFormat` with a default implementation that appends partition values to `InternalRow`s produced by the reader function returned by `buildReader()`. Special data sources like Parquet, which implements partition value appending inside `buildReader()` because of the vectorized reader, and the Text data source, which doesn't support partitioning, override `buildReaderWithPartitionValues()` and simply delegate to `buildReader()`. This PR brings two benefits: 1. Apparently, it de-duplicates partition value appending logic 2. Now the reader function returned by `buildReader()` is only required to produce `InternalRow`s rather than `UnsafeRow`s if the data source doesn't override `buildReaderWithPartitionValues()`. Because the safe-to-unsafe conversion is also performed while appending partition values. This makes 3rd-party data sources (e.g. spark-avro) easier to implement since they no longer need to access private APIs involving `UnsafeRow`. ## How was this patch tested? Existing tests should do the work. Author: Cheng Lian <lian@databricks.com> Closes #12866 from liancheng/spark-14237-simplify-partition-values-appending.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala11
1 files changed, 1 insertions, 10 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index d6a847f3ba..89d258e844 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -157,20 +157,11 @@ private[sql] class DefaultSource
}
// Unwraps `OrcStruct`s to `UnsafeRow`s
- val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
+ OrcRelation.unwrapOrcStructs(
conf,
requiredSchema,
Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
new RecordReaderIterator[OrcStruct](orcRecordReader))
-
- // Appends partition values
- val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
- val joinedRow = new JoinedRow()
- val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
-
- unsafeRowIterator.map { dataRow =>
- appendPartitionColumns(joinedRow(dataRow, file.partitionValues))
- }
}
}
}