about summary refs log tree commit diff
path: root/sql/core
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2015-02-23 17:16:34 -0800
committerMichael Armbrust <michael@databricks.com>2015-02-23 17:16:34 -0800
commit48376bfe9c97bf31279918def6c6615849c88f4d (patch)
tree680eb20fe677e07b1508604aa849524cb885023e /sql/core
parent59536cc87e10e5011560556729dd901280958f43 (diff)
downloadspark-48376bfe9c97bf31279918def6c6615849c88f4d.tar.gz
spark-48376bfe9c97bf31279918def6c6615849c88f4d.tar.bz2
spark-48376bfe9c97bf31279918def6c6615849c88f4d.zip
[SPARK-5935][SQL] Accept MapType in the schema provided to a JSON dataset.
JIRA: https://issues.apache.org/jira/browse/SPARK-5935

Author: Yin Huai <yhuai@databricks.com>
Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #4710 from yhuai/jsonMapType and squashes the following commits:

3e40390 [Yin Huai] Remove unnecessary changes.
f8e6267 [Yin Huai] Fix test.
baa36e3 [Yin Huai] Accept MapType in the schema provided to jsonFile/jsonRDD.
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala      |  3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala    | 56
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala | 17
3 files changed, 76 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 3b8dde1823..d83bdc2f7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -416,6 +416,9 @@ private[sql] object JsonRDD extends Logging {
case NullType => null
case ArrayType(elementType, _) =>
value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
+ case MapType(StringType, valueType, _) =>
+ val map = value.asInstanceOf[Map[String, Any]]
+ map.mapValues(enforceCorrectType(_, valueType)).map(identity)
case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct)
case DateType => toDate(value)
case TimestampType => toTimestamp(value)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index c94e44bd7c..005f20b96d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -657,6 +657,62 @@ class JsonSuite extends QueryTest {
)
}
+ test("Applying schemas with MapType") {
+ val schemaWithSimpleMap = StructType(
+ StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
+ val jsonWithSimpleMap = jsonRDD(mapType1, schemaWithSimpleMap)
+
+ jsonWithSimpleMap.registerTempTable("jsonWithSimpleMap")
+
+ checkAnswer(
+ sql("select map from jsonWithSimpleMap"),
+ Row(Map("a" -> 1)) ::
+ Row(Map("b" -> 2)) ::
+ Row(Map("c" -> 3)) ::
+ Row(Map("c" -> 1, "d" -> 4)) ::
+ Row(Map("e" -> null)) :: Nil
+ )
+
+ checkAnswer(
+ sql("select map['c'] from jsonWithSimpleMap"),
+ Row(null) ::
+ Row(null) ::
+ Row(3) ::
+ Row(1) ::
+ Row(null) :: Nil
+ )
+
+ val innerStruct = StructType(
+ StructField("field1", ArrayType(IntegerType, true), true) ::
+ StructField("field2", IntegerType, true) :: Nil)
+ val schemaWithComplexMap = StructType(
+ StructField("map", MapType(StringType, innerStruct, true), false) :: Nil)
+
+ val jsonWithComplexMap = jsonRDD(mapType2, schemaWithComplexMap)
+
+ jsonWithComplexMap.registerTempTable("jsonWithComplexMap")
+
+ checkAnswer(
+ sql("select map from jsonWithComplexMap"),
+ Row(Map("a" -> Row(Seq(1, 2, 3, null), null))) ::
+ Row(Map("b" -> Row(null, 2))) ::
+ Row(Map("c" -> Row(Seq(), 4))) ::
+ Row(Map("c" -> Row(null, 3), "d" -> Row(Seq(null), null))) ::
+ Row(Map("e" -> null)) ::
+ Row(Map("f" -> Row(null, null))) :: Nil
+ )
+
+ checkAnswer(
+ sql("select map['a'].field1, map['c'].field2 from jsonWithComplexMap"),
+ Row(Seq(1, 2, 3, null), null) ::
+ Row(null, null) ::
+ Row(null, 4) ::
+ Row(null, 3) ::
+ Row(null, null) ::
+ Row(null, null) :: Nil
+ )
+ }
+
test("SPARK-2096 Correctly parse dot notations") {
val jsonDF = jsonRDD(complexFieldAndType2)
jsonDF.registerTempTable("jsonTable")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
index 3370b3c98b..15698f61e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
@@ -146,6 +146,23 @@ object TestJsonData {
]]
}""" :: Nil)
+ val mapType1 =
+ TestSQLContext.sparkContext.parallelize(
+ """{"map": {"a": 1}}""" ::
+ """{"map": {"b": 2}}""" ::
+ """{"map": {"c": 3}}""" ::
+ """{"map": {"c": 1, "d": 4}}""" ::
+ """{"map": {"e": null}}""" :: Nil)
+
+ val mapType2 =
+ TestSQLContext.sparkContext.parallelize(
+ """{"map": {"a": {"field1": [1, 2, 3, null]}}}""" ::
+ """{"map": {"b": {"field2": 2}}}""" ::
+ """{"map": {"c": {"field1": [], "field2": 4}}}""" ::
+ """{"map": {"c": {"field2": 3}, "d": {"field1": [null]}}}""" ::
+ """{"map": {"e": null}}""" ::
+ """{"map": {"f": {"field1": null}}}""" :: Nil)
+
val nullsInArrays =
TestSQLContext.sparkContext.parallelize(
"""{"field1":[[null], [[["Test"]]]]}""" ::