author    hyukjinkwon <gurwls223@gmail.com>    2017-03-03 00:50:58 -0800
committer Wenchen Fan <wenchen@databricks.com>    2017-03-03 00:50:58 -0800
commit    d556b317038455dc25e193f3add723fccdc54958 (patch)
tree      0034ff1a21b42445d077507e3288c2bdeacb9617 /sql
parent    982f3223b4f55f988091402063fe8746c5e2cee4 (diff)
[SPARK-18699][SQL][FOLLOWUP] Add explanation in CSV parser and minor cleanup
## What changes were proposed in this pull request?

This PR suggests adding some comments to the `UnivocityParser` logic to explain what happens. It also proposes what is, IMHO, a slightly cleaner implementation (at least one that is easier to explain).

## How was this patch tested?

Unit tests in `CSVSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17142 from HyukjinKwon/SPARK-18699.
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala  |  97
1 file changed, 68 insertions(+), 29 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 804031a5bb..3b3b87e435 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -54,39 +54,77 @@ private[csv] class UnivocityParser(
private val dataSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
- private val valueConverters =
- dataSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
-
private val tokenizer = new CsvParser(options.asParserSettings)
private var numMalformedRecords = 0
private val row = new GenericInternalRow(requiredSchema.length)
- // This gets the raw input that is parsed lately.
+ // In `PERMISSIVE` parse mode, we should be able to put the raw malformed row into the field
+ // specified in `columnNameOfCorruptRecord`. The raw input is retrieved by this method.
private def getCurrentInput(): String = tokenizer.getContext.currentParsedContent().stripLineEnd
- // This parser loads an `indexArr._1`-th position value in input tokens,
- // then put the value in `row(indexArr._2)`.
- private val indexArr: Array[(Int, Int)] = {
- val fields = if (options.dropMalformed) {
- // If `dropMalformed` is enabled, then it needs to parse all the values
- // so that we can decide which row is malformed.
- requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
- } else {
- requiredSchema
- }
- // TODO: Revisit this; we need to clean up code here for readability.
- // See an URL below for related discussions:
- // https://github.com/apache/spark/pull/16928#discussion_r102636720
- val fieldsWithIndexes = fields.zipWithIndex
- corruptFieldIndex.map { case corrFieldIndex =>
- fieldsWithIndexes.filter { case (_, i) => i != corrFieldIndex }
- }.getOrElse {
- fieldsWithIndexes
- }.map { case (f, i) =>
- (dataSchema.indexOf(f), i)
- }.toArray
+ // This parser loads the `tokenIndexArr(i)`-th value among the input tokens,
+ // then puts it in `row(rowIndexArr(i))`.
+ //
+ // For example, let's say there is CSV data as below:
+ //
+ // a,b,c
+ // 1,2,A
+ //
+ // Also, let's say `columnNameOfCorruptRecord` is set to "_unparsed", the user sets `header`
+ // to `true`, and the user selects the "c", "b", "_unparsed" and "a" fields. In this case, we
+ // need to map the values as below:
+ //
+ // required schema - ["c", "b", "_unparsed", "a"]
+ // CSV data schema - ["a", "b", "c"]
+ // required CSV data schema - ["c", "b", "a"]
+ //
+ // with the input tokens,
+ //
+ // input tokens - [1, 2, "A"]
+ //
+ // Each input token is placed at its output row position via these mappings. In this case,
+ //
+ // output row - ["A", 2, null, 1]
+ //
+ // In more detail:
+ // - `valueConverters`, input tokens -> CSV data schema
+ //   `valueConverters` maps each input token position (by its index) to that value's
+ //   converter (by its value), in the order of the CSV data schema. In this case,
+ //   [string->int, string->int, string->string].
+ //
+ // - `tokenIndexArr`, input tokens -> required CSV data schema
+ //   `tokenIndexArr` maps each field position in the required CSV data schema (by its
+ //   index) to the index of its input token (by its value). In this case, [2, 1, 0].
+ //
+ // - `rowIndexArr`, input tokens -> required schema
+ //   `rowIndexArr` maps each converted field (by its index) to its position in the
+ //   output row given the required schema (by its value), skipping the corrupt-record
+ //   slot. In this case, [0, 1, 3].
+ private val valueConverters: Array[ValueConverter] =
+ dataSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
+
+ // Used only to create both `tokenIndexArr` and `rowIndexArr`. This variable holds
+ // the fields that we should try to convert.
+ private val reorderedFields = if (options.dropMalformed) {
+ // If `dropMalformed` is enabled, we need to parse all the values
+ // so that we can decide which rows are malformed.
+ requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
+ } else {
+ requiredSchema
+ }
+
+ private val tokenIndexArr: Array[Int] = {
+ reorderedFields
+ .filter(_.name != options.columnNameOfCorruptRecord)
+ .map(f => dataSchema.indexOf(f)).toArray
+ }
+
+ private val rowIndexArr: Array[Int] = if (corruptFieldIndex.isDefined) {
+ val corrFieldIndex = corruptFieldIndex.get
+ reorderedFields.indices.filter(_ != corrFieldIndex).toArray
+ } else {
+ reorderedFields.indices.toArray
}
/**
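To make the index bookkeeping above concrete, here is a minimal, self-contained Scala sketch. It is not the actual Spark code: plain `Seq[String]` field names stand in for Spark's `StructType`/`StructField`, and `dropMalformed` is assumed off. It reproduces the worked example from the new comment, deriving `tokenIndexArr` and `rowIndexArr` for the required schema ["c", "b", "_unparsed", "a"] over the CSV data schema ["a", "b", "c"].

```scala
object IndexArraysSketch {
  def main(args: Array[String]): Unit = {
    val columnNameOfCorruptRecord = "_unparsed"

    // Schemas from the worked example in the comment above (field names only).
    val schema = Seq("a", "b", "c", "_unparsed")
    val requiredSchema = Seq("c", "b", "_unparsed", "a")
    val dataSchema = schema.filterNot(_ == columnNameOfCorruptRecord) // ["a", "b", "c"]

    // Index of the corrupt-record field within the required schema, if present.
    val corruptFieldIndex: Option[Int] =
      Some(requiredSchema.indexOf(columnNameOfCorruptRecord)).filter(_ >= 0)

    // With `dropMalformed` off, the fields to convert are just the required schema.
    val reorderedFields = requiredSchema

    // For each field to convert, the position of its token in the parsed CSV line.
    val tokenIndexArr: Array[Int] = reorderedFields
      .filterNot(_ == columnNameOfCorruptRecord)
      .map(f => dataSchema.indexOf(f))
      .toArray

    // For each converted field, its position in the output row,
    // skipping the slot reserved for the corrupt-record column.
    val rowIndexArr: Array[Int] = corruptFieldIndex match {
      case Some(corr) => reorderedFields.indices.filterNot(_ == corr).toArray
      case None       => reorderedFields.indices.toArray
    }

    println(tokenIndexArr.mkString("tokenIndexArr = [", ", ", "]")) // [2, 1, 0]
    println(rowIndexArr.mkString("rowIndexArr   = [", ", ", "]"))   // [0, 1, 3]
  }
}
```

Running `IndexArraysSketch` prints `tokenIndexArr = [2, 1, 0]` and `rowIndexArr = [0, 1, 3]`, matching the values given in the comment.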
@@ -200,14 +238,15 @@ private[csv] class UnivocityParser(
private def convert(tokens: Array[String]): Option[InternalRow] = {
convertWithParseMode(tokens) { tokens =>
var i: Int = 0
- while (i < indexArr.length) {
- val (pos, rowIdx) = indexArr(i)
+ while (i < tokenIndexArr.length) {
// The conversion needs to be attempted anyway, because in `DROPMALFORMED` mode
// whether this row is malformed is decided after trying the cast, even if the
// casted value is not stored in the row.
- val value = valueConverters(pos).apply(tokens(pos))
+ val from = tokenIndexArr(i)
+ val to = rowIndexArr(i)
+ val value = valueConverters(from).apply(tokens(from))
if (i < requiredSchema.length) {
- row(rowIdx) = value
+ row(to) = value
}
i += 1
}
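To round out the example, here is a similar hedged sketch of the rewritten convert loop, again with stand-in types only (a plain `Array[Any]` instead of `GenericInternalRow`, and trivial converters): each token at `tokenIndexArr(i)` is converted and, when the field belongs to the required schema, stored at `row(rowIndexArr(i))`, reproducing the example output row ["A", 2, null, 1].

```scala
object ConvertLoopSketch {
  type ValueConverter = String => Any

  def main(args: Array[String]): Unit = {
    // Input tokens from the example: the CSV line "1,2,A".
    val tokens = Array("1", "2", "A")

    // One converter per field of the CSV data schema ["a", "b", "c"]:
    // [string->int, string->int, string->string].
    val valueConverters: Array[ValueConverter] =
      Array(_.toInt, _.toInt, s => s)

    val tokenIndexArr = Array(2, 1, 0) // token position for each field to convert
    val rowIndexArr   = Array(0, 1, 3) // output-row position for each converted value
    val requiredSchemaLength = 4       // ["c", "b", "_unparsed", "a"]

    // Stand-in for GenericInternalRow; slot 2 stays null for the corrupt record.
    val row = new Array[Any](requiredSchemaLength)

    var i = 0
    while (i < tokenIndexArr.length) {
      val from = tokenIndexArr(i)
      val to = rowIndexArr(i)
      // The conversion is attempted even when the value would not be stored; this
      // check only matters when dropMalformed extends the fields to convert beyond
      // the required schema.
      val value = valueConverters(from).apply(tokens(from))
      if (i < requiredSchemaLength) {
        row(to) = value
      }
      i += 1
    }

    println(row.mkString("[", ", ", "]")) // [A, 2, null, 1]
  }
}
```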