diff options
author | Yin Huai <yhuai@databricks.com> | 2015-01-19 10:44:12 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-01-19 10:44:12 -0800 |
commit | cd5da428537b8dfa0bbb9592d344316c26d8f625 (patch) | |
tree | b3ed99e099e7e9cb3133387f1840938ea08d10ea | |
parent | 4432568aac1d4a44fa1a7c3469f095eb7a6ce945 (diff) | |
download | spark-cd5da428537b8dfa0bbb9592d344316c26d8f625.tar.gz spark-cd5da428537b8dfa0bbb9592d344316c26d8f625.tar.bz2 spark-cd5da428537b8dfa0bbb9592d344316c26d8f625.zip |
[SPARK-5284][SQL] Insert into Hive throws NPE when an inner complex type field has a null value
JIRA: https://issues.apache.org/jira/browse/SPARK-5284
Author: Yin Huai <yhuai@databricks.com>
Closes #4077 from yhuai/SPARK-5284 and squashes the following commits:
fceacd6 [Yin Huai] Check if a value is null when the field has a complex type.
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala | 26 | ||||
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 37 |
2 files changed, 54 insertions, 9 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index d87c4945c8..eeabfdd857 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -346,16 +346,20 @@ private[hive] trait HiveInspectors { case soi: StandardStructObjectInspector => val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector)) (o: Any) => { - val struct = soi.create() - (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row]).zipped.foreach { - (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data)) + if (o != null) { + val struct = soi.create() + (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row]).zipped.foreach { + (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data)) + } + struct + } else { + null } - struct } case loi: ListObjectInspector => val wrapper = wrapperFor(loi.getListElementObjectInspector) - (o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper)) + (o: Any) => if (o != null) seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper)) else null case moi: MapObjectInspector => // The Predef.Map is scala.collection.immutable.Map. 
@@ -364,9 +368,15 @@ private[hive] trait HiveInspectors { val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector) val valueWrapper = wrapperFor(moi.getMapValueObjectInspector) - (o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) => - keyWrapper(key) -> valueWrapper(value) - }) + (o: Any) => { + if (o != null) { + mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) => + keyWrapper(key) -> valueWrapper(value) + }) + } else { + null + } + } case _ => identity[Any] diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index c1c3683f84..d41eb9e870 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.QueryTest import org.apache.spark.sql.Row import org.apache.spark.sql.hive.test.TestHive._ -import org.apache.spark.util.Utils +import org.apache.spark.sql.types._ case class Nested1(f1: Nested2) case class Nested2(f2: Nested3) @@ -214,4 +214,39 @@ class SQLQuerySuite extends QueryTest { Seq.empty[Row]) } } + + test("SPARK-5284 Insert into Hive throws NPE when a inner complex type field has a null value") { + val schema = StructType( + StructField("s", + StructType( + StructField("innerStruct", StructType(StructField("s1", StringType, true) :: Nil)) :: + StructField("innerArray", ArrayType(IntegerType), true) :: + StructField("innerMap", MapType(StringType, IntegerType)) :: Nil), true) :: Nil) + val row = Row(Row(null, null, null)) + + val rowRdd = sparkContext.parallelize(row :: Nil) + + applySchema(rowRdd, schema).registerTempTable("testTable") + + sql( + """CREATE TABLE nullValuesInInnerComplexTypes + | (s struct<innerStruct: struct<s1:string>, + | innerArray:array<int>, + | innerMap: map<string, int>>) + """.stripMargin).collect + + sql( + """ + 
|INSERT OVERWRITE TABLE nullValuesInInnerComplexTypes + |SELECT * FROM testTable + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM nullValuesInInnerComplexTypes"), + Seq(Seq(Seq(null, null, null))) + ) + + sql("DROP TABLE nullValuesInInnerComplexTypes") + dropTempTable("testTable") + } } |