author     hyukjinkwon <gurwls223@gmail.com>    2017-03-22 09:52:37 -0700
committer  Xiao Li <gatorsmile@gmail.com>       2017-03-22 09:52:37 -0700
commit     465818389aab1217c9de5c685cfaee3ffaec91bb (patch)
tree       54691a40b9b00854f5c6fc343c0186c7bc214f22 /sql/catalyst
parent     0caade634076034182e22318eb09a6df1c560576 (diff)
[SPARK-19949][SQL][FOLLOW-UP] Clean up parse modes and update related comments
## What changes were proposed in this pull request?

This PR proposes to make the `mode` option in both CSV and JSON use a `case object`, and fixes some comments related to the previous fix. It also modifies some tests related to parse modes.

## How was this patch tested?

Modified unit tests in both `CSVSuite.scala` and `JsonSuite.scala`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17377 from HyukjinKwon/SPARK-19949.
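For illustration, a minimal sketch (Scala, assuming only the `ParseMode` API added in this patch) of how a user-supplied mode string now resolves to a case object, with unrecognized values falling back to `PermissiveMode`:

```scala
import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}

// Mode strings are matched case-insensitively against the case objects' names.
ParseMode.fromString("failfast")      // FailFastMode
ParseMode.fromString("DROPMALFORMED") // DropMalformedMode
ParseMode.fromString("bogus")         // logs a warning, falls back to PermissiveMode

// Callers now pass the canonical name instead of a raw string literal, e.g.
// new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)
```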
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala   4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala             12
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala       15
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala               56
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala              41
6 files changed, 71 insertions(+), 61 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 08af5522d8..df4d406b84 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, GenericArrayData, ParseModes}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, FailFastMode, GenericArrayData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -548,7 +548,7 @@ case class JsonToStructs(
lazy val parser =
new JacksonParser(
rowSchema,
- new JSONOptions(options + ("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))
+ new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
override def dataType: DataType = schema
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 355c26afa6..c22b1ade4e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -65,7 +65,8 @@ private[sql] class JSONOptions(
val allowBackslashEscapingAnyCharacter =
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
- val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+ val parseMode: ParseMode =
+ parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
val columnNameOfCorruptRecord =
parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
@@ -82,15 +83,6 @@ private[sql] class JSONOptions(
val wholeFile = parameters.get("wholeFile").map(_.toBoolean).getOrElse(false)
- // Parse mode flags
- if (!ParseModes.isValidMode(parseMode)) {
- logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
- }
-
- val failFast = ParseModes.isFailFastMode(parseMode)
- val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
- val permissive = ParseModes.isPermissiveMode(parseMode)
-
/** Sets config options on a Jackson [[JsonFactory]]. */
def setJacksonOptions(factory: JsonFactory): Unit = {
factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index e8da10d65e..725e3015b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -24,7 +24,7 @@ import org.apache.spark.unsafe.types.UTF8String
class FailureSafeParser[IN](
rawParser: IN => Seq[InternalRow],
- mode: String,
+ mode: ParseMode,
schema: StructType,
columnNameOfCorruptRecord: String) {
@@ -58,11 +58,14 @@ class FailureSafeParser[IN](
try {
rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
} catch {
- case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
- Iterator(toResultRow(e.partialResult(), e.record))
- case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
- Iterator.empty
- case e: BadRecordException => throw e.cause
+ case e: BadRecordException => mode match {
+ case PermissiveMode =>
+ Iterator(toResultRow(e.partialResult(), e.record))
+ case DropMalformedMode =>
+ Iterator.empty
+ case FailFastMode =>
+ throw e.cause
+ }
}
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala
new file mode 100644
index 0000000000..4565dbde88
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.internal.Logging
+
+sealed trait ParseMode {
+ /**
+ * String name of the parse mode.
+ */
+ def name: String
+}
+
+/**
+ * This mode permissively parses the records.
+ */
+case object PermissiveMode extends ParseMode { val name = "PERMISSIVE" }
+
+/**
+ * This mode ignores the whole corrupted records.
+ */
+case object DropMalformedMode extends ParseMode { val name = "DROPMALFORMED" }
+
+/**
+ * This mode throws an exception when it meets corrupted records.
+ */
+case object FailFastMode extends ParseMode { val name = "FAILFAST" }
+
+object ParseMode extends Logging {
+ /**
+ * Returns the parse mode from the given string.
+ */
+ def fromString(mode: String): ParseMode = mode.toUpperCase match {
+ case PermissiveMode.name => PermissiveMode
+ case DropMalformedMode.name => DropMalformedMode
+ case FailFastMode.name => FailFastMode
+ case _ =>
+ logWarning(s"$mode is not a valid parse mode. Using ${PermissiveMode.name}.")
+ PermissiveMode
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
deleted file mode 100644
index 0e466962b4..0000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.util
-
-object ParseModes {
- val PERMISSIVE_MODE = "PERMISSIVE"
- val DROP_MALFORMED_MODE = "DROPMALFORMED"
- val FAIL_FAST_MODE = "FAILFAST"
-
- val DEFAULT = PERMISSIVE_MODE
-
- def isValidMode(mode: String): Boolean = {
- mode.toUpperCase match {
- case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
- case _ => false
- }
- }
-
- def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
- def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
- def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) {
- mode.toUpperCase == PERMISSIVE_MODE
- } else {
- true // We default to permissive is the mode string is not valid
- }
-}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index e4698d4463..c5b72235e5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -21,7 +21,7 @@ import java.util.Calendar
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, ParseModes}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, PermissiveMode}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -367,7 +367,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
// Other modes should still return `null`.
checkEvaluation(
- JsonToStructs(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId),
+ JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId),
null
)
}
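A brief aside on the design choice: because `ParseMode` is a sealed trait, matches over it (such as the one in `FailureSafeParser` above) are checked for exhaustiveness by the compiler, which the old string constants in `ParseModes` could not offer. A small sketch of this property; the `describe` helper below is hypothetical and not part of this patch:

```scala
import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}

// Hypothetical helper, purely for illustration.
object ParseModeExample {
  def describe(mode: ParseMode): String = mode match {
    case PermissiveMode    => "keep the row and put the raw input in the corrupt-record column"
    case DropMalformedMode => "silently drop the malformed row"
    case FailFastMode      => "rethrow the underlying parse error"
    // No wildcard case needed: ParseMode is sealed, so the compiler
    // warns if a new mode is added but not handled here.
  }
}
```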