diff options
author | Reynold Xin <rxin@databricks.com> | 2016-06-20 21:46:12 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-06-20 21:46:12 -0700 |
commit | c775bf09e0c3540f76de3f15d3fd35112a4912c1 (patch) | |
tree | 7a778d6821ffc5555779598fa9dae0c812229f5e /sql | |
parent | 217db56ba11fcdf9e3a81946667d1d99ad7344ee (diff) | |
download | spark-c775bf09e0c3540f76de3f15d3fd35112a4912c1.tar.gz spark-c775bf09e0c3540f76de3f15d3fd35112a4912c1.tar.bz2 spark-c775bf09e0c3540f76de3f15d3fd35112a4912c1.zip |
[SPARK-13792][SQL] Limit logging of bad records in CSV data source
## What changes were proposed in this pull request?
This pull request adds a new option (maxMalformedLogPerPartition) in CSV reader to limit the maximum of logging message Spark generates per partition for malformed records.
The error log looks something like
```
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: More than 10 malformed records have been found on this partition. Malformed records from now on will not be logged.
```
Closes #12173
## How was this patch tested?
Manually tested.
Author: Reynold Xin <rxin@databricks.com>
Closes #13795 from rxin/SPARK-13792.
Diffstat (limited to 'sql')
4 files changed, 40 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 841503b260..35ba9c5079 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -382,6 +382,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * a record can have.</li> * <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed * for any given value being read.</li> + * <li>`maxMalformedLogPerPartition` (default `10`): sets the maximum number of malformed rows + * Spark will log for each partition. Malformed records beyond this number will be ignored.</li> * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records * during parsing.</li> * <ul> diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index be52de8e40..12e19f955c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -120,7 +120,14 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers) val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions) - tokenizedIterator.flatMap(parser(_).toSeq) + var numMalformedRecords = 0 + tokenizedIterator.flatMap { recordTokens => + val row = parser(recordTokens, numMalformedRecords) + if (row.isEmpty) { + numMalformedRecords += 1 + } + row + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 9f4ce8358b..581eda7e09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -113,6 +113,8 @@ private[sql] class CSVOptions(@transient private val parameters: Map[String, Str val escapeQuotes = getBool("escapeQuotes", true) + val maxMalformedLogPerPartition = getInt("maxMalformedLogPerPartition", 10) + val inputBufferSize = 128 val isCommentSet = this.comment != '\u0000' diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index d72c8b9ac2..083ac3350e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -50,10 +50,19 @@ object CSVRelation extends Logging { } } + /** + * Returns a function that parses a single CSV record (in the form of an array of strings in which + * each element represents a column) and turns it into either one resulting row or no row (if the + * the record is malformed). + * + * The 2nd argument in the returned function represents the total number of malformed rows + * observed so far. + */ + // This is pretty convoluted and we should probably rewrite the entire CSV parsing soon. def csvParser( schema: StructType, requiredColumns: Array[String], - params: CSVOptions): Array[String] => Option[InternalRow] = { + params: CSVOptions): (Array[String], Int) => Option[InternalRow] = { val schemaFields = schema.fields val requiredFields = StructType(requiredColumns.map(schema(_))).fields val safeRequiredFields = if (params.dropMalformed) { @@ -72,9 +81,16 @@ object CSVRelation extends Logging { val requiredSize = requiredFields.length val row = new GenericMutableRow(requiredSize) - (tokens: Array[String]) => { + (tokens: Array[String], numMalformedRows) => { if (params.dropMalformed && schemaFields.length != tokens.length) { - logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}") + if (numMalformedRows < params.maxMalformedLogPerPartition) { + logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}") + } + if (numMalformedRows == params.maxMalformedLogPerPartition - 1) { + logWarning( + s"More than ${params.maxMalformedLogPerPartition} malformed records have been " + + "found on this partition. Malformed records from now on will not be logged.") + } None } else if (params.failFast && schemaFields.length != tokens.length) { throw new RuntimeException(s"Malformed line in FAILFAST mode: " + @@ -109,23 +125,21 @@ object CSVRelation extends Logging { Some(row) } catch { case NonFatal(e) if params.dropMalformed => - logWarning("Parse exception. " + - s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}") + if (numMalformedRows < params.maxMalformedLogPerPartition) { + logWarning("Parse exception. " + + s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}") + } + if (numMalformedRows == params.maxMalformedLogPerPartition - 1) { + logWarning( + s"More than ${params.maxMalformedLogPerPartition} malformed records have been " + + "found on this partition. Malformed records from now on will not be logged.") + } None } } } } - def parseCsv( - tokenizedRDD: RDD[Array[String]], - schema: StructType, - requiredColumns: Array[String], - options: CSVOptions): RDD[InternalRow] = { - val parser = csvParser(schema, requiredColumns, options) - tokenizedRDD.flatMap(parser(_).toSeq) - } - // Skips the header line of each file if the `header` option is set to true. def dropHeaderLine( file: PartitionedFile, lines: Iterator[String], csvOptions: CSVOptions): Unit = { |