diff options
author | Jeff Zhang <zjffdu@apache.org> | 2015-10-31 11:10:37 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-10-31 11:10:37 +0000 |
commit | 97b3c8fb470f0d3c1cdb1aeb27f675e695442e87 (patch) | |
tree | 4d28501f9739651590cbd1b173c7ad92c87bfa54 | |
parent | 3c471885dc4f86bea95ab542e0d48d22ae748404 (diff) | |
download | spark-97b3c8fb470f0d3c1cdb1aeb27f675e695442e87.tar.gz spark-97b3c8fb470f0d3c1cdb1aeb27f675e695442e87.tar.bz2 spark-97b3c8fb470f0d3c1cdb1aeb27f675e695442e87.zip |
[SPARK-11226][SQL] Empty line in json file should be skipped
Currently, an empty line in a JSON file is parsed into a Row with all null field values. However, in JSON, "{}" represents an empty JSON object, so an empty line should simply be skipped.
This patch makes a trivial change to skip such empty lines during parsing.
Author: Jeff Zhang <zjffdu@apache.org>
Closes #9211 from zjffdu/SPARK-11226.
3 files changed, 36 insertions, 24 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index b2e52011a7..4f53eeb081 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -245,29 +245,33 @@ private[sql] object JacksonParser { val factory = new JsonFactory() iter.flatMap { record => - try { - Utils.tryWithResource(factory.createParser(record)) { parser => - parser.nextToken() - - convertField(factory, parser, schema) match { - case null => failedRecord(record) - case row: InternalRow => row :: Nil - case array: ArrayData => - if (array.numElements() == 0) { - Nil - } else { - array.toArray[InternalRow](schema) - } - case _ => - sys.error( - s"Failed to parse record $record. Please make sure that each line of the file " + - "(or each string in the RDD) is a valid JSON object or " + - "an array of JSON objects.") + if (record.trim.isEmpty) { + Nil + } else { + try { + Utils.tryWithResource(factory.createParser(record)) { parser => + parser.nextToken() + + convertField(factory, parser, schema) match { + case null => failedRecord(record) + case row: InternalRow => row :: Nil + case array: ArrayData => + if (array.numElements() == 0) { + Nil + } else { + array.toArray[InternalRow](schema) + } + case _ => + sys.error( + s"Failed to parse record $record. 
Please make sure that each line of " + + "the file (or each string in the RDD) is a valid JSON object or " + + "an array of JSON objects.") + } } + } catch { + case _: JsonProcessingException => + failedRecord(record) } - } catch { - case _: JsonProcessingException => - failedRecord(record) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 5a616fac0b..5413ef1287 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -225,6 +225,17 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { Seq(Row("1"), Row("2"))) } + test("SPARK-11226 Skip empty line in json file") { + sqlContext.read.json( + sparkContext.parallelize( + Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", ""))) + .registerTempTable("d") + + checkAnswer( + sql("select count(1) from d"), + Seq(Row(3))) + } + test("SPARK-8828 sum should return null if all input values are null") { withSQLConf(SQLConf.USE_SQL_AGGREGATE2.key -> "true") { withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index d3fd409291..28b8f02bdf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -959,7 +959,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { withTempTable("jsonTable") { val jsonDF = sqlContext.read.json(corruptRecords) jsonDF.registerTempTable("jsonTable") - val schema = StructType( StructField("_unparsed", StringType, true) :: StructField("a", StringType, true) :: @@ -976,7 +975,6 @@ class JsonSuite extends QueryTest with SharedSQLContext 
with TestJsonData { |FROM jsonTable """.stripMargin), Row(null, null, null, "{") :: - Row(null, null, null, "") :: Row(null, null, null, """{"a":1, b:2}""") :: Row(null, null, null, """{"a":{, b:3}""") :: Row("str_a_4", "str_b_4", "str_c_4", null) :: @@ -1001,7 +999,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { |WHERE _unparsed IS NOT NULL """.stripMargin), Row("{") :: - Row("") :: Row("""{"a":1, b:2}""") :: Row("""{"a":{, b:3}""") :: Row("]") :: Nil |