author    | hyukjinkwon <gurwls223@gmail.com> | 2017-03-22 09:52:37 -0700
committer | Xiao Li <gatorsmile@gmail.com> | 2017-03-22 09:52:37 -0700
commit    | 465818389aab1217c9de5c685cfaee3ffaec91bb (patch)
tree      | 54691a40b9b00854f5c6fc343c0186c7bc214f22 /sql/core/src/main/scala/org/apache
parent    | 0caade634076034182e22318eb09a6df1c560576 (diff)
[SPARK-19949][SQL][FOLLOW-UP] Clean up parse modes and update related comments
## What changes were proposed in this pull request?
This PR proposes to make the `mode` option in both CSV and JSON use `case object`s, and fixes some comments related to the previous fix.
It also modifies some tests related to parse modes.
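For context, the case-object approach replaces the old string-based `ParseModes` helpers (e.g. `ParseModes.isFailFastMode(parseMode)`) with values that can be compared and pattern-matched directly. The actual `ParseMode` hierarchy lives in `org.apache.spark.sql.catalyst.util` and is outside the diffstat below; the following is only a minimal sketch of the pattern, with the mode names inferred from the option documentation in the diff:

```scala
// Minimal sketch of the case-object pattern; the real class lives in
// org.apache.spark.sql.catalyst.util and may differ in detail.
sealed trait ParseMode { def name: String }

case object PermissiveMode extends ParseMode { val name = "PERMISSIVE" }
case object DropMalformedMode extends ParseMode { val name = "DROPMALFORMED" }
case object FailFastMode extends ParseMode { val name = "FAILFAST" }

object ParseMode {
  // Resolves a user-supplied string case-insensitively, falling back to
  // PERMISSIVE (the real implementation also logs a warning on unknown modes).
  def fromString(mode: String): ParseMode = mode.toUpperCase match {
    case PermissiveMode.name    => PermissiveMode
    case DropMalformedMode.name => DropMalformedMode
    case FailFastMode.name      => FailFastMode
    case _                      => PermissiveMode
  }
}
```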
## How was this patch tested?
Modified unit tests in both `CSVSuite.scala` and `JsonSuite.scala`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #17377 from HyukjinKwon/SPARK-19949.
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
4 files changed, 6 insertions, 17 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 767a636d70..e39b4d91f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -510,10 +510,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
    * for any given value being read. By default, it is -1 meaning unlimited length</li>
-   * <li>`maxMalformedLogPerPartition` (default `10`): sets the maximum number of malformed rows
-   * Spark will log for each partition. Malformed records beyond this number will be ignored.</li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
-   *    during parsing.
+   *    during parsing. It supports the following case-insensitive modes.
    *   <ul>
    *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
    *       the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index f6c6b6f56c..5d2c23ed96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -82,7 +82,8 @@ class CSVOptions(
   val delimiter = CSVUtils.toChar(
     parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
 
-  val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+  val parseMode: ParseMode =
+    parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
 
   val charset = parameters.getOrElse("encoding",
     parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))
@@ -95,15 +96,6 @@ class CSVOptions(
   val ignoreLeadingWhiteSpaceFlag = getBool("ignoreLeadingWhiteSpace")
   val ignoreTrailingWhiteSpaceFlag = getBool("ignoreTrailingWhiteSpace")
 
-  // Parse mode flags
-  if (!ParseModes.isValidMode(parseMode)) {
-    logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
-  }
-
-  val failFast = ParseModes.isFailFastMode(parseMode)
-  val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
-  val permissive = ParseModes.isPermissiveMode(parseMode)
-
   val columnNameOfCorruptRecord = parameters.getOrElse("columnNameOfCorruptRecord",
     defaultColumnNameOfCorruptRecord)
 
@@ -139,8 +131,6 @@ class CSVOptions(
 
   val escapeQuotes = getBool("escapeQuotes", true)
 
-  val maxMalformedLogPerPartition = getInt("maxMalformedLogPerPartition", 10)
-
   val quoteAll = getBool("quoteAll", false)
 
   val inputBufferSize = 128
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
index 7475f8ec79..e15c30b437 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
@@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.catalyst.util.PermissiveMode
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -40,7 +41,7 @@ private[sql] object JsonInferSchema {
       json: RDD[T],
       configOptions: JSONOptions,
       createParser: (JsonFactory, T) => JsonParser): StructType = {
-    val shouldHandleCorruptRecord = configOptions.permissive
+    val shouldHandleCorruptRecord = configOptions.parseMode == PermissiveMode
     val columnNameOfCorruptRecord = configOptions.columnNameOfCorruptRecord
 
     // perform schema inference on each row and merge afterwards
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 388ef182ce..f6e2fef74b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -260,7 +260,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
    * for any given value being read. By default, it is -1 meaning unlimited length</li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
-   *    during parsing.
+   *    during parsing. It supports the following case-insensitive modes.
    *   <ul>
    *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
    *       the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep