author     Yin Huai <yhuai@databricks.com>             2015-08-12 16:45:15 -0700
committer  Michael Armbrust <michael@databricks.com>   2015-08-12 16:45:15 -0700
commit     7035d880a0cf06910c19b4afd49645124c620f14 (patch)
tree       7d5920b619e13a17d3cb4eb82138431d410ddef1 /sql
parent     ab7e721cfec63155641e81e72b4ad43cf6a7d4c7 (diff)
[SPARK-9894] [SQL] Json writer should handle MapData.
https://issues.apache.org/jira/browse/SPARK-9894

Author: Yin Huai <yhuai@databricks.com>

Closes #8137 from yhuai/jsonMapData.
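In short: JacksonGenerator's valWriter matched map columns against scala.collection.Map, but the values that actually reach it from an InternalRow are catalyst MapData, so saving a DataFrame with a MapType column as JSON failed. A minimal reproduction, condensed from the regression test added in this patch (Spark 1.5-era API; sqlContext and sparkContext are assumed to be in scope, as in spark-shell):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Schema mixing the two complex types the JSON writer must handle.
val schema = new StructType()
  .add("array", ArrayType(LongType))
  .add("map", MapType(StringType, new StructType().add("innerField", LongType)))

val data =
  Row(Seq(1L, 2L, 3L), Map("m1" -> Row(4L))) ::
  Row(Seq(5L, 6L, 7L), Map("m2" -> Row(10L))) :: Nil
val df = sqlContext.createDataFrame(sparkContext.parallelize(data), schema)

// Without this patch the save fails: the map column arrives as MapData,
// which the old `case (MapType(kv, vv, _), v: Map[_, _])` never matches.
df.write.format("json").save("/tmp/spark-9894-repro")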
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala  | 10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala            | 78
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala      | 30
3 files changed, 83 insertions(+), 35 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 37c2b5a296..99ac7730bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -107,12 +107,12 @@ private[sql] object JacksonGenerator {
         v.foreach(ty, (_, value) => valWriter(ty, value))
         gen.writeEndArray()
 
-      case (MapType(kv, vv, _), v: Map[_, _]) =>
+      case (MapType(kt, vt, _), v: MapData) =>
         gen.writeStartObject()
-        v.foreach { p =>
-          gen.writeFieldName(p._1.toString)
-          valWriter(vv, p._2)
-        }
+        v.foreach(kt, vt, { (k, v) =>
+          gen.writeFieldName(k.toString)
+          valWriter(vt, v)
+        })
         gen.writeEndObject()
 
       case (StructType(ty), v: InternalRow) =>
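The heart of the fix is the iteration contract: MapData stores keys and values as two parallel catalyst arrays, so unlike a scala Map it cannot be traversed directly; its foreach(keyType, valueType, f) needs the element types to decode those arrays, which is why the new case threads kt and vt through. Below, a hedged, self-contained sketch of the same pattern outside the writer (catalyst package locations as of this era are assumed; writeMapAsJson is a hypothetical helper, not part of the patch):

import java.io.StringWriter

import com.fasterxml.jackson.core.JsonFactory

import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
import org.apache.spark.sql.types.{DataType, LongType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical helper mirroring the rewritten MapData branch of valWriter.
def writeMapAsJson(map: MapData, kt: DataType, vt: DataType): String = {
  val out = new StringWriter()
  val gen = new JsonFactory().createGenerator(out)
  gen.writeStartObject()
  // The element types must be passed in so the backing arrays can be decoded.
  map.foreach(kt, vt, { (k, v) =>
    gen.writeFieldName(k.toString)
    gen.writeNumber(v.asInstanceOf[Long]) // demo only: values known to be longs
  })
  gen.writeEndObject()
  gen.close()
  out.toString
}

// Internally, string keys are UTF8String and literal maps become ArrayBasedMapData.
val m = new ArrayBasedMapData(
  new GenericArrayData(Array[Any](UTF8String.fromString("m1"))),
  new GenericArrayData(Array[Any](4L)))
println(writeMapAsJson(m, StringType, LongType)) // prints {"m1":4}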
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
new file mode 100644
index 0000000000..ed6d512ab3
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+
+class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
+  override val dataSourceName: String = "json"
+
+  import sqlContext._
+
+  test("save()/load() - partitioned table - simple queries - partition columns in data") {
+    withTempDir { file =>
+      val basePath = new Path(file.getCanonicalPath)
+      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+      val qualifiedBasePath = fs.makeQualified(basePath)
+
+      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        sparkContext
+          .parallelize(for (i <- 1 to 3) yield s"""{"a":$i,"b":"val_$i"}""")
+          .saveAsTextFile(partitionDir.toString)
+      }
+
+      val dataSchemaWithPartition =
+        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+      checkQueries(
+        read.format(dataSourceName)
+          .option("dataSchema", dataSchemaWithPartition.json)
+          .load(file.getCanonicalPath))
+    }
+  }
+
+  test("SPARK-9894: save complex types to JSON") {
+    withTempDir { file =>
+      file.delete()
+
+      val schema =
+        new StructType()
+          .add("array", ArrayType(LongType))
+          .add("map", MapType(StringType, new StructType().add("innerField", LongType)))
+
+      val data =
+        Row(Seq(1L, 2L, 3L), Map("m1" -> Row(4L))) ::
+        Row(Seq(5L, 6L, 7L), Map("m2" -> Row(10L))) :: Nil
+      val df = createDataFrame(sparkContext.parallelize(data), schema)
+
+      // Write the data out.
+      df.write.format(dataSourceName).save(file.getCanonicalPath)
+
+      // Read it back and check the result.
+      checkAnswer(
+        read.format(dataSourceName).schema(schema).load(file.getCanonicalPath),
+        df
+      )
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 48c37a1fa1..e8975e5f5c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -50,33 +50,3 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
     }
   }
 }
-
-class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
-  override val dataSourceName: String =
-    classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource].getCanonicalName
-
-  import sqlContext._
-
-  test("save()/load() - partitioned table - simple queries - partition columns in data") {
-    withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
-      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
-        sparkContext
-          .parallelize(for (i <- 1 to 3) yield s"""{"a":$i,"b":"val_$i"}""")
-          .saveAsTextFile(partitionDir.toString)
-      }
-
-      val dataSchemaWithPartition =
-        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
-
-      checkQueries(
-        read.format(dataSourceName)
-          .option("dataSchema", dataSchemaWithPartition.json)
-          .load(file.getCanonicalPath))
-    }
-  }
-}