about summary refs log tree commit diff
path: root/sql
diff options
context:
space:
mode:
authorSean Zhong <seanzhong@databricks.com>2016-09-06 22:20:55 +0800
committerWenchen Fan <wenchen@databricks.com>2016-09-06 22:20:55 +0800
commitbc2767df2666ff615e7f44e980555afab06dd8a3 (patch)
tree9351e11c0d0abad856f637bf05d2930c3d80fa13 /sql
parent39d538dddf7d44bf4603c966d0f7b2c92f1e951a (diff)
downloadspark-bc2767df2666ff615e7f44e980555afab06dd8a3.tar.gz
spark-bc2767df2666ff615e7f44e980555afab06dd8a3.tar.bz2
spark-bc2767df2666ff615e7f44e980555afab06dd8a3.zip
[SPARK-17374][SQL] Better error messages when parsing JSON using DataFrameReader
## What changes were proposed in this pull request? This PR adds better error messages for malformed record when reading a JSON file using DataFrameReader. For example, for query: ``` import org.apache.spark.sql.types._ val corruptRecords = spark.sparkContext.parallelize("""{"a":{, b:3}""" :: Nil) val schema = StructType(StructField("a", StringType, true) :: Nil) val jsonDF = spark.read.schema(schema).json(corruptRecords) ``` **Before change:** We silently replace corrupted line with null ``` scala> jsonDF.show +----+ | a| +----+ |null| +----+ ``` **After change:** Add an explicit warning message: ``` scala> jsonDF.show 16/09/02 14:43:16 WARN JacksonParser: Found at least one malformed records (sample: {"a":{, b:3}). The JSON reader will replace all malformed records with placeholder null in current PERMISSIVE parser mode. To find out which corrupted records have been replaced with null, please use the default inferred schema instead of providing a custom schema. Code example to print all malformed records (scala): =================================================== // The corrupted record exists in column _corrupt_record. val parsedJson = spark.read.json("/path/to/json/file/test.json") +----+ | a| +----+ |null| +----+ ``` ### ## How was this patch tested? Unit test. Author: Sean Zhong <seanzhong@databricks.com> Closes #14929 from clockfly/logwarning_if_schema_not_contain_corrupted_record.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala39
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala29
2 files changed, 66 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 359a3e2aa8..5ce1bf7432 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -52,6 +53,11 @@ class JacksonParser(
private val factory = new JsonFactory()
options.setJacksonOptions(factory)
+ private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))
+
+ @transient
+ private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+
/**
* This function deals with the cases it fails to parse. This function will be called
* when exceptions are caught during converting. This functions also deals with `mode` option.
@@ -62,8 +68,39 @@ class JacksonParser(
throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
}
if (options.dropMalformed) {
- logWarning(s"Dropping malformed line: $record")
+ if (!isWarningPrintedForMalformedRecord) {
+ logWarning(
+ s"""Found at least one malformed records (sample: $record). The JSON reader will drop
+ |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
+ |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
+ |mode and use the default inferred schema.
+ |
+ |Code example to print all malformed records (scala):
+ |===================================================
+ |// The corrupted record exists in column ${columnNameOfCorruptRecord}
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+ """.stripMargin)
+ isWarningPrintedForMalformedRecord = true
+ }
Nil
+ } else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
+ if (!isWarningPrintedForMalformedRecord) {
+ logWarning(
+ s"""Found at least one malformed records (sample: $record). The JSON reader will replace
+ |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
+ |To find out which corrupted records have been replaced with null, please use the
+ |default inferred schema instead of providing a custom schema.
+ |
+ |Code example to print all malformed records (scala):
+ |===================================================
+ |// The corrupted record exists in column ${columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+ """.stripMargin)
+ isWarningPrintedForMalformedRecord = true
+ }
+ emptyRow
} else {
val row = new GenericMutableRow(schema.length)
for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 63a9061210..3d533c14e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1081,7 +1081,34 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(jsonDFTwo.schema === schemaTwo)
}
- test("Corrupt records: PERMISSIVE mode") {
+ test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") {
+ withTempView("jsonTable") {
+ val schema = StructType(
+ StructField("a", StringType, true) ::
+ StructField("b", StringType, true) ::
+ StructField("c", StringType, true) :: Nil)
+
+ val jsonDF = spark.read.schema(schema).json(corruptRecords)
+ jsonDF.createOrReplaceTempView("jsonTable")
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT a, b, c
+ |FROM jsonTable
+ """.stripMargin),
+ Seq(
+ // Corrupted records are replaced with null
+ Row(null, null, null),
+ Row(null, null, null),
+ Row(null, null, null),
+ Row("str_a_4", "str_b_4", "str_c_4"),
+ Row(null, null, null))
+ )
+ }
+ }
+
+ test("Corrupt records: PERMISSIVE mode, with designated column for malformed records") {
// Test if we can query corrupt records.
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
withTempView("jsonTable") {