author    Reynold Xin <rxin@databricks.com>  2016-06-20 21:46:12 -0700
committer Reynold Xin <rxin@databricks.com>  2016-06-20 21:46:12 -0700
commit    c775bf09e0c3540f76de3f15d3fd35112a4912c1 (patch)
tree      7a778d6821ffc5555779598fa9dae0c812229f5e /sql
parent    217db56ba11fcdf9e3a81946667d1d99ad7344ee (diff)
download  spark-c775bf09e0c3540f76de3f15d3fd35112a4912c1.tar.gz
          spark-c775bf09e0c3540f76de3f15d3fd35112a4912c1.tar.bz2
          spark-c775bf09e0c3540f76de3f15d3fd35112a4912c1.zip
[SPARK-13792][SQL] Limit logging of bad records in CSV data source
## What changes were proposed in this pull request?

This pull request adds a new option (`maxMalformedLogPerPartition`) to the CSV reader that limits the number of log messages Spark generates per partition for malformed records. The resulting log looks something like this:

```
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: More than 10 malformed records have been found on this partition. Malformed records from now on will not be logged.
```

Closes #12173

## How was this patch tested?

Manually tested.

Author: Reynold Xin <rxin@databricks.com>

Closes #13795 from rxin/SPARK-13792.
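For reference, a minimal sketch of how a user might exercise the new option from the `DataFrameReader` API; the input path, header option, and the threshold of 5 are illustrative values, not part of this patch, and `spark` is assumed to be an existing `SparkSession`:

```scala
// Read a CSV file, dropping malformed rows and capping per-partition warnings.
val df = spark.read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")             // drop rows that do not match the schema
  .option("maxMalformedLogPerPartition", "5")  // log at most 5 malformed rows per partition
  .csv("data/input.csv")                       // illustrative path
```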
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                          |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala  |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala     |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala    | 42
4 files changed, 40 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 841503b260..35ba9c5079 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -382,6 +382,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
* for any given value being read.</li>
+ * <li>`maxMalformedLogPerPartition` (default `10`): sets the maximum number of malformed rows
+ * Spark will log for each partition. Malformed records beyond this number will be ignored.</li>
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing.</li>
* <ul>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index be52de8e40..12e19f955c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -120,7 +120,14 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers)
val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions)
- tokenizedIterator.flatMap(parser(_).toSeq)
+ var numMalformedRecords = 0
+ tokenizedIterator.flatMap { recordTokens =>
+ val row = parser(recordTokens, numMalformedRecords)
+ if (row.isEmpty) {
+ numMalformedRecords += 1
+ }
+ row
+ }
}
}
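The hunk above threads a mutable per-partition counter through `flatMap` so the parser can stop logging once the cap is reached. Below is a simplified, self-contained sketch of that pattern; the names `capWarnings`, `parseLine`, and `maxLogged` are illustrative and not part of the patch:

```scala
// Standalone illustration of the warning-cap pattern used above.
// parseLine returns None for malformed records; warnings stop after maxLogged misses.
def capWarnings(lines: Iterator[Array[String]], maxLogged: Int,
                parseLine: Array[String] => Option[String]): Iterator[String] = {
  var numMalformed = 0
  lines.flatMap { tokens =>
    val parsed = parseLine(tokens)
    if (parsed.isEmpty) {
      if (numMalformed < maxLogged) {
        println(s"WARN Dropping malformed line: ${tokens.mkString(",")}")
      }
      if (numMalformed == maxLogged - 1) {
        println(s"WARN More than $maxLogged malformed records found; no longer logging them.")
      }
      numMalformed += 1
    }
    parsed
  }
}
```

Because the counter lives outside the closure but inside the per-task scope, it is tracked per partition, which is what the option name `maxMalformedLogPerPartition` reflects.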
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 9f4ce8358b..581eda7e09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -113,6 +113,8 @@ private[sql] class CSVOptions(@transient private val parameters: Map[String, Str
val escapeQuotes = getBool("escapeQuotes", true)
+ val maxMalformedLogPerPartition = getInt("maxMalformedLogPerPartition", 10)
+
val inputBufferSize = 128
val isCommentSet = this.comment != '\u0000'
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index d72c8b9ac2..083ac3350e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -50,10 +50,19 @@ object CSVRelation extends Logging {
}
}
+ /**
+ * Returns a function that parses a single CSV record (in the form of an array of strings in which
+ * each element represents a column) and turns it into either one resulting row or no row (if
+ * the record is malformed).
+ *
+ * The 2nd argument in the returned function represents the total number of malformed rows
+ * observed so far.
+ */
+ // This is pretty convoluted and we should probably rewrite the entire CSV parsing soon.
def csvParser(
schema: StructType,
requiredColumns: Array[String],
- params: CSVOptions): Array[String] => Option[InternalRow] = {
+ params: CSVOptions): (Array[String], Int) => Option[InternalRow] = {
val schemaFields = schema.fields
val requiredFields = StructType(requiredColumns.map(schema(_))).fields
val safeRequiredFields = if (params.dropMalformed) {
@@ -72,9 +81,16 @@ object CSVRelation extends Logging {
val requiredSize = requiredFields.length
val row = new GenericMutableRow(requiredSize)
- (tokens: Array[String]) => {
+ (tokens: Array[String], numMalformedRows) => {
if (params.dropMalformed && schemaFields.length != tokens.length) {
- logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
+ if (numMalformedRows < params.maxMalformedLogPerPartition) {
+ logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
+ }
+ if (numMalformedRows == params.maxMalformedLogPerPartition - 1) {
+ logWarning(
+ s"More than ${params.maxMalformedLogPerPartition} malformed records have been " +
+ "found on this partition. Malformed records from now on will not be logged.")
+ }
None
} else if (params.failFast && schemaFields.length != tokens.length) {
throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
@@ -109,23 +125,21 @@ object CSVRelation extends Logging {
Some(row)
} catch {
case NonFatal(e) if params.dropMalformed =>
- logWarning("Parse exception. " +
- s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
+ if (numMalformedRows < params.maxMalformedLogPerPartition) {
+ logWarning("Parse exception. " +
+ s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
+ }
+ if (numMalformedRows == params.maxMalformedLogPerPartition - 1) {
+ logWarning(
+ s"More than ${params.maxMalformedLogPerPartition} malformed records have been " +
+ "found on this partition. Malformed records from now on will not be logged.")
+ }
None
}
}
}
}
- def parseCsv(
- tokenizedRDD: RDD[Array[String]],
- schema: StructType,
- requiredColumns: Array[String],
- options: CSVOptions): RDD[InternalRow] = {
- val parser = csvParser(schema, requiredColumns, options)
- tokenizedRDD.flatMap(parser(_).toSeq)
- }
-
// Skips the header line of each file if the `header` option is set to true.
def dropHeaderLine(
file: PartitionedFile, lines: Iterator[String], csvOptions: CSVOptions): Unit = {