author     hyukjinkwon <gurwls223@gmail.com>    2016-03-21 15:42:35 +0800
committer  Wenchen Fan <wenchen@databricks.com> 2016-03-21 15:42:35 +0800
commit     e474088144cdd2632cf2fef6b2cf10b3cd191c23
tree       66374e51a3318ea931ebef343e0a3d285c06d2c5
parent     f58319a24fd5e026411538b1fb7336d9d894277b
[SPARK-13764][SQL] Parse modes in JSON data source
## What changes were proposed in this pull request?

Currently, there is no way to control the behaviour when the JSON data source fails to parse corrupt records. This PR adds support for parse modes, just as the CSV data source has. There are three modes (see the usage sketch below):

- `PERMISSIVE`: when parsing fails, sets other fields to `null` and puts the malformed string into the corrupt-record field. This is the default mode.
- `DROPMALFORMED`: when parsing fails, drops the whole record.
- `FAILFAST`: when parsing fails, throws an exception.

This PR also makes the JSON data source share the `ParseModes` object with the CSV data source.

## How was this patch tested?

Unit tests, plus `./dev/run_tests` for code style checks.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #11756 from HyukjinKwon/SPARK-13764.
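Below is a minimal, hedged usage sketch of the new option (the `sqlContext` and file path are illustrative, not part of this patch):

```scala
// PERMISSIVE is the default; corrupt records land in the corrupt-record column.
val permissiveDF = sqlContext.read.json("people.json")

// DROPMALFORMED skips corrupt records entirely.
val droppedDF = sqlContext.read.option("mode", "DROPMALFORMED").json("people.json")

// FAILFAST throws as soon as a corrupt record is hit during an action.
val failFastDF = sqlContext.read.option("mode", "FAILFAST").json("people.json")
```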
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                           | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala          | 41
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala      | 27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala    | 26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala    | 15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala  | 22
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala      | 52
7 files changed, 156 insertions(+), 45 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1b5a4999a8..0dc0d44d6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -289,6 +289,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* </li>
* <li>`allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers
* (e.g. 00012)</li>
+ * <li>`mode` (default `PERMISSIVE`): sets a mode for dealing with corrupt records
+ * during parsing.
+ * <ul>
+ * <li>`PERMISSIVE`: sets other fields to `null` when it meets a corrupted record, and puts
+ * the malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`.
+ * When a user sets a schema, it sets `null` for extra fields.</li>
+ * <li>`DROPMALFORMED`: ignores whole corrupted records.</li>
+ * <li>`FAILFAST`: throws an exception when it meets corrupted records.</li>
+ * </ul></li>
*
* @since 1.4.0
*/
@@ -313,6 +322,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* (e.g. 00012)</li>
* <li>`allowBackslashEscapingAnyCharacter` (default `false`): allows accepting quoting of all
* character using backslash quoting mechanism</li>
+ * <li>`mode` (default `PERMISSIVE`): sets a mode for dealing with corrupt records
+ * during parsing.
+ * <ul>
+ * <li>`PERMISSIVE`: sets other fields to `null` when it meets a corrupted record, and puts
+ * the malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`.
+ * When a user sets a schema, it sets `null` for extra fields.</li>
+ * <li>`DROPMALFORMED`: ignores whole corrupted records.</li>
+ * <li>`FAILFAST`: throws an exception when it meets corrupted records.</li>
+ * </ul></li>
*
* @since 1.6.0
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
new file mode 100644
index 0000000000..468228053c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+private[datasources] object ParseModes {
+ val PERMISSIVE_MODE = "PERMISSIVE"
+ val DROP_MALFORMED_MODE = "DROPMALFORMED"
+ val FAIL_FAST_MODE = "FAILFAST"
+
+ val DEFAULT = PERMISSIVE_MODE
+
+ def isValidMode(mode: String): Boolean = {
+ mode.toUpperCase match {
+ case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
+ case _ => false
+ }
+ }
+
+ def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
+ def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
+ def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) {
+ mode.toUpperCase == PERMISSIVE_MODE
+ } else {
+ true // We default to permissive if the mode string is not valid
+ }
+}
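The fallback semantics of the helper above are worth spelling out: an unrecognised mode string is not "valid", yet it still behaves as permissive. A small sketch (this only compiles from within `org.apache.spark.sql.execution.datasources`, since the object is package-private):

```scala
import org.apache.spark.sql.execution.datasources.ParseModes

// Matching is case-insensitive.
assert(ParseModes.isValidMode("dropmalformed"))
assert(ParseModes.isDropMalformedMode("DropMalformed"))

// An invalid mode string fails validation but still counts as PERMISSIVE.
assert(!ParseModes.isValidMode("NOT_A_MODE"))
assert(ParseModes.isPermissiveMode("NOT_A_MODE"))
```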
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index e009a37f2d..95de02cf5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.csv
import java.nio.charset.StandardCharsets
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
private[sql] class CSVOptions(
@transient private val parameters: Map[String, String])
@@ -62,7 +62,7 @@ private[sql] class CSVOptions(
val delimiter = CSVTypeCast.toChar(
parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
- val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+ private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
val charset = parameters.getOrElse("encoding",
parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))
@@ -101,26 +101,3 @@ private[sql] class CSVOptions(
val rowSeparator = "\n"
}
-
-private[csv] object ParseModes {
- val PERMISSIVE_MODE = "PERMISSIVE"
- val DROP_MALFORMED_MODE = "DROPMALFORMED"
- val FAIL_FAST_MODE = "FAILFAST"
-
- val DEFAULT = PERMISSIVE_MODE
-
- def isValidMode(mode: String): Boolean = {
- mode.toUpperCase match {
- case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
- case _ => false
- }
- }
-
- def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
- def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
- def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) {
- mode.toUpperCase == PERMISSIVE_MODE
- } else {
- true // We default to permissive is the mode string is not valid
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 0937a213c9..945ed2c211 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -40,6 +40,7 @@ private[sql] object InferSchema {
configOptions: JSONOptions): StructType = {
require(configOptions.samplingRatio > 0,
s"samplingRatio (${configOptions.samplingRatio}) should be greater than 0")
+ val shouldHandleCorruptRecord = configOptions.permissive
val schemaData = if (configOptions.samplingRatio > 0.99) {
json
} else {
@@ -50,21 +51,23 @@ private[sql] object InferSchema {
val rootType = schemaData.mapPartitions { iter =>
val factory = new JsonFactory()
configOptions.setJacksonOptions(factory)
- iter.map { row =>
+ iter.flatMap { row =>
try {
Utils.tryWithResource(factory.createParser(row)) { parser =>
parser.nextToken()
- inferField(parser, configOptions)
+ Some(inferField(parser, configOptions))
}
} catch {
+ case _: JsonParseException if shouldHandleCorruptRecord =>
+ Some(StructType(Seq(StructField(columnNameOfCorruptRecords, StringType))))
case _: JsonParseException =>
- StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
+ None
}
}
}.treeAggregate[DataType](
StructType(Seq()))(
- compatibleRootType(columnNameOfCorruptRecords),
- compatibleRootType(columnNameOfCorruptRecords))
+ compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord),
+ compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord))
canonicalizeType(rootType) match {
case Some(st: StructType) => st
@@ -194,18 +197,21 @@ private[sql] object InferSchema {
* Remove top-level ArrayType wrappers and merge the remaining schemas
*/
private def compatibleRootType(
- columnNameOfCorruptRecords: String): (DataType, DataType) => DataType = {
+ columnNameOfCorruptRecords: String,
+ shouldHandleCorruptRecord: Boolean): (DataType, DataType) => DataType = {
// Since we support array of json objects at the top level,
// we need to check the element type and find the root level data type.
- case (ArrayType(ty1, _), ty2) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
- case (ty1, ArrayType(ty2, _)) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
+ case (ArrayType(ty1, _), ty2) =>
+ compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)(ty1, ty2)
+ case (ty1, ArrayType(ty2, _)) =>
+ compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)(ty1, ty2)
// If we see any other data type at the root level, we get records that cannot be
// parsed. So, we use the struct as the data type and add the corrupt field to the schema.
case (struct: StructType, NullType) => struct
case (NullType, struct: StructType) => struct
- case (struct: StructType, o) if !o.isInstanceOf[StructType] =>
+ case (struct: StructType, o) if !o.isInstanceOf[StructType] && shouldHandleCorruptRecord =>
withCorruptField(struct, columnNameOfCorruptRecords)
- case (o, struct: StructType) if !o.isInstanceOf[StructType] =>
+ case (o, struct: StructType) if !o.isInstanceOf[StructType] && shouldHandleCorruptRecord =>
withCorruptField(struct, columnNameOfCorruptRecords)
// If we get anything else, we call compatibleType.
// Usually, when we reach here, ty1 and ty2 are two StructTypes.
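The net effect on schema inference, sketched under the assumption of a `sqlContext` and the default corrupt-record column name (`_corrupt_record`):

```scala
// Illustrative input: one well-formed and one malformed JSON line.
val lines = sqlContext.sparkContext.parallelize(
  """{"a": "str"}""" :: """{""" :: Nil)

// PERMISSIVE (default): the malformed line surfaces as a corrupt-record column.
sqlContext.read.json(lines).printSchema()
// root
//  |-- _corrupt_record: string (nullable = true)
//  |-- a: string (nullable = true)

// DROPMALFORMED: the malformed line is ignored during inference as well.
sqlContext.read.option("mode", "DROPMALFORMED").json(lines).printSchema()
// root
//  |-- a: string (nullable = true)
```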
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index e59dbd6b3d..93c3d47c1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.datasources.json
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
/**
* Options for the JSON data source.
@@ -28,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.CompressionCodecs
*/
private[sql] class JSONOptions(
@transient private val parameters: Map[String, String])
- extends Serializable {
+ extends Logging with Serializable {
val samplingRatio =
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
@@ -49,6 +50,16 @@ private[sql] class JSONOptions(
val allowBackslashEscapingAnyCharacter =
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
+ private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+
+ // Parse mode flags
+ if (!ParseModes.isValidMode(parseMode)) {
+ logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
+ }
+
+ val failFast = ParseModes.isFailFastMode(parseMode)
+ val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
+ val permissive = ParseModes.isPermissiveMode(parseMode)
/** Sets config options on a Jackson [[JsonFactory]]. */
def setJacksonOptions(factory: JsonFactory): Unit = {
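Since all three flags are derived from the single `mode` string, at most one of `failFast` and `dropMalformed` is true, and `permissive` covers everything else, including invalid strings. A sketch, assuming code inside the `org.apache.spark.sql` package (the class is package-private):

```scala
import org.apache.spark.sql.execution.datasources.json.JSONOptions

val opts = new JSONOptions(Map("mode" -> "DROPMALFORMED"))
assert(opts.dropMalformed && !opts.failFast && !opts.permissive)

// An invalid mode logs a warning and falls back to PERMISSIVE.
val fallback = new JSONOptions(Map("mode" -> "TYPO"))
assert(fallback.permissive && !fallback.failFast && !fallback.dropMalformed)
```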
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 3252b6c77f..00c14adf07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import com.fasterxml.jackson.core._
+import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -34,7 +35,7 @@ import org.apache.spark.util.Utils
private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
-object JacksonParser {
+object JacksonParser extends Logging {
def parse(
input: RDD[String],
@@ -257,13 +258,20 @@ object JacksonParser {
def failedRecord(record: String): Seq[InternalRow] = {
// create a row even if no corrupt record column is present
- val row = new GenericMutableRow(schema.length)
- for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
- require(schema(corruptIndex).dataType == StringType)
- row.update(corruptIndex, UTF8String.fromString(record))
+ if (configOptions.failFast) {
+ throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
+ }
+ if (configOptions.dropMalformed) {
+ logWarning(s"Dropping malformed line: $record")
+ Nil
+ } else {
+ val row = new GenericMutableRow(schema.length)
+ for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
+ require(schema(corruptIndex).dataType == StringType)
+ row.update(corruptIndex, UTF8String.fromString(record))
+ }
+ Seq(row)
}
-
- Seq(row)
}
val factory = new JsonFactory()
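A distilled restatement of the `failedRecord` dispatch above, using plain Scala types instead of Spark's `InternalRow` (all names here are hypothetical):

```scala
sealed trait MalformedResult
case object Dropped extends MalformedResult
final case class Corrupt(raw: String) extends MalformedResult

// Mirrors the three branches: FAILFAST throws, DROPMALFORMED yields nothing,
// and PERMISSIVE keeps the raw text for the corrupt-record column.
def handleMalformed(record: String, failFast: Boolean, dropMalformed: Boolean): Seq[MalformedResult] =
  if (failFast) {
    throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
  } else if (dropMalformed) {
    Nil
  } else {
    Seq(Corrupt(record))
  }
```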
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 6d942c4c90..0a5699b99c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -963,7 +964,56 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
)
}
- test("Corrupt records") {
+ test("Corrupt records: FAILFAST mode") {
+ val schema = StructType(
+ StructField("a", StringType, true) :: Nil)
+ // `FAILFAST` mode should throw an exception for corrupt records.
+ val exceptionOne = intercept[SparkException] {
+ sqlContext.read
+ .option("mode", "FAILFAST")
+ .json(corruptRecords)
+ .collect()
+ }
+ assert(exceptionOne.getMessage.contains("Malformed line in FAILFAST mode: {"))
+
+ val exceptionTwo = intercept[SparkException] {
+ sqlContext.read
+ .option("mode", "FAILFAST")
+ .schema(schema)
+ .json(corruptRecords)
+ .collect()
+ }
+ assert(exceptionTwo.getMessage.contains("Malformed line in FAILFAST mode: {"))
+ }
+
+ test("Corrupt records: DROPMALFORMED mode") {
+ val schemaOne = StructType(
+ StructField("a", StringType, true) ::
+ StructField("b", StringType, true) ::
+ StructField("c", StringType, true) :: Nil)
+ val schemaTwo = StructType(
+ StructField("a", StringType, true) :: Nil)
+ // `DROPMALFORMED` mode should skip corrupt records
+ val jsonDFOne = sqlContext.read
+ .option("mode", "DROPMALFORMED")
+ .json(corruptRecords)
+ checkAnswer(
+ jsonDFOne,
+ Row("str_a_4", "str_b_4", "str_c_4") :: Nil
+ )
+ assert(jsonDFOne.schema === schemaOne)
+
+ val jsonDFTwo = sqlContext.read
+ .option("mode", "DROPMALFORMED")
+ .schema(schemaTwo)
+ .json(corruptRecords)
+ checkAnswer(
+ jsonDFTwo,
+ Row("str_a_4") :: Nil)
+ assert(jsonDFTwo.schema === schemaTwo)
+ }
+
+ test("Corrupt records: PERMISSIVE mode") {
// Test if we can query corrupt records.
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
withTempTable("jsonTable") {