about summary refs log tree commit diff
diff options
context:
space:
mode:
authorpierre-borckmans <pierre.borckmans@realimpactanalytics.com>2015-12-22 23:00:42 -0800
committerReynold Xin <rxin@databricks.com>2015-12-22 23:00:42 -0800
commit43b2a6390087b7ce262a54dc8ab8dd825db62e21 (patch)
tree958bb0b86a5d040d4064d53786824274193cebd6
parent50301c0a28b64c5348b0f2c2d828589c0833c70c (diff)
downloadspark-43b2a6390087b7ce262a54dc8ab8dd825db62e21.tar.gz
spark-43b2a6390087b7ce262a54dc8ab8dd825db62e21.tar.bz2
spark-43b2a6390087b7ce262a54dc8ab8dd825db62e21.zip
[SPARK-12477][SQL] - Tungsten projection fails for null values in array fields
Accessing null elements in an array field fails when tungsten is enabled. It works in Spark 1.3.1, and in Spark > 1.5 with Tungsten disabled. This PR solves this by checking if the accessed element in the array field is null, in the generated code. Example: ``` // Array of String case class AS( as: Seq[String] ) val dfAS = sc.parallelize( Seq( AS ( Seq("a",null,"b") ) ) ).toDF dfAS.registerTempTable("T_AS") for (i <- 0 to 2) { println(i + " = " + sqlContext.sql(s"select as[$i] from T_AS").collect.mkString(","))} ``` With Tungsten disabled: ``` 0 = [a] 1 = [null] 2 = [b] ``` With Tungsten enabled: ``` 0 = [a] 15/12/22 09:32:50 ERROR Executor: Exception in task 7.0 in stage 1.0 (TID 15) java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$UTF8StringWriter.getSize(UnsafeRowWriters.java:90) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.TungstenProject$$anonfun$3$$anonfun$apply$3.apply(basicOperators.scala:90) at org.apache.spark.sql.execution.TungstenProject$$anonfun$3$$anonfun$apply$3.apply(basicOperators.scala:88) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ``` Author: pierre-borckmans <pierre.borckmans@realimpactanalytics.com> Closes #10429 from pierre-borckmans/SPARK-12477_Tungsten-Projection-Null-Element-In-Array.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala9
2 files changed, 10 insertions, 1 deletion
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala
index c5ed173eeb..91c275b1aa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala
@@ -227,7 +227,7 @@ case class GetArrayItem(child: Expression, ordinal: Expression)
nullSafeCodeGen(ctx, ev, (eval1, eval2) => {
s"""
final int index = (int) $eval2;
- if (index >= $eval1.numElements() || index < 0) {
+ if (index >= $eval1.numElements() || index < 0 || $eval1.isNullAt(index)) {
${ev.isNull} = true;
} else {
${ev.value} = ${ctx.getValue(eval1, dataType, "index")};
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala
index 09f7b50767..b76fc73b7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala
@@ -43,4 +43,13 @@ class DataFrameComplexTypeSuite extends QueryTest with SharedSQLContext {
val df = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
df.select(array($"a").as("s")).select(f(expr("s[0]"))).collect()
}
+
+ test("SPARK-12477 accessing null element in array field") {
+ val df = sparkContext.parallelize(Seq((Seq("val1", null, "val2"),
+ Seq(Some(1), None, Some(2))))).toDF("s", "i")
+ val nullStringRow = df.selectExpr("s[1]").collect()(0)
+ assert(nullStringRow == org.apache.spark.sql.Row(null))
+ val nullIntRow = df.selectExpr("i[1]").collect()(0)
+ assert(nullIntRow == org.apache.spark.sql.Row(null))
+ }
}