aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala/org
diff options
context:
space:
mode:
authorhyukjinkwon <gurwls223@gmail.com>2017-04-03 17:44:39 +0800
committerWenchen Fan <wenchen@databricks.com>2017-04-03 17:44:39 +0800
commit4fa1a43af6b5a6abaef7e04cacb2617a2e92d816 (patch)
treec07f8d0c8cdfa086f1734004fb8f11f9526c41d7 /sql/core/src/test/scala/org
parent4d28e8430d11323f08657ca8f3251ca787c45501 (diff)
downloadspark-4fa1a43af6b5a6abaef7e04cacb2617a2e92d816.tar.gz
spark-4fa1a43af6b5a6abaef7e04cacb2617a2e92d816.tar.bz2
spark-4fa1a43af6b5a6abaef7e04cacb2617a2e92d816.zip
[SPARK-19641][SQL] JSON schema inference in DROPMALFORMED mode produces incorrect schema for non-array/object JSONs
## What changes were proposed in this pull request? Currently, when we infer the types for vaild JSON strings but object or array, we are producing empty schemas regardless of parse modes as below: ```scala scala> spark.read.option("mode", "DROPMALFORMED").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema() root ``` ```scala scala> spark.read.option("mode", "FAILFAST").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema() root ``` This PR proposes to handle parse modes in type inference. After this PR, ```scala scala> spark.read.option("mode", "DROPMALFORMED").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema() root |-- a: long (nullable = true) ``` ``` scala> spark.read.option("mode", "FAILFAST").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema() java.lang.RuntimeException: Failed to infer a common schema. Struct types are expected but string was found. ``` This PR is based on https://github.com/NathanHowell/spark/commit/e233fd03346a73b3b447fa4c24f3b12c8b2e53ae and I and NathanHowell talked about this in https://issues.apache.org/jira/browse/SPARK-19641 ## How was this patch tested? Unit tests in `JsonSuite` for both `DROPMALFORMED` and `FAILFAST` modes. Author: hyukjinkwon <gurwls223@gmail.com> Closes #17492 from HyukjinKwon/SPARK-19641.
Diffstat (limited to 'sql/core/src/test/scala/org')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala34
1 files changed, 31 insertions, 3 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index b09cef76d2..2ab0381996 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1041,7 +1041,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
spark.read
.option("mode", "FAILFAST")
.json(corruptRecords)
- .collect()
}
assert(exceptionOne.getMessage.contains("JsonParseException"))
@@ -1082,6 +1081,18 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(jsonDFTwo.schema === schemaTwo)
}
+ test("SPARK-19641: Additional corrupt records: DROPMALFORMED mode") {
+ val schema = new StructType().add("dummy", StringType)
+ // `DROPMALFORMED` mode should skip corrupt records
+ val jsonDF = spark.read
+ .option("mode", "DROPMALFORMED")
+ .json(additionalCorruptRecords)
+ checkAnswer(
+ jsonDF,
+ Row("test"))
+ assert(jsonDF.schema === schema)
+ }
+
test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") {
val schema = StructType(
StructField("a", StringType, true) ::
@@ -1882,6 +1893,24 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}
+ test("SPARK-19641: Handle multi-line corrupt documents (DROPMALFORMED)") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val corruptRecordCount = additionalCorruptRecords.count().toInt
+ assert(corruptRecordCount === 5)
+
+ additionalCorruptRecords
+ .toDF("value")
+ // this is the minimum partition count that avoids hash collisions
+ .repartition(corruptRecordCount * 4, F.hash($"value"))
+ .write
+ .text(path)
+
+ val jsonDF = spark.read.option("wholeFile", true).option("mode", "DROPMALFORMED").json(path)
+ checkAnswer(jsonDF, Seq(Row("test")))
+ }
+ }
+
test("SPARK-18352: Handle multi-line corrupt documents (FAILFAST)") {
withTempPath { dir =>
val path = dir.getCanonicalPath
@@ -1903,9 +1932,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
.option("wholeFile", true)
.option("mode", "FAILFAST")
.json(path)
- .collect()
}
- assert(exceptionOne.getMessage.contains("Failed to parse a value"))
+ assert(exceptionOne.getMessage.contains("Failed to infer a common schema"))
val exceptionTwo = intercept[SparkException] {
spark.read