diff options
author | Yin Huai <huai@cse.ohio-state.edu> | 2014-09-16 11:40:28 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-09-16 11:40:28 -0700 |
commit | 7583699873fb4f252c6ce65db1096783ef438731 (patch) | |
tree | f3c5d64dfd766f4fd19f14ab680664efd0df0e23 | |
parent | 9d5fa763d8559ac412a18d7a2f43c4368a0af897 (diff) | |
download | spark-7583699873fb4f252c6ce65db1096783ef438731.tar.gz spark-7583699873fb4f252c6ce65db1096783ef438731.tar.bz2 spark-7583699873fb4f252c6ce65db1096783ef438731.zip |
[SPARK-3308][SQL] Ability to read JSON Arrays as tables
This PR aims to support reading top level JSON arrays and take every element in such an array as a row (an empty array will not generate a row).
JIRA: https://issues.apache.org/jira/browse/SPARK-3308
Author: Yin Huai <huai@cse.ohio-state.edu>
Closes #2400 from yhuai/SPARK-3308 and squashes the following commits:
990077a [Yin Huai] Handle top level JSON arrays.
3 files changed, 31 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index 873221835d..0f27fd13e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -287,9 +287,13 @@ private[sql] object JsonRDD extends Logging { // the ObjectMapper will take the last value associated with this duplicate key. // For example: for {"key": 1, "key":2}, we will get "key"->2. val mapper = new ObjectMapper() - iter.map { record => - val parsed = scalafy(mapper.readValue(record, classOf[java.util.Map[String, Any]])) - parsed.asInstanceOf[Map[String, Any]] + iter.flatMap { record => + val parsed = mapper.readValue(record, classOf[Object]) match { + case map: java.util.Map[_, _] => scalafy(map).asInstanceOf[Map[String, Any]] :: Nil + case list: java.util.List[_] => scalafy(list).asInstanceOf[Seq[Map[String, Any]]] + } + + parsed } }) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala index b50d938554..685e788207 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala @@ -622,4 +622,21 @@ class JsonSuite extends QueryTest { ("str1", Nil, "str4", 2) :: Nil ) } + + test("SPARK-3308 Read top level JSON arrays") { + val jsonSchemaRDD = jsonRDD(jsonArray) + jsonSchemaRDD.registerTempTable("jsonTable") + + checkAnswer( + sql( + """ + |select a, b, c + |from jsonTable + """.stripMargin), + ("str_a_1", null, null) :: + ("str_a_2", null, null) :: + (null, "str_b_3", null) :: + ("str_a_4", "str_b_4", "str_c_4") ::Nil + ) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala index 5f0b3959a6..fc833b8b54 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala @@ -136,4 +136,11 @@ object TestJsonData { ] ]] }""" :: Nil) + + val jsonArray = + TestSQLContext.sparkContext.parallelize( + """[{"a":"str_a_1"}]""" :: + """[{"a":"str_a_2"}, {"b":"str_b_3"}]""" :: + """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" :: + """[]""" :: Nil) } |