author     Yin Huai <huai@cse.ohio-state.edu>          2014-11-02 15:46:56 -0800
committer  Michael Armbrust <michael@databricks.com>   2014-11-02 15:46:56 -0800
commit     06232d23ff2a6344c49fff81364d9f6b02af326b (patch)
tree       4f3bbd8094acd58684b06be62e2c28c1405f7eae /sql
parent     e749f5dedbad412430b86e7290085095f8dec0d1 (diff)
[SPARK-4185][SQL] JSON schema inference failed when dealing with type conflicts in arrays
JIRA: https://issues.apache.org/jira/browse/SPARK-4185. This PR also includes the fix from #3052.

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #3056 from yhuai/SPARK-4185 and squashes the following commits:

ed3a5a8 [Yin Huai] Correctly handle type conflicts between structs and primitive types in an array.
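Concretely, the conflict this patch targets is an array whose elements mix structs and primitives across records, as in the two records added to TestJsonData below:

{"array3": [{"field":"str"}, {"field":1}]}
{"array3": [1, 2, 3]}

With the fix, schema inference falls back to treating the conflicting elements as strings, so array3 is inferred as ArrayType(StringType, false) (see the expected schema in the JsonSuite change) instead of producing a duplicate, conflicting struct field.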
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala      16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala     9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala  4
3 files changed, 20 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 5bb6f6c85d..0f2dcdcacf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -73,16 +73,18 @@ private[sql] object JsonRDD extends Logging {
def makeStruct(values: Seq[Seq[String]], prefix: Seq[String]): StructType = {
val (topLevel, structLike) = values.partition(_.size == 1)
+
val topLevelFields = topLevel.filter {
name => resolved.get(prefix ++ name).get match {
case ArrayType(elementType, _) => {
def hasInnerStruct(t: DataType): Boolean = t match {
- case s: StructType => false
+ case s: StructType => true
case ArrayType(t1, _) => hasInnerStruct(t1)
- case o => true
+ case o => false
}
- hasInnerStruct(elementType)
+ // Check if this array has an inner struct.
+ !hasInnerStruct(elementType)
}
case struct: StructType => false
case _ => true
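A note on the hunk above: the old and new predicates are logically equivalent (an array field stays top-level exactly when its innermost element type is not a struct); the rewrite makes hasInnerStruct return what its name promises and negates it at the call site. A minimal standalone sketch of the corrected helper, using stand-in types rather than Spark's real DataType hierarchy:

// Stand-ins for Spark's DataType classes, only for this sketch.
sealed trait DataType
case object StringType extends DataType
case class StructType(fieldNames: Seq[String]) extends DataType
case class ArrayType(elementType: DataType) extends DataType

// True iff the innermost element type, through nested arrays, is a struct.
def hasInnerStruct(t: DataType): Boolean = t match {
  case _: StructType    => true
  case ArrayType(inner) => hasInnerStruct(inner)
  case _                => false
}

assert(hasInnerStruct(ArrayType(StructType(Seq("field")))))   // struct elements
assert(hasInnerStruct(ArrayType(ArrayType(StructType(Nil))))) // nested arrays
assert(!hasInnerStruct(ArrayType(StringType)))                // primitive elements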
@@ -90,8 +92,11 @@ private[sql] object JsonRDD extends Logging {
}.map {
a => StructField(a.head, resolved.get(prefix ++ a).get, nullable = true)
}
+ val topLevelFieldNameSet = topLevelFields.map(_.name)
- val structFields: Seq[StructField] = structLike.groupBy(_(0)).map {
+ val structFields: Seq[StructField] = structLike.groupBy(_(0)).filter {
+ case (name, _) => !topLevelFieldNameSet.contains(name)
+ }.map {
case (name, fields) => {
val nestedFields = fields.map(_.tail)
val structType = makeStruct(nestedFields, prefix :+ name)
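This hunk is the behavioral fix: when type conflicts make a field resolve to a primitive-typed top-level field (e.g. array3 as an array of strings), struct-like sub-paths recorded for the same name from other records must not also generate a struct field, or the schema would contain two conflicting entries for one name. A minimal sketch of the de-duplication idea with plain Scala collections (names are illustrative, not the actual JsonRDD internals):

// "array3" was already resolved as a top-level field of string-array type.
val topLevelFieldNames = Set("array3")

// Nested paths grouped by head name, recorded from records where the
// array elements were structs: array3 -> [["field"]].
val structLike = Map("array3" -> Seq(Seq("field")))

// Drop any group whose name already exists as a top-level field, so
// "array3" does not also get a struct-typed schema entry.
val structFields = structLike.filter { case (name, _) =>
  !topLevelFieldNames.contains(name)
}
assert(structFields.isEmpty)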
@@ -354,7 +359,8 @@ private[sql] object JsonRDD extends Logging {
case (key, value) =>
if (count > 0) builder.append(",")
count += 1
- builder.append(s"""\"${key}\":${toString(value)}""")
+ val stringValue = if (value.isInstanceOf[String]) s"""\"$value\"""" else toString(value)
+ builder.append(s"""\"${key}\":${stringValue}""")
}
builder.append("}")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 362c7e1a52..4b851d1b96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -483,7 +483,8 @@ class JsonSuite extends QueryTest {
val expectedSchema = StructType(
StructField("array1", ArrayType(StringType, true), true) ::
StructField("array2", ArrayType(StructType(
- StructField("field", LongType, true) :: Nil), false), true) :: Nil)
+ StructField("field", LongType, true) :: Nil), false), true) ::
+ StructField("array3", ArrayType(StringType, false), true) :: Nil)
assert(expectedSchema === jsonSchemaRDD.schema)
@@ -492,12 +493,14 @@ class JsonSuite extends QueryTest {
checkAnswer(
sql("select * from jsonTable"),
Seq(Seq("1", "1.1", "true", null, "[]", "{}", "[2,3,4]",
- """{"field":str}"""), Seq(Seq(214748364700L), Seq(1))) :: Nil
+ """{"field":"str"}"""), Seq(Seq(214748364700L), Seq(1)), null) ::
+ Seq(null, null, Seq("""{"field":"str"}""", """{"field":1}""")) ::
+ Seq(null, null, Seq("1", "2", "3")) :: Nil
)
// Treat an element as a number.
checkAnswer(
- sql("select array1[0] + 1 from jsonTable"),
+ sql("select array1[0] + 1 from jsonTable where array1 is not null"),
2
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
index c204162dd2..e5773a5587 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
@@ -57,7 +57,9 @@ object TestJsonData {
val arrayElementTypeConflict =
TestSQLContext.sparkContext.parallelize(
"""{"array1": [1, 1.1, true, null, [], {}, [2,3,4], {"field":"str"}],
- "array2": [{"field":214748364700}, {"field":1}]}""" :: Nil)
+ "array2": [{"field":214748364700}, {"field":1}]}""" ::
+ """{"array3": [{"field":"str"}, {"field":1}]}""" ::
+ """{"array3": [1, 2, 3]}""" :: Nil)
val missingFields =
TestSQLContext.sparkContext.parallelize(