From 4bc9e046cb8922923dff254e3e621fb4de656f98 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 11 Sep 2014 15:23:33 -0700 Subject: [SPARK-3390][SQL] sqlContext.jsonRDD fails on a complex structure of JSON array and JSON object nesting This PR aims to correctly handle JSON arrays in the type of `ArrayType(...(ArrayType(StructType)))`. JIRA: https://issues.apache.org/jira/browse/SPARK-3390. Author: Yin Huai Closes #2364 from yhuai/SPARK-3390 and squashes the following commits: 46db418 [Yin Huai] Handle JSON arrays in the type of ArrayType(...(ArrayType(StructType))). --- .../scala/org/apache/spark/sql/json/JsonRDD.scala | 66 +++++++++++++--------- 1 file changed, 39 insertions(+), 27 deletions(-) (limited to 'sql/core/src/main') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index 70062eae3b..873221835d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -68,8 +68,15 @@ private[sql] object JsonRDD extends Logging { val (topLevel, structLike) = values.partition(_.size == 1) val topLevelFields = topLevel.filter { name => resolved.get(prefix ++ name).get match { - case ArrayType(StructType(Nil), _) => false - case ArrayType(_, _) => true + case ArrayType(elementType, _) => { + def hasInnerStruct(t: DataType): Boolean = t match { + case s: StructType => false + case ArrayType(t1, _) => hasInnerStruct(t1) + case o => true + } + + hasInnerStruct(elementType) + } case struct: StructType => false case _ => true } @@ -84,7 +91,18 @@ private[sql] object JsonRDD extends Logging { val dataType = resolved.get(prefix :+ name).get dataType match { case array: ArrayType => - Some(StructField(name, ArrayType(structType, array.containsNull), nullable = true)) + // The pattern of this array is ArrayType(...(ArrayType(StructType))). + // Since the inner struct of array is a placeholder (StructType(Nil)), + // we need to replace this placeholder with the actual StructType (structType). + def getActualArrayType( + innerStruct: StructType, + currentArray: ArrayType): ArrayType = currentArray match { + case ArrayType(s: StructType, containsNull) => + ArrayType(innerStruct, containsNull) + case ArrayType(a: ArrayType, containsNull) => + ArrayType(getActualArrayType(innerStruct, a), containsNull) + } + Some(StructField(name, getActualArrayType(structType, array), nullable = true)) case struct: StructType => Some(StructField(name, structType, nullable = true)) // dataType is StringType means that we have resolved type conflicts involving // primitive types and complex types. So, the type of name has been relaxed to @@ -168,8 +186,7 @@ private[sql] object JsonRDD extends Logging { /** * Returns the element type of an JSON array. We go through all elements of this array * to detect any possible type conflict. We use [[compatibleType]] to resolve - * type conflicts. Right now, when the element of an array is another array, we - * treat the element as String. + * type conflicts. */ private def typeOfArray(l: Seq[Any]): ArrayType = { val containsNull = l.exists(v => v == null) @@ -216,18 +233,24 @@ private[sql] object JsonRDD extends Logging { } case (key: String, array: Seq[_]) => { // The value associated with the key is an array. - typeOfArray(array) match { + // Handle inner structs of an array. + def buildKeyPathForInnerStructs(v: Any, t: DataType): Seq[(String, DataType)] = t match { case ArrayType(StructType(Nil), containsNull) => { // The elements of this arrays are structs. - array.asInstanceOf[Seq[Map[String, Any]]].flatMap { + v.asInstanceOf[Seq[Map[String, Any]]].flatMap { element => allKeysWithValueTypes(element) }.map { - case (k, dataType) => (s"$key.$k", dataType) - } :+ (key, ArrayType(StructType(Nil), containsNull)) + case (k, t) => (s"$key.$k", t) + } } - case ArrayType(elementType, containsNull) => - (key, ArrayType(elementType, containsNull)) :: Nil + case ArrayType(t1, containsNull) => + v.asInstanceOf[Seq[Any]].flatMap { + element => buildKeyPathForInnerStructs(element, t1) + } + case other => Nil } + val elementType = typeOfArray(array) + buildKeyPathForInnerStructs(array, elementType) :+ (key, elementType) } case (key: String, value) => (key, typeOfPrimitiveValue(value)) :: Nil } @@ -339,8 +362,6 @@ private[sql] object JsonRDD extends Logging { null } else { desiredType match { - case ArrayType(elementType, _) => - value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType)) case StringType => toString(value) case IntegerType => value.asInstanceOf[IntegerType.JvmType] case LongType => toLong(value) @@ -348,6 +369,10 @@ private[sql] object JsonRDD extends Logging { case DecimalType => toDecimal(value) case BooleanType => value.asInstanceOf[BooleanType.JvmType] case NullType => null + + case ArrayType(elementType, _) => + value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType)) + case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct) } } } @@ -356,22 +381,9 @@ private[sql] object JsonRDD extends Logging { // TODO: Reuse the row instead of creating a new one for every record. val row = new GenericMutableRow(schema.fields.length) schema.fields.zipWithIndex.foreach { - // StructType - case (StructField(name, fields: StructType, _), i) => - row.update(i, json.get(name).flatMap(v => Option(v)).map( - v => asRow(v.asInstanceOf[Map[String, Any]], fields)).orNull) - - // ArrayType(StructType) - case (StructField(name, ArrayType(structType: StructType, _), _), i) => - row.update(i, - json.get(name).flatMap(v => Option(v)).map( - v => v.asInstanceOf[Seq[Any]].map( - e => asRow(e.asInstanceOf[Map[String, Any]], structType))).orNull) - - // Other cases case (StructField(name, dataType, _), i) => row.update(i, json.get(name).flatMap(v => Option(v)).map( - enforceCorrectType(_, dataType)).getOrElse(null)) + enforceCorrectType(_, dataType)).orNull) } row -- cgit v1.2.3