aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Zhang <zjffdu@apache.org>2015-10-31 11:10:37 +0000
committerSean Owen <sowen@cloudera.com>2015-10-31 11:10:37 +0000
commit97b3c8fb470f0d3c1cdb1aeb27f675e695442e87 (patch)
tree4d28501f9739651590cbd1b173c7ad92c87bfa54
parent3c471885dc4f86bea95ab542e0d48d22ae748404 (diff)
downloadspark-97b3c8fb470f0d3c1cdb1aeb27f675e695442e87.tar.gz
spark-97b3c8fb470f0d3c1cdb1aeb27f675e695442e87.tar.bz2
spark-97b3c8fb470f0d3c1cdb1aeb27f675e695442e87.zip
[SPARK-11226][SQL] Empty line in json file should be skipped
Currently the empty line in json file will be parsed into Row with all null field values. But in json, "{}" represents a json object, empty line is supposed to be skipped. Make a trivial change for this. Author: Jeff Zhang <zjffdu@apache.org> Closes #9211 from zjffdu/SPARK-11226.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala46
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala11
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala3
3 files changed, 36 insertions, 24 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index b2e52011a7..4f53eeb081 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -245,29 +245,33 @@ private[sql] object JacksonParser {
val factory = new JsonFactory()
iter.flatMap { record =>
- try {
- Utils.tryWithResource(factory.createParser(record)) { parser =>
- parser.nextToken()
-
- convertField(factory, parser, schema) match {
- case null => failedRecord(record)
- case row: InternalRow => row :: Nil
- case array: ArrayData =>
- if (array.numElements() == 0) {
- Nil
- } else {
- array.toArray[InternalRow](schema)
- }
- case _ =>
- sys.error(
- s"Failed to parse record $record. Please make sure that each line of the file " +
- "(or each string in the RDD) is a valid JSON object or " +
- "an array of JSON objects.")
+ if (record.trim.isEmpty) {
+ Nil
+ } else {
+ try {
+ Utils.tryWithResource(factory.createParser(record)) { parser =>
+ parser.nextToken()
+
+ convertField(factory, parser, schema) match {
+ case null => failedRecord(record)
+ case row: InternalRow => row :: Nil
+ case array: ArrayData =>
+ if (array.numElements() == 0) {
+ Nil
+ } else {
+ array.toArray[InternalRow](schema)
+ }
+ case _ =>
+ sys.error(
+ s"Failed to parse record $record. Please make sure that each line of " +
+ "the file (or each string in the RDD) is a valid JSON object or " +
+ "an array of JSON objects.")
+ }
}
+ } catch {
+ case _: JsonProcessingException =>
+ failedRecord(record)
}
- } catch {
- case _: JsonProcessingException =>
- failedRecord(record)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 5a616fac0b..5413ef1287 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -225,6 +225,17 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Seq(Row("1"), Row("2")))
}
+ test("SPARK-11226 Skip empty line in json file") {
+ sqlContext.read.json(
+ sparkContext.parallelize(
+ Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "")))
+ .registerTempTable("d")
+
+ checkAnswer(
+ sql("select count(1) from d"),
+ Seq(Row(3)))
+ }
+
test("SPARK-8828 sum should return null if all input values are null") {
withSQLConf(SQLConf.USE_SQL_AGGREGATE2.key -> "true") {
withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index d3fd409291..28b8f02bdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -959,7 +959,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
withTempTable("jsonTable") {
val jsonDF = sqlContext.read.json(corruptRecords)
jsonDF.registerTempTable("jsonTable")
-
val schema = StructType(
StructField("_unparsed", StringType, true) ::
StructField("a", StringType, true) ::
@@ -976,7 +975,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
|FROM jsonTable
""".stripMargin),
Row(null, null, null, "{") ::
- Row(null, null, null, "") ::
Row(null, null, null, """{"a":1, b:2}""") ::
Row(null, null, null, """{"a":{, b:3}""") ::
Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1001,7 +999,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
|WHERE _unparsed IS NOT NULL
""".stripMargin),
Row("{") ::
- Row("") ::
Row("""{"a":1, b:2}""") ::
Row("""{"a":{, b:3}""") ::
Row("]") :: Nil