From 26445c2e472bad137fd350e4089dd0ff43a42039 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 30 Mar 2016 18:21:06 -0700 Subject: [SPARK-14206][SQL] buildReader() implementation for CSV ## What changes were proposed in this pull request? Major changes: 1. Implement `FileFormat.buildReader()` for the CSV data source. 1. Add an extra argument to `FileFormat.buildReader()`, `physicalSchema` (named `dataSchema` in the committed signature), which is basically the result of `FileFormat.inferSchema` or the user-specified schema. This argument is necessary because the CSV data source needs to know all the columns of the underlying files before it can read any one of them. ## How was this patch tested? Existing tests should cover this change. Author: Cheng Lian Closes #12002 from liancheng/spark-14206-csv-build-reader. --- .../execution/datasources/FileSourceStrategy.scala | 16 +++---- .../execution/datasources/csv/CSVRelation.scala | 41 +++++++++++++---- .../execution/datasources/csv/DefaultSource.scala | 51 +++++++++++++++++++--- .../execution/datasources/json/JSONRelation.scala | 7 +-- .../datasources/parquet/ParquetRelation.scala | 26 +++-------- .../execution/datasources/text/DefaultSource.scala | 3 +- .../org/apache/spark/sql/sources/interfaces.scala | 18 +++++--- .../datasources/FileSourceStrategySuite.scala | 5 ++- .../apache/spark/sql/hive/orc/OrcRelation.scala | 15 ++++--- 9 files changed, 119 insertions(+), 63 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index d6534083c0..554298772a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -59,8 +59,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { if (files.fileFormat.toString == "TestFileFormat" || files.fileFormat.isInstanceOf[parquet.DefaultSource] || files.fileFormat.toString == "ORC" || - files.fileFormat.isInstanceOf[json.DefaultSource] || - files.fileFormat.isInstanceOf[text.DefaultSource]) && + files.fileFormat.isInstanceOf[csv.DefaultSource] || + files.fileFormat.isInstanceOf[text.DefaultSource] || + files.fileFormat.isInstanceOf[json.DefaultSource]) && files.sqlContext.conf.useFileScan => // Filters on this relation fall into four categories based on where we can use them to avoid // reading unneeded data: @@ -80,14 +81,6 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val dataColumns = l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver) - val bucketColumns = - AttributeSet( - files.bucketSpec - .map(_.bucketColumnNames) - .getOrElse(Nil) - .map(l.resolveQuoted(_, files.sqlContext.conf.resolver) - .getOrElse(sys.error("")))) - // Partition keys are not available in the statistics of the files.
val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty) @@ -113,8 +106,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val readFile = files.fileFormat.buildReader( sqlContext = files.sqlContext, + dataSchema = files.dataSchema, partitionSchema = files.partitionSchema, - dataSchema = prunedDataSchema, + requiredSchema = prunedDataSchema, filters = pushedDownFilters, options = files.options) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index 5501015775..b47328a3dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.csv import scala.util.control.NonFatal -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{NullWritable, Text} import org.apache.hadoop.mapreduce.RecordWriter import org.apache.hadoop.mapreduce.TaskAttemptContext @@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericMutableRow +import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -49,14 +50,10 @@ object CSVRelation extends Logging { }, true) } - def parseCsv( - tokenizedRDD: RDD[Array[String]], + def csvParser( schema: StructType, requiredColumns: Array[String], - inputs: Seq[FileStatus], - sqlContext: SQLContext, - params: CSVOptions): RDD[InternalRow] = { - + params: CSVOptions): Array[String] => Option[InternalRow] = { val schemaFields = schema.fields val requiredFields = StructType(requiredColumns.map(schema(_))).fields val safeRequiredFields = if (params.dropMalformed) { @@ -74,7 +71,8 @@ object CSVRelation extends Logging { } val requiredSize = requiredFields.length val row = new GenericMutableRow(requiredSize) - tokenizedRDD.flatMap { tokens => + + (tokens: Array[String]) => { if (params.dropMalformed && schemaFields.length != tokens.length) { logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}") None @@ -118,6 +116,33 @@ object CSVRelation extends Logging { } } } + + def parseCsv( + tokenizedRDD: RDD[Array[String]], + schema: StructType, + requiredColumns: Array[String], + options: CSVOptions): RDD[InternalRow] = { + val parser = csvParser(schema, requiredColumns, options) + tokenizedRDD.flatMap(parser(_).toSeq) + } + + // Skips the header line of each file if the `header` option is set to true. + def dropHeaderLine( + file: PartitionedFile, lines: Iterator[String], csvOptions: CSVOptions): Unit = { + // TODO What if the first partitioned file consists of only comments and empty lines? 
+ if (csvOptions.headerFlag && file.start == 0) { + val nonEmptyLines = if (csvOptions.isCommentSet) { + val commentPrefix = csvOptions.comment.toString + lines.dropWhile { line => + line.trim.isEmpty || line.trim.startsWith(commentPrefix) + } + } else { + lines.dropWhile(_.trim.isEmpty) + } + + if (nonEmptyLines.hasNext) nonEmptyLines.drop(1) + } + } } private[sql] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWriterFactory { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala index 54e4c1a2c9..6b6add48cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala @@ -19,17 +19,19 @@ package org.apache.spark.sql.execution.datasources.csv import java.nio.charset.{Charset, StandardCharsets} +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapred.TextInputFormat -import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.UnsafeProjection -import org.apache.spark.sql.execution.datasources.CompressionCodecs +import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.datasources.{CompressionCodecs, HadoopFileLinesReader, PartitionedFile} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.util.SerializableConfiguration @@ -91,6 +93,46 @@ class DefaultSource extends FileFormat with DataSourceRegister { new CSVOutputWriterFactory(csvOptions) } + override def buildReader( + sqlContext: SQLContext, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = { + val csvOptions = new CSVOptions(options) + val headers = requiredSchema.fields.map(_.name) + + val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf)) + + (file: PartitionedFile) => { + val lineIterator = { + val conf = broadcastedConf.value.value + new HadoopFileLinesReader(file, conf).map { line => + new String(line.getBytes, 0, line.getLength, csvOptions.charset) + } + } + + CSVRelation.dropHeaderLine(file, lineIterator, csvOptions) + + val unsafeRowIterator = { + val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers) + val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions) + tokenizedIterator.flatMap(parser(_).toSeq) + } + + // Appends partition values + val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes + val joinedRow = new JoinedRow() + val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput) + + unsafeRowIterator.map { dataRow => + appendPartitionColumns(joinedRow(dataRow, file.partitionValues)) + } + } + } + /** * This supports to 
eliminate unneeded columns before producing an RDD * containing all of its tuples as Row objects. This reads all the tokens of each line @@ -113,8 +155,7 @@ class DefaultSource extends FileFormat with DataSourceRegister { val pathsString = csvFiles.map(_.getPath.toUri.toString) val header = dataSchema.fields.map(_.name) val tokenizedRdd = tokenRdd(sqlContext, csvOptions, header, pathsString) - val rows = CSVRelation.parseCsv( - tokenizedRdd, dataSchema, requiredColumns, csvFiles, sqlContext, csvOptions) + val rows = CSVRelation.parseCsv(tokenizedRdd, dataSchema, requiredColumns, csvOptions) val requiredDataSchema = StructType(requiredColumns.map(c => dataSchema.find(_.name == c).get)) rows.mapPartitions { iterator => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala index 21fc1224ef..42cd25a18c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala @@ -124,8 +124,9 @@ class DefaultSource extends FileFormat with DataSourceRegister { override def buildReader( sqlContext: SQLContext, - partitionSchema: StructType, dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = { val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) @@ -136,7 +137,7 @@ class DefaultSource extends FileFormat with DataSourceRegister { val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord .getOrElse(sqlContext.conf.columnNameOfCorruptRecord) - val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes + val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes val joinedRow = new JoinedRow() file => { @@ -144,7 +145,7 @@ class DefaultSource extends FileFormat with DataSourceRegister { val rows = JacksonParser.parseJson( lines, - dataSchema, + requiredSchema, columnNameOfCorruptRecord, parsedOptions) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index d6b84be267..5b58fa1fc5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -276,38 +276,26 @@ private[sql] class DefaultSource file.getName == ParquetFileWriter.PARQUET_METADATA_FILE } - /** - * Returns a function that can be used to read a single file in as an Iterator of InternalRow. - * - * @param partitionSchema The schema of the partition column row that will be present in each - * PartitionedFile. These columns should be prepended to the rows that - * are produced by the iterator. - * @param dataSchema The schema of the data that should be output for each row. This may be a - * subset of the columns that are present in the file if column pruning has - * occurred. - * @param filters A set of filters than can optionally be used to reduce the number of rows output - * @param options A set of string -> string configuration options. 
- * @return - */ override def buildReader( sqlContext: SQLContext, - partitionSchema: StructType, dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = { val parquetConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName) parquetConf.set( CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, - CatalystSchemaConverter.checkFieldNames(dataSchema).json) + CatalystSchemaConverter.checkFieldNames(requiredSchema).json) parquetConf.set( CatalystWriteSupport.SPARK_ROW_SCHEMA, - CatalystSchemaConverter.checkFieldNames(dataSchema).json) + CatalystSchemaConverter.checkFieldNames(requiredSchema).json) // We want to clear this temporary metadata from saving into Parquet file. // This metadata is only useful for detecting optional columns when pushdowning filters. val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField, - dataSchema).asInstanceOf[StructType] + requiredSchema).asInstanceOf[StructType] CatalystWriteSupport.setSchema(dataSchemaToWrite, parquetConf) // Sets flags for `CatalystSchemaConverter` @@ -324,7 +312,7 @@ private[sql] class DefaultSource // Collects all converted Parquet filter predicates. Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` // is used here. - .flatMap(ParquetFilters.createFilter(dataSchema, _)) + .flatMap(ParquetFilters.createFilter(requiredSchema, _)) .reduceOption(FilterApi.and) } else { None @@ -394,7 +382,7 @@ private[sql] class DefaultSource enableVectorizedParquetReader) { iter.asInstanceOf[Iterator[InternalRow]] } else { - val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes + val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes val joinedRow = new JoinedRow() val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index d6ab5fc56e..99459ba1d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -129,8 +129,9 @@ class DefaultSource extends FileFormat with DataSourceRegister { override def buildReader( sqlContext: SQLContext, - partitionSchema: StructType, dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = { val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 1e02354edf..6b95a3d25b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -385,9 +385,9 @@ abstract class OutputWriter { * * @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise * this relation. 
- * @param partitionSchema The schmea of the columns (if any) that are used to partition the relation + * @param partitionSchema The schema of the columns (if any) that are used to partition the relation * @param dataSchema The schema of any remaining columns. Note that if any partition columns are - * present in the actual data files as well, they are removed. + * present in the actual data files as well, they are preserved. * @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values). * @param fileFormat A file format that can be used to read and write the data in files. * @param options Configuration used when reading / writing data. @@ -462,20 +462,24 @@ trait FileFormat { /** * Returns a function that can be used to read a single file in as an Iterator of InternalRow. * + * @param dataSchema The global data schema. It can be either specified by the user, or + * reconciled/merged from all underlying data files. If any partition columns + * are contained in the files, they are preserved in this schema. * @param partitionSchema The schema of the partition column row that will be present in each - * PartitionedFile. These columns should be prepended to the rows that + * PartitionedFile. These columns should be appended to the rows that * are produced by the iterator. - * @param dataSchema The schema of the data that should be output for each row. This may be a - * subset of the columns that are present in the file if column pruning has - * occurred. + * @param requiredSchema The schema of the data that should be output for each row. This may be a + * subset of the columns that are present in the file if column pruning has + * occurred. * @param filters A set of filters than can optionally be used to reduce the number of rows output * @param options A set of string -> string configuration options. * @return */ def buildReader( sqlContext: SQLContext, - partitionSchema: StructType, dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = { // TODO: Remove this default implementation when the other formats have been ported diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 45620bc965..717a3a80b7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -376,14 +376,15 @@ class TestFileFormat extends FileFormat { override def buildReader( sqlContext: SQLContext, - partitionSchema: StructType, dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = { // Record the arguments so they can be checked in the test case. 
LastArguments.partitionSchema = partitionSchema - LastArguments.dataSchema = dataSchema + LastArguments.dataSchema = requiredSchema LastArguments.filters = filters LastArguments.options = options diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 7c4a0a0c0f..43f445edcb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -126,8 +126,9 @@ private[sql] class DefaultSource override def buildReader( sqlContext: SQLContext, - partitionSchema: StructType, dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = { val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) @@ -145,15 +146,15 @@ private[sql] class DefaultSource (file: PartitionedFile) => { val conf = broadcastedConf.value.value - // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this - // case, `OrcFileOperator.readSchema` returns `None`, and we can simply return an empty - // iterator. + // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this + // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file + // using the given physical schema. Instead, we simply return an empty iterator. val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)) if (maybePhysicalSchema.isEmpty) { Iterator.empty } else { val physicalSchema = maybePhysicalSchema.get - OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema) + OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema) val orcRecordReader = { val job = Job.getInstance(conf) @@ -171,11 +172,11 @@ private[sql] class DefaultSource // Unwraps `OrcStruct`s to `UnsafeRow`s val unsafeRowIterator = OrcRelation.unwrapOrcStructs( - file.filePath, conf, dataSchema, new RecordReaderIterator[OrcStruct](orcRecordReader) + file.filePath, conf, requiredSchema, new RecordReaderIterator[OrcStruct](orcRecordReader) ) // Appends partition values - val fullOutput = dataSchema.toAttributes ++ partitionSchema.toAttributes + val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes val joinedRow = new JoinedRow() val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput) -- cgit v1.2.3
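
To see the shape of the new contract outside Spark's internals: `buildReader()` now receives both the full `dataSchema` of the files and the pruned `requiredSchema`, returns a per-file read function, and appends the partition values to every row it produces. The standalone Scala sketch below is a simplified model of that flow over plain strings, not code from this patch; the naive `split`-based tokenizer, the unconditional header drop, and all names are illustrative assumptions rather than Spark APIs.

```scala
object CsvBuildReaderSketch {

  // Returns a per-file reader, analogous in spirit to the function produced by
  // `FileFormat.buildReader()`: it parses lines against the full data schema,
  // keeps only the required (pruned) columns, and appends the partition values.
  def buildReader(
      dataSchema: Seq[String],      // every column present in the CSV files
      requiredSchema: Seq[String],  // pruned subset the query actually needs
      partitionValues: Seq[String], // partition column values for this file
      delimiter: Char = ',',
      header: Boolean = true): Iterator[String] => Iterator[Seq[String]] = {

    val requiredIndices = requiredSchema.map(col => dataSchema.indexOf(col))

    (lines: Iterator[String]) => {
      // The real code only drops the header for the split starting at offset 0,
      // and also skips leading comment/empty lines; this sketch simplifies.
      val dataLines = if (header && lines.hasNext) { lines.next(); lines } else lines

      dataLines.flatMap { line =>
        val tokens = line.split(delimiter)
        // Mirrors the dropMalformed path: skip rows whose arity doesn't match the schema.
        val parsed =
          if (tokens.length != dataSchema.length) None
          else Some(requiredIndices.map(i => tokens(i)) ++ partitionValues)
        parsed.toList
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val read = buildReader(
      dataSchema = Seq("id", "name", "price"),
      requiredSchema = Seq("name", "price"),
      partitionValues = Seq("2016-03-30")) // e.g. the value of a `date` partition column

    val lines = Iterator("id,name,price", "1,apple,1.5", "2,banana,0.5", "malformed,row")
    read(lines).foreach(println)
    // List(apple, 1.5, 2016-03-30)
    // List(banana, 0.5, 2016-03-30)
  }
}
```

In the patch itself these three concerns correspond to `CSVRelation.dropHeaderLine`, `CSVRelation.csvParser`, and the `JoinedRow` plus `GenerateUnsafeProjection` step that appends `file.partitionValues` to each produced row.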