From c2f25b1a148eeb1791ea7018b14b3a665c13212a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 2 Apr 2016 19:34:38 -0700 Subject: [SPARK-13996] [SQL] Add more not null attributes for Filter codegen ## What changes were proposed in this pull request? JIRA: https://issues.apache.org/jira/browse/SPARK-13996 Filter codegen finds the attributes not null by checking IsNotNull(a) expression with a condition if child.output.contains(a). However, the current approach to checking it is not comprehensive. We can improve it. E.g., for this plan: val rdd = sqlContext.sparkContext.makeRDD(Seq(Row(1, "1"), Row(null, "1"), Row(2, "2"))) val schema = new StructType().add("k", IntegerType).add("v", StringType) val smallDF = sqlContext.createDataFrame(rdd, schema) val df = smallDF.filter("isnotnull(k + 1)") The code snippet generated without this patch: /* 031 */ protected void processNext() throws java.io.IOException { /* 032 */ /*** PRODUCE: Filter isnotnull((k#0 + 1)) */ /* 033 */ /* 034 */ /*** PRODUCE: INPUT */ /* 035 */ /* 036 */ while (!shouldStop() && inputadapter_input.hasNext()) { /* 037 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 038 */ /*** CONSUME: Filter isnotnull((k#0 + 1)) */ /* 039 */ /* input[0, int] */ /* 040 */ boolean filter_isNull = inputadapter_row.isNullAt(0); /* 041 */ int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0)); /* 042 */ /* 043 */ /* isnotnull((input[0, int] + 1)) */ /* 044 */ /* (input[0, int] + 1) */ /* 045 */ boolean filter_isNull3 = true; /* 046 */ int filter_value3 = -1; /* 047 */ /* 048 */ if (!filter_isNull) { /* 049 */ filter_isNull3 = false; // resultCode could change nullability. /* 050 */ filter_value3 = filter_value + 1; /* 051 */ /* 052 */ } /* 053 */ if (!(!(filter_isNull3))) continue; /* 054 */ /* 055 */ filter_metricValue.add(1); With this patch: /* 031 */ protected void processNext() throws java.io.IOException { /* 032 */ /*** PRODUCE: Filter isnotnull((k#0 + 1)) */ /* 033 */ /* 034 */ /*** PRODUCE: INPUT */ /* 035 */ /* 036 */ while (!shouldStop() && inputadapter_input.hasNext()) { /* 037 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 038 */ /*** CONSUME: Filter isnotnull((k#0 + 1)) */ /* 039 */ /* input[0, int] */ /* 040 */ boolean filter_isNull = inputadapter_row.isNullAt(0); /* 041 */ int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0)); /* 042 */ /* 043 */ if (filter_isNull) continue; /* 044 */ /* 045 */ filter_metricValue.add(1); ## How was this patch tested? Existing tests. Author: Liang-Chi Hsieh Closes #11810 from viirya/add-more-not-null-attrs. --- .../scala/org/apache/spark/sql/execution/basicOperators.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index a6a14df6a3..fb1c6182cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -79,12 +79,12 @@ case class Filter(condition: Expression, child: SparkPlan) // Split out all the IsNotNulls from condition. private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition { - case IsNotNull(a) if child.output.exists(_.semanticEquals(a)) => true + case IsNotNull(a: NullIntolerant) if a.references.subsetOf(child.outputSet) => true case _ => false } // The columns that will filtered out by `IsNotNull` could be considered as not nullable. - private val notNullAttributes = notNullPreds.flatMap(_.references) + private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId) // Mark this as empty. We'll evaluate the input during doConsume(). We don't want to evaluate // all the variables at the beginning to take advantage of short circuiting. @@ -92,7 +92,7 @@ case class Filter(condition: Expression, child: SparkPlan) override def output: Seq[Attribute] = { child.output.map { a => - if (a.nullable && notNullAttributes.exists(_.semanticEquals(a))) { + if (a.nullable && notNullAttributes.contains(a.exprId)) { a.withNullability(false) } else { a @@ -179,7 +179,7 @@ case class Filter(condition: Expression, child: SparkPlan) // Reset the isNull to false for the not-null columns, then the followed operators could // generate better code (remove dead branches). val resultVars = input.zipWithIndex.map { case (ev, i) => - if (notNullAttributes.exists(_.semanticEquals(child.output(i)))) { + if (notNullAttributes.contains(child.output(i).exprId)) { ev.isNull = "false" } ev -- cgit v1.2.3