author     Cheng Lian <lian@databricks.com>        2016-03-30 18:21:06 -0700
committer  Yin Huai <yhuai@databricks.com>         2016-03-30 18:21:06 -0700
commit     26445c2e472bad137fd350e4089dd0ff43a42039 (patch)
tree       7972c24c16fef4202224d9982edb6698ece7e589
parent     da54abfd8730ef752eca921089bcf568773bd24a (diff)
download   spark-26445c2e472bad137fd350e4089dd0ff43a42039.tar.gz
           spark-26445c2e472bad137fd350e4089dd0ff43a42039.tar.bz2
           spark-26445c2e472bad137fd350e4089dd0ff43a42039.zip
[SPARK-14206][SQL] buildReader() implementation for CSV
## What changes were proposed in this pull request?

Major changes:

1. Implement `FileFormat.buildReader()` for the CSV data source.
1. Add an extra argument to `FileFormat.buildReader()`, `physicalSchema` (named `dataSchema` in the final signature), which is basically the result of `FileFormat.inferSchema` or the user-specified schema. This argument is necessary because the CSV data source needs to know the full set of columns in the underlying files in order to read them.

## How was this patch tested?

Existing tests should do the work.

Author: Cheng Lian <lian@databricks.com>

Closes #12002 from liancheng/spark-14206-csv-build-reader.
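For reference, the revised `buildReader()` contract from `interfaces.scala` boils down to the signature below. This is a condensed sketch of the trait after the change (doc comments abridged, default implementation omitted); it assumes the usual Spark SQL types are on the classpath and is not the complete file.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

trait FileFormat {
  /**
   * Returns a function that reads a single file as an Iterator of InternalRow.
   *
   * - `dataSchema`: the global schema of the underlying files (inferred or user-specified);
   *   partition columns that also appear in the files are preserved here.
   * - `partitionSchema`: the partition columns appended to each produced row.
   * - `requiredSchema`: the (possibly pruned) columns the query actually needs.
   */
  def buildReader(
      sqlContext: SQLContext,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String]): PartitionedFile => Iterator[InternalRow]
}
```

CSV needs `dataSchema` in addition to `requiredSchema` because its tokenizer must see every column position in the file before the required columns can be projected out; the partition values are then appended to each row by the returned function.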
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala      16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala          41
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala        51
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala         7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala  26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala        3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                             18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala   5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala                           15
9 files changed, 119 insertions, 63 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index d6534083c0..554298772a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -59,8 +59,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
if (files.fileFormat.toString == "TestFileFormat" ||
files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
files.fileFormat.toString == "ORC" ||
- files.fileFormat.isInstanceOf[json.DefaultSource] ||
- files.fileFormat.isInstanceOf[text.DefaultSource]) &&
+ files.fileFormat.isInstanceOf[csv.DefaultSource] ||
+ files.fileFormat.isInstanceOf[text.DefaultSource] ||
+ files.fileFormat.isInstanceOf[json.DefaultSource]) &&
files.sqlContext.conf.useFileScan =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
@@ -80,14 +81,6 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val dataColumns =
l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)
- val bucketColumns =
- AttributeSet(
- files.bucketSpec
- .map(_.bucketColumnNames)
- .getOrElse(Nil)
- .map(l.resolveQuoted(_, files.sqlContext.conf.resolver)
- .getOrElse(sys.error(""))))
-
// Partition keys are not available in the statistics of the files.
val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
@@ -113,8 +106,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val readFile = files.fileFormat.buildReader(
sqlContext = files.sqlContext,
+ dataSchema = files.dataSchema,
partitionSchema = files.partitionSchema,
- dataSchema = prunedDataSchema,
+ requiredSchema = prunedDataSchema,
filters = pushedDownFilters,
options = files.options)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 5501015775..b47328a3dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.csv
import scala.util.control.NonFatal
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.RecordWriter
import org.apache.hadoop.mapreduce.TaskAttemptContext
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -49,14 +50,10 @@ object CSVRelation extends Logging {
}, true)
}
- def parseCsv(
- tokenizedRDD: RDD[Array[String]],
+ def csvParser(
schema: StructType,
requiredColumns: Array[String],
- inputs: Seq[FileStatus],
- sqlContext: SQLContext,
- params: CSVOptions): RDD[InternalRow] = {
-
+ params: CSVOptions): Array[String] => Option[InternalRow] = {
val schemaFields = schema.fields
val requiredFields = StructType(requiredColumns.map(schema(_))).fields
val safeRequiredFields = if (params.dropMalformed) {
@@ -74,7 +71,8 @@ object CSVRelation extends Logging {
}
val requiredSize = requiredFields.length
val row = new GenericMutableRow(requiredSize)
- tokenizedRDD.flatMap { tokens =>
+
+ (tokens: Array[String]) => {
if (params.dropMalformed && schemaFields.length != tokens.length) {
logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
None
@@ -118,6 +116,33 @@ object CSVRelation extends Logging {
}
}
}
+
+ def parseCsv(
+ tokenizedRDD: RDD[Array[String]],
+ schema: StructType,
+ requiredColumns: Array[String],
+ options: CSVOptions): RDD[InternalRow] = {
+ val parser = csvParser(schema, requiredColumns, options)
+ tokenizedRDD.flatMap(parser(_).toSeq)
+ }
+
+ // Skips the header line of each file if the `header` option is set to true.
+ def dropHeaderLine(
+ file: PartitionedFile, lines: Iterator[String], csvOptions: CSVOptions): Unit = {
+ // TODO What if the first partitioned file consists of only comments and empty lines?
+ if (csvOptions.headerFlag && file.start == 0) {
+ val nonEmptyLines = if (csvOptions.isCommentSet) {
+ val commentPrefix = csvOptions.comment.toString
+ lines.dropWhile { line =>
+ line.trim.isEmpty || line.trim.startsWith(commentPrefix)
+ }
+ } else {
+ lines.dropWhile(_.trim.isEmpty)
+ }
+
+ if (nonEmptyLines.hasNext) nonEmptyLines.drop(1)
+ }
+ }
}
private[sql] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWriterFactory {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index 54e4c1a2c9..6b6add48cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -19,17 +19,19 @@ package org.apache.spark.sql.execution.datasources.csv
import java.nio.charset.{Charset, StandardCharsets}
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, HadoopFileLinesReader, PartitionedFile}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration
@@ -91,6 +93,46 @@ class DefaultSource extends FileFormat with DataSourceRegister {
new CSVOutputWriterFactory(csvOptions)
}
+ override def buildReader(
+ sqlContext: SQLContext,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+ val csvOptions = new CSVOptions(options)
+ val headers = requiredSchema.fields.map(_.name)
+
+ val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+
+ (file: PartitionedFile) => {
+ val lineIterator = {
+ val conf = broadcastedConf.value.value
+ new HadoopFileLinesReader(file, conf).map { line =>
+ new String(line.getBytes, 0, line.getLength, csvOptions.charset)
+ }
+ }
+
+ CSVRelation.dropHeaderLine(file, lineIterator, csvOptions)
+
+ val unsafeRowIterator = {
+ val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers)
+ val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions)
+ tokenizedIterator.flatMap(parser(_).toSeq)
+ }
+
+ // Appends partition values
+ val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+ val joinedRow = new JoinedRow()
+ val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
+
+ unsafeRowIterator.map { dataRow =>
+ appendPartitionColumns(joinedRow(dataRow, file.partitionValues))
+ }
+ }
+ }
+
/**
* This supports to eliminate unneeded columns before producing an RDD
* containing all of its tuples as Row objects. This reads all the tokens of each line
@@ -113,8 +155,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
val pathsString = csvFiles.map(_.getPath.toUri.toString)
val header = dataSchema.fields.map(_.name)
val tokenizedRdd = tokenRdd(sqlContext, csvOptions, header, pathsString)
- val rows = CSVRelation.parseCsv(
- tokenizedRdd, dataSchema, requiredColumns, csvFiles, sqlContext, csvOptions)
+ val rows = CSVRelation.parseCsv(tokenizedRdd, dataSchema, requiredColumns, csvOptions)
val requiredDataSchema = StructType(requiredColumns.map(c => dataSchema.find(_.name == c).get))
rows.mapPartitions { iterator =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 21fc1224ef..42cd25a18c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -124,8 +124,9 @@ class DefaultSource extends FileFormat with DataSourceRegister {
override def buildReader(
sqlContext: SQLContext,
- partitionSchema: StructType,
dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
@@ -136,7 +137,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
.getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
- val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
+ val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
file => {
@@ -144,7 +145,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
val rows = JacksonParser.parseJson(
lines,
- dataSchema,
+ requiredSchema,
columnNameOfCorruptRecord,
parsedOptions)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index d6b84be267..5b58fa1fc5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -276,38 +276,26 @@ private[sql] class DefaultSource
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}
- /**
- * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
- *
- * @param partitionSchema The schema of the partition column row that will be present in each
- * PartitionedFile. These columns should be prepended to the rows that
- * are produced by the iterator.
- * @param dataSchema The schema of the data that should be output for each row. This may be a
- * subset of the columns that are present in the file if column pruning has
- * occurred.
- * @param filters A set of filters than can optionally be used to reduce the number of rows output
- * @param options A set of string -> string configuration options.
- * @return
- */
override def buildReader(
sqlContext: SQLContext,
- partitionSchema: StructType,
dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
val parquetConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
parquetConf.set(
CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
- CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+ CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
parquetConf.set(
CatalystWriteSupport.SPARK_ROW_SCHEMA,
- CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+ CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
// We want to clear this temporary metadata from saving into Parquet file.
// This metadata is only useful for detecting optional columns when pushdowning filters.
val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
- dataSchema).asInstanceOf[StructType]
+ requiredSchema).asInstanceOf[StructType]
CatalystWriteSupport.setSchema(dataSchemaToWrite, parquetConf)
// Sets flags for `CatalystSchemaConverter`
@@ -324,7 +312,7 @@ private[sql] class DefaultSource
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
- .flatMap(ParquetFilters.createFilter(dataSchema, _))
+ .flatMap(ParquetFilters.createFilter(requiredSchema, _))
.reduceOption(FilterApi.and)
} else {
None
@@ -394,7 +382,7 @@ private[sql] class DefaultSource
enableVectorizedParquetReader) {
iter.asInstanceOf[Iterator[InternalRow]]
} else {
- val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
+ val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index d6ab5fc56e..99459ba1d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -129,8 +129,9 @@ class DefaultSource extends FileFormat with DataSourceRegister {
override def buildReader(
sqlContext: SQLContext,
- partitionSchema: StructType,
dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 1e02354edf..6b95a3d25b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -385,9 +385,9 @@ abstract class OutputWriter {
*
* @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise
* this relation.
- * @param partitionSchema The schmea of the columns (if any) that are used to partition the relation
+ * @param partitionSchema The schema of the columns (if any) that are used to partition the relation
* @param dataSchema The schema of any remaining columns. Note that if any partition columns are
- * present in the actual data files as well, they are removed.
+ * present in the actual data files as well, they are preserved.
* @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values).
* @param fileFormat A file format that can be used to read and write the data in files.
* @param options Configuration used when reading / writing data.
@@ -462,20 +462,24 @@ trait FileFormat {
/**
* Returns a function that can be used to read a single file in as an Iterator of InternalRow.
*
+ * @param dataSchema The global data schema. It can be either specified by the user, or
+ * reconciled/merged from all underlying data files. If any partition columns
+ * are contained in the files, they are preserved in this schema.
* @param partitionSchema The schema of the partition column row that will be present in each
- * PartitionedFile. These columns should be prepended to the rows that
+ * PartitionedFile. These columns should be appended to the rows that
* are produced by the iterator.
- * @param dataSchema The schema of the data that should be output for each row. This may be a
- * subset of the columns that are present in the file if column pruning has
- * occurred.
+ * @param requiredSchema The schema of the data that should be output for each row. This may be a
+ * subset of the columns that are present in the file if column pruning has
+ * occurred.
* @param filters A set of filters than can optionally be used to reduce the number of rows output
* @param options A set of string -> string configuration options.
* @return
*/
def buildReader(
sqlContext: SQLContext,
- partitionSchema: StructType,
dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
// TODO: Remove this default implementation when the other formats have been ported
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 45620bc965..717a3a80b7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -376,14 +376,15 @@ class TestFileFormat extends FileFormat {
override def buildReader(
sqlContext: SQLContext,
- partitionSchema: StructType,
dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
// Record the arguments so they can be checked in the test case.
LastArguments.partitionSchema = partitionSchema
- LastArguments.dataSchema = dataSchema
+ LastArguments.dataSchema = requiredSchema
LastArguments.filters = filters
LastArguments.options = options
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 7c4a0a0c0f..43f445edcb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -126,8 +126,9 @@ private[sql] class DefaultSource
override def buildReader(
sqlContext: SQLContext,
- partitionSchema: StructType,
dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
@@ -145,15 +146,15 @@ private[sql] class DefaultSource
(file: PartitionedFile) => {
val conf = broadcastedConf.value.value
- // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
- // case, `OrcFileOperator.readSchema` returns `None`, and we can simply return an empty
- // iterator.
+ // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
+ // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
+ // using the given physical schema. Instead, we simply return an empty iterator.
val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf))
if (maybePhysicalSchema.isEmpty) {
Iterator.empty
} else {
val physicalSchema = maybePhysicalSchema.get
- OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema)
+ OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
val orcRecordReader = {
val job = Job.getInstance(conf)
@@ -171,11 +172,11 @@ private[sql] class DefaultSource
// Unwraps `OrcStruct`s to `UnsafeRow`s
val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
- file.filePath, conf, dataSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
+ file.filePath, conf, requiredSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
)
// Appends partition values
- val fullOutput = dataSchema.toAttributes ++ partitionSchema.toAttributes
+ val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)