about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala39
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala29
2 files changed, 66 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 359a3e2aa8..5ce1bf7432 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -52,6 +53,11 @@ class JacksonParser(
private val factory = new JsonFactory()
options.setJacksonOptions(factory)
+ private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))
+
+ @transient
+ private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+
/**
* This function deals with the cases it fails to parse. This function will be called
* when exceptions are caught during converting. This functions also deals with `mode` option.
@@ -62,8 +68,39 @@ class JacksonParser(
throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
}
if (options.dropMalformed) {
- logWarning(s"Dropping malformed line: $record")
+ if (!isWarningPrintedForMalformedRecord) {
+ logWarning(
+ s"""Found at least one malformed records (sample: $record). The JSON reader will drop
+ |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
+ |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
+ |mode and use the default inferred schema.
+ |
+ |Code example to print all malformed records (scala):
+ |===================================================
+ |// The corrupted record exists in column ${columnNameOfCorruptRecord}
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+ """.stripMargin)
+ isWarningPrintedForMalformedRecord = true
+ }
Nil
+ } else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
+ if (!isWarningPrintedForMalformedRecord) {
+ logWarning(
+ s"""Found at least one malformed records (sample: $record). The JSON reader will replace
+ |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
+ |To find out which corrupted records have been replaced with null, please use the
+ |default inferred schema instead of providing a custom schema.
+ |
+ |Code example to print all malformed records (scala):
+ |===================================================
+ |// The corrupted record exists in column ${columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+ """.stripMargin)
+ isWarningPrintedForMalformedRecord = true
+ }
+ emptyRow
} else {
val row = new GenericMutableRow(schema.length)
for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 63a9061210..3d533c14e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1081,7 +1081,34 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(jsonDFTwo.schema === schemaTwo)
}
- test("Corrupt records: PERMISSIVE mode") {
+ test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") {
+ withTempView("jsonTable") {
+ val schema = StructType(
+ StructField("a", StringType, true) ::
+ StructField("b", StringType, true) ::
+ StructField("c", StringType, true) :: Nil)
+
+ val jsonDF = spark.read.schema(schema).json(corruptRecords)
+ jsonDF.createOrReplaceTempView("jsonTable")
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT a, b, c
+ |FROM jsonTable
+ """.stripMargin),
+ Seq(
+ // Corrupted records are replaced with null
+ Row(null, null, null),
+ Row(null, null, null),
+ Row(null, null, null),
+ Row("str_a_4", "str_b_4", "str_c_4"),
+ Row(null, null, null))
+ )
+ }
+ }
+
+ test("Corrupt records: PERMISSIVE mode, with designated column for malformed records") {
// Test if we can query corrupt records.
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
withTempView("jsonTable") {