author     Yin Huai <yhuai@databricks.com>            2015-01-19 10:44:12 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-01-19 10:44:12 -0800
commit     cd5da428537b8dfa0bbb9592d344316c26d8f625 (patch)
tree       b3ed99e099e7e9cb3133387f1840938ea08d10ea /sql
parent     4432568aac1d4a44fa1a7c3469f095eb7a6ce945 (diff)
[SPARK-5284][SQL] Insert into Hive throws NPE when an inner complex type field has a null value
JIRA: https://issues.apache.org/jira/browse/SPARK-5284

Author: Yin Huai <yhuai@databricks.com>

Closes #4077 from yhuai/SPARK-5284 and squashes the following commits:

fceacd6 [Yin Huai] Check if a value is null when the field has a complex type.
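The root cause: the conversion functions returned by wrapperFor in HiveInspectors turn Catalyst values into Hive-writable values, and before this patch the struct, array, and map wrappers dereferenced their input unconditionally, so a null nested value blew up inside the wrapper. Below is a minimal standalone sketch of the failure mode and of the fix; the names are simplified stand-ins for illustration, not Spark's actual types.

object NullSafeWrapperSketch {
  // Pre-patch shape: assumes `o` is a non-null Seq, so a null inner
  // array value throws NullPointerException at .map.
  val unsafeArrayWrapper: Any => Any =
    (o: Any) => o.asInstanceOf[Seq[_]].map(identity)

  // Post-patch shape: let null pass through untouched, mirroring the
  // struct, list, and map cases changed in this commit.
  val safeArrayWrapper: Any => Any =
    (o: Any) => if (o != null) o.asInstanceOf[Seq[_]].map(identity) else null

  def main(args: Array[String]): Unit = {
    println(safeArrayWrapper(Seq(1, 2, 3))) // List(1, 2, 3)
    println(safeArrayWrapper(null))         // null, no NPE
    // unsafeArrayWrapper(null) would throw a NullPointerException here.
  }
}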
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala          | 26
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 37
2 files changed, 54 insertions(+), 9 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index d87c4945c8..eeabfdd857 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -346,16 +346,20 @@ private[hive] trait HiveInspectors {
     case soi: StandardStructObjectInspector =>
       val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
       (o: Any) => {
-        val struct = soi.create()
-        (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row]).zipped.foreach {
-          (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
+        if (o != null) {
+          val struct = soi.create()
+          (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row]).zipped.foreach {
+            (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
+          }
+          struct
+        } else {
+          null
         }
-        struct
       }
 
     case loi: ListObjectInspector =>
       val wrapper = wrapperFor(loi.getListElementObjectInspector)
-      (o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper))
+      (o: Any) => if (o != null) seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper)) else null
 
     case moi: MapObjectInspector =>
       // The Predef.Map is scala.collection.immutable.Map.
@@ -364,9 +368,15 @@ private[hive] trait HiveInspectors {
       val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector)
       val valueWrapper = wrapperFor(moi.getMapValueObjectInspector)
-      (o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
-        keyWrapper(key) -> valueWrapper(value)
-      })
+      (o: Any) => {
+        if (o != null) {
+          mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
+            keyWrapper(key) -> valueWrapper(value)
+          })
+        } else {
+          null
+        }
+      }
 
     case _ =>
       identity[Any]
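Note that wrapperFor builds its converter recursively from the ObjectInspector tree, so the null check must appear in every complex-type branch: the outer struct can be non-null while a nested array, map, or struct field is null. A sketch of that recursive shape, using a toy type model rather than Hive's ObjectInspector API (all names here are hypothetical):

object RecursiveWrapperSketch {
  sealed trait DataType
  case object IntType extends DataType
  case class ArrayType(element: DataType) extends DataType
  case class StructType(fields: Seq[DataType]) extends DataType

  // Build one converter per type, mirroring wrapperFor's structure:
  // complex types recurse, and every complex wrapper passes null through.
  def wrapperFor(dt: DataType): Any => Any = dt match {
    case IntType => identity[Any]
    case ArrayType(element) =>
      val inner = wrapperFor(element)
      (o: Any) => if (o != null) o.asInstanceOf[Seq[_]].map(inner) else null
    case StructType(fields) =>
      val inners = fields.map(wrapperFor)
      (o: Any) =>
        if (o != null) {
          o.asInstanceOf[Seq[Any]].zip(inners).map { case (v, w) => w(v) }
        } else {
          null
        }
  }

  def main(args: Array[String]): Unit = {
    val wrap = wrapperFor(StructType(Seq(ArrayType(IntType), IntType)))
    println(wrap(Seq(null, 42))) // inner array is null: preserved, no NPE
    println(wrap(null))          // whole struct is null: preserved as well
  }
}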
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index c1c3683f84..d41eb9e870 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.types._
 
 case class Nested1(f1: Nested2)
 case class Nested2(f2: Nested3)
@@ -214,4 +214,39 @@ class SQLQuerySuite extends QueryTest {
       Seq.empty[Row])
   }
+
+  test("SPARK-5284 Insert into Hive throws NPE when a inner complex type field has a null value") {
+    val schema = StructType(
+      StructField("s",
+        StructType(
+          StructField("innerStruct", StructType(StructField("s1", StringType, true) :: Nil)) ::
+            StructField("innerArray", ArrayType(IntegerType), true) ::
+            StructField("innerMap", MapType(StringType, IntegerType)) :: Nil), true) :: Nil)
+    val row = Row(Row(null, null, null))
+
+    val rowRdd = sparkContext.parallelize(row :: Nil)
+
+    applySchema(rowRdd, schema).registerTempTable("testTable")
+
+    sql(
+      """CREATE TABLE nullValuesInInnerComplexTypes
+        |  (s struct<innerStruct: struct<s1:string>,
+        |            innerArray:array<int>,
+        |            innerMap: map<string, int>>)
+      """.stripMargin).collect
+
+    sql(
+      """
+        |INSERT OVERWRITE TABLE nullValuesInInnerComplexTypes
+        |SELECT * FROM testTable
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM nullValuesInInnerComplexTypes"),
+      Seq(Seq(Seq(null, null, null)))
+    )
+
+    sql("DROP TABLE nullValuesInInnerComplexTypes")
+    dropTempTable("testTable")
+  }
 }
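The CREATE TABLE statement in the test mirrors the Catalyst schema built from StructType/StructField. As a reference for that mapping, here is a small hypothetical helper (illustration only, not a Spark or Hive API) that renders the test schema in Hive DDL type syntax:

object HiveTypeStringSketch {
  // Toy model of the Catalyst types used by the test above.
  sealed trait DT
  case object StringT extends DT
  case object IntT extends DT
  case class ArrayT(element: DT) extends DT
  case class MapT(key: DT, value: DT) extends DT
  case class StructT(fields: Seq[(String, DT)]) extends DT

  // Render a type the way Hive DDL spells it, e.g. array<int>.
  def hiveType(dt: DT): String = dt match {
    case StringT => "string"
    case IntT => "int"
    case ArrayT(e) => s"array<${hiveType(e)}>"
    case MapT(k, v) => s"map<${hiveType(k)},${hiveType(v)}>"
    case StructT(fs) =>
      fs.map { case (n, t) => s"$n:${hiveType(t)}" }.mkString("struct<", ",", ">")
  }

  def main(args: Array[String]): Unit = {
    val s = StructT(Seq(
      "innerStruct" -> StructT(Seq("s1" -> StringT)),
      "innerArray" -> ArrayT(IntT),
      "innerMap" -> MapT(StringT, IntT)))
    // Prints the column type used by the CREATE TABLE above:
    // struct<innerStruct:struct<s1:string>,innerArray:array<int>,innerMap:map<string,int>>
    println(hiveType(s))
  }
}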