author     Yin Huai <huai@cse.ohio-state.edu>        2014-09-11 15:23:33 -0700
committer  Michael Armbrust <michael@databricks.com> 2014-09-11 15:23:33 -0700
commit     4bc9e046cb8922923dff254e3e621fb4de656f98 (patch)
tree       91cff2d01fc277c8c50da7f66795caf3e1b9cd7c /sql/core/src/main
parent     ca83f1e2c4dfa519e44b837b6815cba3b4526d92 (diff)
[SPARK-3390][SQL] sqlContext.jsonRDD fails on a complex structure of JSON array and JSON object nesting
This PR aims to correctly handle JSON arrays in the type of `ArrayType(...(ArrayType(StructType)))`.

JIRA: https://issues.apache.org/jira/browse/SPARK-3390

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #2364 from yhuai/SPARK-3390 and squashes the following commits:

46db418 [Yin Huai] Handle JSON arrays in the type of ArrayType(...(ArrayType(StructType))).
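For context, here is a minimal sketch of the kind of input this change targets, written against the SQLContext.jsonRDD API of that era; the record shape and field names below are illustrative only, not taken from the patch:

// Illustrative only: a field that nests structs inside nested arrays,
// i.e. the ArrayType(...(ArrayType(StructType))) shape this patch handles.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object NestedArrayOfStructsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("SPARK-3390-example").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // "outer" is an array of arrays of JSON objects (hypothetical field names).
    val json = sc.parallelize(Seq(
      """{"outer": [[{"a": 1, "b": "x"}], [{"a": 2, "b": "y"}]]}"""))

    // Before this fix, schema inference mishandled such input; with it,
    // "outer" should be inferred as an array of arrays of structs.
    val schemaRDD = sqlContext.jsonRDD(json)
    schemaRDD.printSchema()

    sc.stop()
  }
}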
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala | 66
1 file changed, 39 insertions, 27 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 70062eae3b..873221835d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -68,8 +68,15 @@ private[sql] object JsonRDD extends Logging {
       val (topLevel, structLike) = values.partition(_.size == 1)
       val topLevelFields = topLevel.filter {
         name => resolved.get(prefix ++ name).get match {
-          case ArrayType(StructType(Nil), _) => false
-          case ArrayType(_, _) => true
+          case ArrayType(elementType, _) => {
+            def hasInnerStruct(t: DataType): Boolean = t match {
+              case s: StructType => false
+              case ArrayType(t1, _) => hasInnerStruct(t1)
+              case o => true
+            }
+
+            hasInnerStruct(elementType)
+          }
           case struct: StructType => false
           case _ => true
         }
@@ -84,7 +91,18 @@ private[sql] object JsonRDD extends Logging {
           val dataType = resolved.get(prefix :+ name).get
           dataType match {
             case array: ArrayType =>
-              Some(StructField(name, ArrayType(structType, array.containsNull), nullable = true))
+              // The pattern of this array is ArrayType(...(ArrayType(StructType))).
+              // Since the inner struct of array is a placeholder (StructType(Nil)),
+              // we need to replace this placeholder with the actual StructType (structType).
+              def getActualArrayType(
+                  innerStruct: StructType,
+                  currentArray: ArrayType): ArrayType = currentArray match {
+                case ArrayType(s: StructType, containsNull) =>
+                  ArrayType(innerStruct, containsNull)
+                case ArrayType(a: ArrayType, containsNull) =>
+                  ArrayType(getActualArrayType(innerStruct, a), containsNull)
+              }
+              Some(StructField(name, getActualArrayType(structType, array), nullable = true))
             case struct: StructType => Some(StructField(name, structType, nullable = true))
             // dataType is StringType means that we have resolved type conflicts involving
             // primitive types and complex types. So, the type of name has been relaxed to
@@ -168,8 +186,7 @@ private[sql] object JsonRDD extends Logging {
   /**
    * Returns the element type of an JSON array. We go through all elements of this array
    * to detect any possible type conflict. We use [[compatibleType]] to resolve
-   * type conflicts. Right now, when the element of an array is another array, we
-   * treat the element as String.
+   * type conflicts.
    */
   private def typeOfArray(l: Seq[Any]): ArrayType = {
     val containsNull = l.exists(v => v == null)
@@ -216,18 +233,24 @@ private[sql] object JsonRDD extends Logging {
       }
       case (key: String, array: Seq[_]) => {
         // The value associated with the key is an array.
-        typeOfArray(array) match {
+        // Handle inner structs of an array.
+        def buildKeyPathForInnerStructs(v: Any, t: DataType): Seq[(String, DataType)] = t match {
           case ArrayType(StructType(Nil), containsNull) => {
             // The elements of this arrays are structs.
-            array.asInstanceOf[Seq[Map[String, Any]]].flatMap {
+            v.asInstanceOf[Seq[Map[String, Any]]].flatMap {
               element => allKeysWithValueTypes(element)
             }.map {
-              case (k, dataType) => (s"$key.$k", dataType)
-            } :+ (key, ArrayType(StructType(Nil), containsNull))
+              case (k, t) => (s"$key.$k", t)
+            }
           }
-          case ArrayType(elementType, containsNull) =>
-            (key, ArrayType(elementType, containsNull)) :: Nil
+          case ArrayType(t1, containsNull) =>
+            v.asInstanceOf[Seq[Any]].flatMap {
+              element => buildKeyPathForInnerStructs(element, t1)
+            }
+          case other => Nil
         }
+        val elementType = typeOfArray(array)
+        buildKeyPathForInnerStructs(array, elementType) :+ (key, elementType)
       }
       case (key: String, value) => (key, typeOfPrimitiveValue(value)) :: Nil
     }
@@ -339,8 +362,6 @@ private[sql] object JsonRDD extends Logging {
       null
     } else {
       desiredType match {
-        case ArrayType(elementType, _) =>
-          value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
         case StringType => toString(value)
         case IntegerType => value.asInstanceOf[IntegerType.JvmType]
         case LongType => toLong(value)
@@ -348,6 +369,10 @@ private[sql] object JsonRDD extends Logging {
         case DecimalType => toDecimal(value)
         case BooleanType => value.asInstanceOf[BooleanType.JvmType]
         case NullType => null
+
+        case ArrayType(elementType, _) =>
+          value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
+        case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct)
       }
     }
   }
@@ -356,22 +381,9 @@ private[sql] object JsonRDD extends Logging {
     // TODO: Reuse the row instead of creating a new one for every record.
     val row = new GenericMutableRow(schema.fields.length)
     schema.fields.zipWithIndex.foreach {
-      // StructType
-      case (StructField(name, fields: StructType, _), i) =>
-        row.update(i, json.get(name).flatMap(v => Option(v)).map(
-          v => asRow(v.asInstanceOf[Map[String, Any]], fields)).orNull)
-
-      // ArrayType(StructType)
-      case (StructField(name, ArrayType(structType: StructType, _), _), i) =>
-        row.update(i,
-          json.get(name).flatMap(v => Option(v)).map(
-            v => v.asInstanceOf[Seq[Any]].map(
-              e => asRow(e.asInstanceOf[Map[String, Any]], structType))).orNull)
-
-      // Other cases
       case (StructField(name, dataType, _), i) =>
         row.update(i, json.get(name).flatMap(v => Option(v)).map(
-          enforceCorrectType(_, dataType)).getOrElse(null))
+          enforceCorrectType(_, dataType)).orNull)
     }
     row
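Read outside the diff, the placeholder replacement performed by getActualArrayType can be sketched as a standalone helper. This assumes the catalyst type classes of that era (org.apache.spark.sql.catalyst.types); unlike the version in the patch it keeps a fallback case for non-struct element types, so it is an approximation rather than a drop-in:

import org.apache.spark.sql.catalyst.types.{ArrayType, StructType}

object ArrayPlaceholderSketch {
  // Walk nested ArrayTypes and swap the innermost StructType placeholder
  // (StructType(Nil)) for the struct produced by schema resolution.
  def replaceInnerStruct(resolvedStruct: StructType, array: ArrayType): ArrayType =
    array.elementType match {
      case _: StructType => ArrayType(resolvedStruct, array.containsNull)
      case inner: ArrayType =>
        ArrayType(replaceInnerStruct(resolvedStruct, inner), array.containsNull)
      case _ => array // primitive element type: nothing to replace
    }
}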