diff options
author | Liang-Chi Hsieh <simonh@tw.ibm.com> | 2016-04-02 19:34:38 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-04-02 19:34:38 -0700 |
commit | c2f25b1a148eeb1791ea7018b14b3a665c13212a (patch) | |
tree | 3bf23e7d59efa159164f1517b2ca04e68d580a5f /sql | |
parent | 1cf70183423b938ec064925b20fd4a5b9e355991 (diff) | |
download | spark-c2f25b1a148eeb1791ea7018b14b3a665c13212a.tar.gz spark-c2f25b1a148eeb1791ea7018b14b3a665c13212a.tar.bz2 spark-c2f25b1a148eeb1791ea7018b14b3a665c13212a.zip |
[SPARK-13996] [SQL] Add more not null attributes for Filter codegen
## What changes were proposed in this pull request?
JIRA: https://issues.apache.org/jira/browse/SPARK-13996
Filter codegen determines which attributes are not null by looking for IsNotNull(a) expressions under the condition child.output.contains(a). However, the current approach to this check is not comprehensive, and we can improve it.
E.g., for this plan:
val rdd = sqlContext.sparkContext.makeRDD(Seq(Row(1, "1"), Row(null, "1"), Row(2, "2")))
val schema = new StructType().add("k", IntegerType).add("v", StringType)
val smallDF = sqlContext.createDataFrame(rdd, schema)
val df = smallDF.filter("isnotnull(k + 1)")
The code snippet generated without this patch:
/* 031 */ protected void processNext() throws java.io.IOException {
/* 032 */ /*** PRODUCE: Filter isnotnull((k#0 + 1)) */
/* 033 */
/* 034 */ /*** PRODUCE: INPUT */
/* 035 */
/* 036 */ while (!shouldStop() && inputadapter_input.hasNext()) {
/* 037 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 038 */ /*** CONSUME: Filter isnotnull((k#0 + 1)) */
/* 039 */ /* input[0, int] */
/* 040 */ boolean filter_isNull = inputadapter_row.isNullAt(0);
/* 041 */ int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0));
/* 042 */
/* 043 */ /* isnotnull((input[0, int] + 1)) */
/* 044 */ /* (input[0, int] + 1) */
/* 045 */ boolean filter_isNull3 = true;
/* 046 */ int filter_value3 = -1;
/* 047 */
/* 048 */ if (!filter_isNull) {
/* 049 */ filter_isNull3 = false; // resultCode could change nullability.
/* 050 */ filter_value3 = filter_value + 1;
/* 051 */
/* 052 */ }
/* 053 */ if (!(!(filter_isNull3))) continue;
/* 054 */
/* 055 */ filter_metricValue.add(1);
With this patch:
/* 031 */ protected void processNext() throws java.io.IOException {
/* 032 */ /*** PRODUCE: Filter isnotnull((k#0 + 1)) */
/* 033 */
/* 034 */ /*** PRODUCE: INPUT */
/* 035 */
/* 036 */ while (!shouldStop() && inputadapter_input.hasNext()) {
/* 037 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 038 */ /*** CONSUME: Filter isnotnull((k#0 + 1)) */
/* 039 */ /* input[0, int] */
/* 040 */ boolean filter_isNull = inputadapter_row.isNullAt(0);
/* 041 */ int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0));
/* 042 */
/* 043 */ if (filter_isNull) continue;
/* 044 */
/* 045 */ filter_metricValue.add(1);
## How was this patch tested?
Existing tests.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes #11810 from viirya/add-more-not-null-attrs.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index a6a14df6a3..fb1c6182cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -79,12 +79,12 @@ case class Filter(condition: Expression, child: SparkPlan) // Split out all the IsNotNulls from condition. private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition { - case IsNotNull(a) if child.output.exists(_.semanticEquals(a)) => true + case IsNotNull(a: NullIntolerant) if a.references.subsetOf(child.outputSet) => true case _ => false } // The columns that will filtered out by `IsNotNull` could be considered as not nullable. - private val notNullAttributes = notNullPreds.flatMap(_.references) + private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId) // Mark this as empty. We'll evaluate the input during doConsume(). We don't want to evaluate // all the variables at the beginning to take advantage of short circuiting. @@ -92,7 +92,7 @@ case class Filter(condition: Expression, child: SparkPlan) override def output: Seq[Attribute] = { child.output.map { a => - if (a.nullable && notNullAttributes.exists(_.semanticEquals(a))) { + if (a.nullable && notNullAttributes.contains(a.exprId)) { a.withNullability(false) } else { a @@ -179,7 +179,7 @@ case class Filter(condition: Expression, child: SparkPlan) // Reset the isNull to false for the not-null columns, then the followed operators could // generate better code (remove dead branches). val resultVars = input.zipWithIndex.map { case (ev, i) => - if (notNullAttributes.exists(_.semanticEquals(child.output(i)))) { + if (notNullAttributes.contains(child.output(i).exprId)) { ev.isNull = "false" } ev |