diff options
author | hyukjinkwon <gurwls223@gmail.com> | 2017-04-03 17:44:39 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-04-03 17:44:39 +0800 |
commit | 4fa1a43af6b5a6abaef7e04cacb2617a2e92d816 (patch) | |
tree | c07f8d0c8cdfa086f1734004fb8f11f9526c41d7 /sql/core/src/test/scala/org | |
parent | 4d28e8430d11323f08657ca8f3251ca787c45501 (diff) | |
download | spark-4fa1a43af6b5a6abaef7e04cacb2617a2e92d816.tar.gz spark-4fa1a43af6b5a6abaef7e04cacb2617a2e92d816.tar.bz2 spark-4fa1a43af6b5a6abaef7e04cacb2617a2e92d816.zip |
[SPARK-19641][SQL] JSON schema inference in DROPMALFORMED mode produces incorrect schema for non-array/object JSONs
## What changes were proposed in this pull request?
Currently, when we infer the types for vaild JSON strings but object or array, we are producing empty schemas regardless of parse modes as below:
```scala
scala> spark.read.option("mode", "DROPMALFORMED").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema()
root
```
```scala
scala> spark.read.option("mode", "FAILFAST").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema()
root
```
This PR proposes to handle parse modes in type inference.
After this PR,
```scala
scala> spark.read.option("mode", "DROPMALFORMED").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema()
root
|-- a: long (nullable = true)
```
```
scala> spark.read.option("mode", "FAILFAST").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema()
java.lang.RuntimeException: Failed to infer a common schema. Struct types are expected but string was found.
```
This PR is based on https://github.com/NathanHowell/spark/commit/e233fd03346a73b3b447fa4c24f3b12c8b2e53ae and I and NathanHowell talked about this in https://issues.apache.org/jira/browse/SPARK-19641
## How was this patch tested?
Unit tests in `JsonSuite` for both `DROPMALFORMED` and `FAILFAST` modes.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #17492 from HyukjinKwon/SPARK-19641.
Diffstat (limited to 'sql/core/src/test/scala/org')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala | 34 |
1 files changed, 31 insertions, 3 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index b09cef76d2..2ab0381996 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1041,7 +1041,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { spark.read .option("mode", "FAILFAST") .json(corruptRecords) - .collect() } assert(exceptionOne.getMessage.contains("JsonParseException")) @@ -1082,6 +1081,18 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(jsonDFTwo.schema === schemaTwo) } + test("SPARK-19641: Additional corrupt records: DROPMALFORMED mode") { + val schema = new StructType().add("dummy", StringType) + // `DROPMALFORMED` mode should skip corrupt records + val jsonDF = spark.read + .option("mode", "DROPMALFORMED") + .json(additionalCorruptRecords) + checkAnswer( + jsonDF, + Row("test")) + assert(jsonDF.schema === schema) + } + test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") { val schema = StructType( StructField("a", StringType, true) :: @@ -1882,6 +1893,24 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } + test("SPARK-19641: Handle multi-line corrupt documents (DROPMALFORMED)") { + withTempPath { dir => + val path = dir.getCanonicalPath + val corruptRecordCount = additionalCorruptRecords.count().toInt + assert(corruptRecordCount === 5) + + additionalCorruptRecords + .toDF("value") + // this is the minimum partition count that avoids hash collisions + .repartition(corruptRecordCount * 4, F.hash($"value")) + .write + .text(path) + + val jsonDF = spark.read.option("wholeFile", true).option("mode", "DROPMALFORMED").json(path) + checkAnswer(jsonDF, Seq(Row("test"))) + } + } + test("SPARK-18352: Handle multi-line corrupt documents (FAILFAST)") { withTempPath { dir => val path = dir.getCanonicalPath @@ -1903,9 +1932,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .option("wholeFile", true) .option("mode", "FAILFAST") .json(path) - .collect() } - assert(exceptionOne.getMessage.contains("Failed to parse a value")) + assert(exceptionOne.getMessage.contains("Failed to infer a common schema")) val exceptionTwo = intercept[SparkException] { spark.read |