author    hyukjinkwon <gurwls223@gmail.com>    2017-03-22 09:52:37 -0700
committer Xiao Li <gatorsmile@gmail.com>       2017-03-22 09:52:37 -0700
commit    465818389aab1217c9de5c685cfaee3ffaec91bb (patch)
tree      54691a40b9b00854f5c6fc343c0186c7bc214f22 /sql/core/src/main/scala/org/apache
parent    0caade634076034182e22318eb09a6df1c560576 (diff)
[SPARK-19949][SQL][FOLLOW-UP] Clean up parse modes and update related comments
## What changes were proposed in this pull request?

This PR proposes to make the `mode` option in both CSV and JSON use a `case object`, and fixes some comments related to the previous fix. It also updates the tests related to parse modes.

## How was this patch tested?

Modified unit tests in both `CSVSuite.scala` and `JsonSuite.scala`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17377 from HyukjinKwon/SPARK-19949.
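The commit message above describes switching the `mode` option from loose strings to a `case object`. A minimal sketch of what such a hierarchy looks like, assuming names matching the `ParseMode`/`PermissiveMode` identifiers used in the hunks below (the catalyst-side file itself is outside this diffstat, so this is an illustration, not the committed source):

```scala
// Sketch of the case-object parse modes this commit moves to.
// PermissiveMode / DropMalformedMode / FailFastMode mirror the
// PERMISSIVE / DROPMALFORMED / FAILFAST strings in the docs below.
sealed trait ParseMode { def name: String }

case object PermissiveMode extends ParseMode { val name = "PERMISSIVE" }
case object DropMalformedMode extends ParseMode { val name = "DROPMALFORMED" }
case object FailFastMode extends ParseMode { val name = "FAILFAST" }

object ParseMode {
  // Case-insensitive lookup; unknown strings fall back to PERMISSIVE,
  // matching the old ParseModes.DEFAULT behaviour removed below.
  def fromString(mode: String): ParseMode = mode.toUpperCase match {
    case PermissiveMode.name    => PermissiveMode
    case DropMalformedMode.name => DropMalformedMode
    case FailFastMode.name      => FailFastMode
    case _                      => PermissiveMode
  }
}
```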
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                              |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala         | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala   |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala                   |  2
4 files changed, 6 insertions(+), 17 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 767a636d70..e39b4d91f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -510,10 +510,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
* for any given value being read. By default, it is -1 meaning unlimited length</li>
- * <li>`maxMalformedLogPerPartition` (default `10`): sets the maximum number of malformed rows
- * Spark will log for each partition. Malformed records beyond this number will be ignored.</li>
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
- * during parsing.
+ * during parsing. It supports the following case-insensitive modes.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
* the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
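The doc comment above is the user-facing documentation for the `mode` option on `DataFrameReader`. A hypothetical reader call exercising it (the session setup and the `/tmp/people.csv` path are illustrative only):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: local session and input path are made up.
val spark = SparkSession.builder()
  .appName("parse-mode-demo")
  .master("local[*]")
  .getOrCreate()

// After this change `mode` is case-insensitive, so "dropMalformed"
// resolves to DROPMALFORMED.
val df = spark.read
  .option("header", "true")
  .option("mode", "dropMalformed")
  .csv("/tmp/people.csv")
```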
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index f6c6b6f56c..5d2c23ed96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -82,7 +82,8 @@ class CSVOptions(
val delimiter = CSVUtils.toChar(
parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
- val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+ val parseMode: ParseMode =
+ parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
val charset = parameters.getOrElse("encoding",
parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))
@@ -95,15 +96,6 @@ class CSVOptions(
val ignoreLeadingWhiteSpaceFlag = getBool("ignoreLeadingWhiteSpace")
val ignoreTrailingWhiteSpaceFlag = getBool("ignoreTrailingWhiteSpace")
- // Parse mode flags
- if (!ParseModes.isValidMode(parseMode)) {
- logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
- }
-
- val failFast = ParseModes.isFailFastMode(parseMode)
- val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
- val permissive = ParseModes.isPermissiveMode(parseMode)
-
val columnNameOfCorruptRecord =
parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
@@ -139,8 +131,6 @@ class CSVOptions(
val escapeQuotes = getBool("escapeQuotes", true)
- val maxMalformedLogPerPartition = getInt("maxMalformedLogPerPartition", 10)
-
val quoteAll = getBool("quoteAll", false)
val inputBufferSize = 128
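The key change in this hunk replaces the raw `parseMode` string, plus the three booleans derived from it, with a single typed value. A small sketch of how the new resolution line behaves, assuming the `ParseMode` sketch above: a missing `mode` key and an unrecognized string both end up as `PermissiveMode`.

```scala
// No "mode" key at all: getOrElse supplies PermissiveMode.
val noMode = Map("sep" -> ";")
assert(noMode.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode) == PermissiveMode)

// Unknown string: fromString falls back to PermissiveMode.
val badMode = Map("mode" -> "bogus")
assert(badMode.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode) == PermissiveMode)
```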
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
index 7475f8ec79..e15c30b437 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
@@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.catalyst.util.PermissiveMode
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -40,7 +41,7 @@ private[sql] object JsonInferSchema {
json: RDD[T],
configOptions: JSONOptions,
createParser: (JsonFactory, T) => JsonParser): StructType = {
- val shouldHandleCorruptRecord = configOptions.permissive
+ val shouldHandleCorruptRecord = configOptions.parseMode == PermissiveMode
val columnNameOfCorruptRecord = configOptions.columnNameOfCorruptRecord
// perform schema inference on each row and merge afterwards
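The one-line change above swaps a boolean flag for an equality check against a case object. One benefit, sketched here under the same assumed `ParseMode` hierarchy, is that a sealed trait lets the compiler check exhaustiveness when code branches on the mode:

```scala
// With a sealed trait, forgetting a mode here is a compile-time warning,
// unlike the old trio of independent booleans.
def describe(mode: ParseMode): String = mode match {
  case PermissiveMode    => "null out bad fields and keep the record"
  case DropMalformedMode => "silently drop malformed records"
  case FailFastMode      => "throw on the first malformed record"
}
```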
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 388ef182ce..f6e2fef74b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -260,7 +260,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
* for any given value being read. By default, it is -1 meaning unlimited length</li>
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
- * during parsing.
+ * during parsing. It supports the following case-insensitive modes.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
* the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
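The same option docs apply to the streaming reader. A hypothetical streaming counterpart to the batch example earlier (the schema, the path, and the reuse of `spark` are illustrative; file-stream sources require an explicit schema):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val schema = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)

// FAILFAST: abort the query on the first malformed record.
val stream = spark.readStream
  .schema(schema)
  .option("mode", "FAILFAST")
  .csv("/tmp/stream-input/")
```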