author    Liang-Chi Hsieh <simonh@tw.ibm.com>    2016-04-02 19:34:38 -0700
committer Davies Liu <davies.liu@gmail.com>      2016-04-02 19:34:38 -0700
commit    c2f25b1a148eeb1791ea7018b14b3a665c13212a
tree      3bf23e7d59efa159164f1517b2ca04e68d580a5f /sql
parent    1cf70183423b938ec064925b20fd4a5b9e355991
[SPARK-13996] [SQL] Add more not null attributes for Filter codegen
## What changes were proposed in this pull request?

JIRA: https://issues.apache.org/jira/browse/SPARK-13996

Filter codegen marks an attribute `a` as not null only when the condition contains an `IsNotNull(a)` predicate and `child.output` contains `a`. This check is not comprehensive: an `IsNotNull` over a null-intolerant expression such as `k + 1` also proves that `k` is not null, but the current check misses it. We can improve it. E.g., for this plan:

```scala
val rdd = sqlContext.sparkContext.makeRDD(Seq(Row(1, "1"), Row(null, "1"), Row(2, "2")))
val schema = new StructType().add("k", IntegerType).add("v", StringType)
val smallDF = sqlContext.createDataFrame(rdd, schema)
val df = smallDF.filter("isnotnull(k + 1)")
```

The code snippet generated without this patch:

```java
/* 031 */   protected void processNext() throws java.io.IOException {
/* 032 */     /*** PRODUCE: Filter isnotnull((k#0 + 1)) */
/* 033 */
/* 034 */     /*** PRODUCE: INPUT */
/* 035 */
/* 036 */     while (!shouldStop() && inputadapter_input.hasNext()) {
/* 037 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 038 */       /*** CONSUME: Filter isnotnull((k#0 + 1)) */
/* 039 */       /* input[0, int] */
/* 040 */       boolean filter_isNull = inputadapter_row.isNullAt(0);
/* 041 */       int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0));
/* 042 */
/* 043 */       /* isnotnull((input[0, int] + 1)) */
/* 044 */       /* (input[0, int] + 1) */
/* 045 */       boolean filter_isNull3 = true;
/* 046 */       int filter_value3 = -1;
/* 047 */
/* 048 */       if (!filter_isNull) {
/* 049 */         filter_isNull3 = false; // resultCode could change nullability.
/* 050 */         filter_value3 = filter_value + 1;
/* 051 */
/* 052 */       }
/* 053 */       if (!(!(filter_isNull3))) continue;
/* 054 */
/* 055 */       filter_metricValue.add(1);
```

With this patch, the generated `IsNotNull` check collapses into a single null test on the input column:

```java
/* 031 */   protected void processNext() throws java.io.IOException {
/* 032 */     /*** PRODUCE: Filter isnotnull((k#0 + 1)) */
/* 033 */
/* 034 */     /*** PRODUCE: INPUT */
/* 035 */
/* 036 */     while (!shouldStop() && inputadapter_input.hasNext()) {
/* 037 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 038 */       /*** CONSUME: Filter isnotnull((k#0 + 1)) */
/* 039 */       /* input[0, int] */
/* 040 */       boolean filter_isNull = inputadapter_row.isNullAt(0);
/* 041 */       int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0));
/* 042 */
/* 043 */       if (filter_isNull) continue;
/* 044 */
/* 045 */       filter_metricValue.add(1);
```

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #11810 from viirya/add-more-not-null-attrs.
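Before the diff, a short illustration of the null-intolerance reasoning in plain Scala. Every type below (`Expr`, `Attr`, `Add`, `Coalesce`, the demo object) is a simplified stand-in invented for illustration, not one of Spark's Catalyst classes. The idea: an expression is null-intolerant when it returns null whenever any of its inputs is null, so `IsNotNull(expr)` over such an expression proves every attribute it references is not null.

```scala
// Toy model of null-intolerance (illustrative stand-ins, NOT Catalyst classes).
sealed trait Expr { def references: Set[String] }
trait NullIntolerant extends Expr

case class Attr(name: String) extends Expr with NullIntolerant {
  def references: Set[String] = Set(name)
}
case class Add(left: Expr, right: Expr) extends Expr with NullIntolerant {
  def references: Set[String] = left.references ++ right.references
}
// Coalesce is NOT null-intolerant: coalesce(k, v) can be non-null while k is
// null, so IsNotNull over it proves nothing about k.
case class Coalesce(children: Seq[Expr]) extends Expr {
  def references: Set[String] = children.flatMap(_.references).toSet
}
case class IsNotNull(child: Expr) extends Expr {
  def references: Set[String] = child.references
}

object NotNullDemo extends App {
  val childOutput = Set("k", "v")

  // Attributes provably not null, given one conjunct of the filter condition.
  def notNullRefs(pred: Expr): Set[String] = pred match {
    case IsNotNull(e: NullIntolerant) if e.references.subsetOf(childOutput) =>
      e.references
    case _ => Set.empty
  }

  println(notNullRefs(IsNotNull(Add(Attr("k"), Attr("v")))))  // Set(k, v)
  println(notNullRefs(IsNotNull(Coalesce(Seq(Attr("k"))))))   // Set(): not provable
}
```

This is exactly the shape of the new `case IsNotNull(a: NullIntolerant) if a.references.subsetOf(child.outputSet)` guard in the diff below.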
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala  8
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index a6a14df6a3..fb1c6182cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -79,12 +79,12 @@ case class Filter(condition: Expression, child: SparkPlan)
   // Split out all the IsNotNulls from condition.
   private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
-    case IsNotNull(a) if child.output.exists(_.semanticEquals(a)) => true
+    case IsNotNull(a: NullIntolerant) if a.references.subsetOf(child.outputSet) => true
     case _ => false
   }

   // The columns that will be filtered out by `IsNotNull` could be considered as not nullable.
-  private val notNullAttributes = notNullPreds.flatMap(_.references)
+  private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId)

   // Mark this as empty. We'll evaluate the input during doConsume(). We don't want to evaluate
   // all the variables at the beginning to take advantage of short circuiting.
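To make the hunk above concrete: the old pattern matched only an `IsNotNull` applied directly to an output attribute, while the new one accepts any null-intolerant expression whose references all come from the child. A minimal sketch of that reference test, using plain strings as hypothetical stand-ins for attributes:

```scala
// Sketch: subset test vs. direct-attribute lookup (strings stand in for attributes).
object SubsetDemo extends App {
  val childOutput = Set("k", "v")

  def oldMatch(attr: String): Boolean = childOutput.contains(attr)      // bare attribute only
  def newMatch(refs: Set[String]): Boolean = refs.subsetOf(childOutput) // any qualifying expression

  println(oldMatch("k"))           // true:  isnotnull(k) matched before the patch
  println(newMatch(Set("k")))      // true:  isnotnull(k + 1) also matches after it
  println(newMatch(Set("k", "x"))) // false: references an attribute the child lacks
}
```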
@@ -92,7 +92,7 @@ case class Filter(condition: Expression, child: SparkPlan)
   override def output: Seq[Attribute] = {
     child.output.map { a =>
-      if (a.nullable && notNullAttributes.exists(_.semanticEquals(a))) {
+      if (a.nullable && notNullAttributes.contains(a.exprId)) {
         a.withNullability(false)
       } else {
         a
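The hunk above tightens the filter's output schema: attributes proven not null are re-emitted with `nullable = false`, and membership is now tested by `exprId` rather than by scanning with `semanticEquals`. A hedged sketch with hypothetical `Attr`/`ExprId` stand-ins (not Catalyst's `AttributeReference`):

```scala
// Sketch of the nullability rewrite (hypothetical stand-ins, not Catalyst types).
case class ExprId(id: Long)
case class Attr(name: String, exprId: ExprId, nullable: Boolean) {
  def withNullability(n: Boolean): Attr = copy(nullable = n)
}

object OutputDemo extends App {
  val k = Attr("k", ExprId(0), nullable = true)
  val v = Attr("v", ExprId(1), nullable = true)
  // Mirrors notNullPreds.flatMap(_.references).distinct.map(_.exprId):
  // ids of attributes the IsNotNull conjuncts prove non-null (here, just k).
  val notNullAttributes = Seq(k.exprId)

  val output = Seq(k, v).map { a =>
    if (a.nullable && notNullAttributes.contains(a.exprId)) a.withNullability(false) else a
  }
  println(output) // List(Attr(k,ExprId(0),false), Attr(v,ExprId(1),true))
}
```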
@@ -179,7 +179,7 @@ case class Filter(condition: Expression, child: SparkPlan)
     // Reset the isNull to false for the not-null columns, then the following operators can
     // generate better code (remove dead branches).
     val resultVars = input.zipWithIndex.map { case (ev, i) =>
-      if (notNullAttributes.exists(_.semanticEquals(child.output(i)))) {
+      if (notNullAttributes.contains(child.output(i).exprId)) {
         ev.isNull = "false"
       }
       ev
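Why pinning `ev.isNull` to the literal string `"false"` pays off: downstream operators consult `isNull` while generating Java source, so a constant `"false"` lets them omit the runtime null guard entirely, which is the dead-branch removal visible in the before/after snippets in the commit message. A minimal sketch, assuming a simplified `ExprCode` with a mutable `isNull` field (Spark's real `ExprCode` differs):

```scala
// Sketch: a code generator that consults ev.isNull can drop the null branch
// once Filter has pinned isNull to the literal "false".
case class ExprCode(var isNull: String, value: String)

object CodegenDemo extends App {
  def genRead(ev: ExprCode): String =
    if (ev.isNull == "false") ev.value            // dead null branch removed
    else s"(${ev.isNull} ? -1 : ${ev.value})"     // must guard at runtime

  val ev = ExprCode(isNull = "filter_isNull", value = "filter_value")
  println(genRead(ev)) // (filter_isNull ? -1 : filter_value)
  ev.isNull = "false"  // what the patched Filter does for not-null columns
  println(genRead(ev)) // filter_value
}
```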