diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-06-30 08:15:08 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2016-06-30 08:15:08 +0800 |
commit | d063898bebaaf4ec2aad24c3ac70aabdbf97a190 (patch) | |
tree | e10e1ed9765961338730237002e1b78c3f1f184b /sql/core | |
parent | 2eaabfa4142d4050be2b45fd277ff5c7fa430581 (diff) | |
download | spark-d063898bebaaf4ec2aad24c3ac70aabdbf97a190.tar.gz spark-d063898bebaaf4ec2aad24c3ac70aabdbf97a190.tar.bz2 spark-d063898bebaaf4ec2aad24c3ac70aabdbf97a190.zip |
[SPARK-16134][SQL] optimizer rules for typed filter
## What changes were proposed in this pull request?
This PR adds 3 optimizer rules for typed filter:
1. push typed filter down through `SerializeFromObject` and eliminate the deserialization in filter condition.
2. pull typed filter up through `SerializeFromObject` and eliminate the deserialization in filter condition.
3. combine adjacent typed filters and share the deserialized object among all the condition expressions.
This PR also adds a `TypedFilter` logical plan, to separate it from a normal filter, so that the concept is clearer and it is easier to write optimizer rules.
## How was this patch tested?
`TypedFilterOptimizationSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes #13846 from cloud-fan/filter.
Diffstat (limited to 'sql/core')
3 files changed, 5 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index a6581eb563..e64669a19c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1997,11 +1997,7 @@ class Dataset[T] private[sql]( */ @Experimental def filter(func: T => Boolean): Dataset[T] = { - val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer) - val function = Literal.create(func, ObjectType(classOf[T => Boolean])) - val condition = Invoke(function, "apply", BooleanType, deserializer :: Nil) - val filter = Filter(condition, logicalPlan) - withTypedPlan(filter) + withTypedPlan(TypedFilter(func, logicalPlan)) } /** @@ -2014,11 +2010,7 @@ class Dataset[T] private[sql]( */ @Experimental def filter(func: FilterFunction[T]): Dataset[T] = { - val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer) - val function = Literal.create(func, ObjectType(classOf[FilterFunction[T]])) - val condition = Invoke(function, "call", BooleanType, deserializer :: Nil) - val filter = Filter(condition, logicalPlan) - withTypedPlan(filter) + withTypedPlan(TypedFilter(func, logicalPlan)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index b619d4edc3..5e643ea75a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -385,6 +385,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.ProjectExec(projectList, planLater(child)) :: Nil case logical.Filter(condition, child) => execution.FilterExec(condition, planLater(child)) :: Nil + case f: logical.TypedFilter => + execution.FilterExec(f.typedCondition(f.deserializer), planLater(f.child)) :: Nil case e @ 
logical.Expand(_, _, child) => execution.ExpandExec(e.projections, e.output, planLater(child)) :: Nil case logical.Window(windowExprs, partitionSpec, orderSpec, child) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index b15f38c2a7..ab505139a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -238,6 +238,7 @@ abstract class QueryTest extends PlanTest { case _: ObjectConsumer => return case _: ObjectProducer => return case _: AppendColumns => return + case _: TypedFilter => return case _: LogicalRelation => return case p if p.getClass.getSimpleName == "MetastoreRelation" => return case _: MemoryPlan => return |