diff options
author | Davies Liu <davies@databricks.com> | 2015-12-10 17:22:18 -0800 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-12-10 17:22:18 -0800 |
commit | b1b4ee7f3541d92c8bc2b0b4fdadf46cfdb09504 (patch) | |
tree | 20c10ff56731a32a1b43c5e68e1f20ca77a95115 | |
parent | 24d3357d66e14388faf8709b368edca70ea96432 (diff) | |
download | spark-b1b4ee7f3541d92c8bc2b0b4fdadf46cfdb09504.tar.gz spark-b1b4ee7f3541d92c8bc2b0b4fdadf46cfdb09504.tar.bz2 spark-b1b4ee7f3541d92c8bc2b0b4fdadf46cfdb09504.zip |
[SPARK-12258][SQL] passing null into ScalaUDF
Check nullability and passing them into ScalaUDF.
Closes #10249
Author: Davies Liu <davies@databricks.com>
Closes #10259 from davies/udf_null.
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala | 7 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 9 |
2 files changed, 10 insertions, 6 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala index 03b89221ef..5deb2f81d1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala @@ -1029,8 +1029,11 @@ case class ScalaUDF( // such as IntegerType, its javaType is `int` and the returned type of user-defined // function is Object. Trying to convert an Object to `int` will cause casting exception. val evalCode = evals.map(_.code).mkString - val funcArguments = converterTerms.zip(evals).map { - case (converter, eval) => s"$converter.apply(${eval.value})" + val funcArguments = converterTerms.zipWithIndex.map { + case (converter, i) => + val eval = evals(i) + val dt = children(i).dataType + s"$converter.apply(${eval.isNull} ? null : (${ctx.boxedType(dt)}) ${eval.value})" }.mkString(",") val callFunc = s"${ctx.boxedType(ctx.javaType(dataType))} $resultTerm = " + s"(${ctx.boxedType(ctx.javaType(dataType))})${catalystConverterTerm}" + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 605a6549dd..8887dc68a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1138,14 +1138,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } test("SPARK-11725: correctly handle null inputs for ScalaUDF") { - val df = Seq( + val df = sparkContext.parallelize(Seq( new java.lang.Integer(22) -> "John", - null.asInstanceOf[java.lang.Integer] -> "Lucy").toDF("age", "name") + null.asInstanceOf[java.lang.Integer] -> "Lucy")).toDF("age", "name") + // passing null into the UDF that could handle it val boxedUDF = udf[java.lang.Integer, java.lang.Integer] { - (i: java.lang.Integer) => if (i == null) null else i * 2 + (i: java.lang.Integer) => if (i == null) -10 else i * 2 } - checkAnswer(df.select(boxedUDF($"age")), Row(44) :: Row(null) :: Nil) + checkAnswer(df.select(boxedUDF($"age")), Row(44) :: Row(-10) :: Nil) val primitiveUDF = udf((i: Int) => i * 2) checkAnswer(df.select(primitiveUDF($"age")), Row(44) :: Row(null) :: Nil) |