aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala
diff options
context:
space:
mode:
authorSean Zhong <seanzhong@databricks.com>2016-09-06 22:20:55 +0800
committerWenchen Fan <wenchen@databricks.com>2016-09-06 22:20:55 +0800
commitbc2767df2666ff615e7f44e980555afab06dd8a3 (patch)
tree9351e11c0d0abad856f637bf05d2930c3d80fa13 /sql/core/src/test/scala
parent39d538dddf7d44bf4603c966d0f7b2c92f1e951a (diff)
downloadspark-bc2767df2666ff615e7f44e980555afab06dd8a3.tar.gz
spark-bc2767df2666ff615e7f44e980555afab06dd8a3.tar.bz2
spark-bc2767df2666ff615e7f44e980555afab06dd8a3.zip
[SPARK-17374][SQL] Better error messages when parsing JSON using DataFrameReader
## What changes were proposed in this pull request? This PR adds better error messages for malformed record when reading a JSON file using DataFrameReader. For example, for query: ``` import org.apache.spark.sql.types._ val corruptRecords = spark.sparkContext.parallelize("""{"a":{, b:3}""" :: Nil) val schema = StructType(StructField("a", StringType, true) :: Nil) val jsonDF = spark.read.schema(schema).json(corruptRecords) ``` **Before change:** We silently replace corrupted line with null ``` scala> jsonDF.show +----+ | a| +----+ |null| +----+ ``` **After change:** Add an explicit warning message: ``` scala> jsonDF.show 16/09/02 14:43:16 WARN JacksonParser: Found at least one malformed records (sample: {"a":{, b:3}). The JSON reader will replace all malformed records with placeholder null in current PERMISSIVE parser mode. To find out which corrupted records have been replaced with null, please use the default inferred schema instead of providing a custom schema. Code example to print all malformed records (scala): =================================================== // The corrupted record exists in column _corrupt_record. val parsedJson = spark.read.json("/path/to/json/file/test.json") +----+ | a| +----+ |null| +----+ ``` ### ## How was this patch tested? Unit test. Author: Sean Zhong <seanzhong@databricks.com> Closes #14929 from clockfly/logwarning_if_schema_not_contain_corrupted_record.
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala29
1 files changed, 28 insertions, 1 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 63a9061210..3d533c14e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1081,7 +1081,34 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(jsonDFTwo.schema === schemaTwo)
}
- test("Corrupt records: PERMISSIVE mode") {
+ test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") {
+ withTempView("jsonTable") {
+ val schema = StructType(
+ StructField("a", StringType, true) ::
+ StructField("b", StringType, true) ::
+ StructField("c", StringType, true) :: Nil)
+
+ val jsonDF = spark.read.schema(schema).json(corruptRecords)
+ jsonDF.createOrReplaceTempView("jsonTable")
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT a, b, c
+ |FROM jsonTable
+ """.stripMargin),
+ Seq(
+ // Corrupted records are replaced with null
+ Row(null, null, null),
+ Row(null, null, null),
+ Row(null, null, null),
+ Row("str_a_4", "str_b_4", "str_c_4"),
+ Row(null, null, null))
+ )
+ }
+ }
+
+ test("Corrupt records: PERMISSIVE mode, with designated column for malformed records") {
// Test if we can query corrupt records.
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
withTempView("jsonTable") {