aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-06-30 08:15:08 +0800
committerCheng Lian <lian@databricks.com>2016-06-30 08:15:08 +0800
commitd063898bebaaf4ec2aad24c3ac70aabdbf97a190 (patch)
treee10e1ed9765961338730237002e1b78c3f1f184b /sql/core
parent2eaabfa4142d4050be2b45fd277ff5c7fa430581 (diff)
downloadspark-d063898bebaaf4ec2aad24c3ac70aabdbf97a190.tar.gz
spark-d063898bebaaf4ec2aad24c3ac70aabdbf97a190.tar.bz2
spark-d063898bebaaf4ec2aad24c3ac70aabdbf97a190.zip
[SPARK-16134][SQL] optimizer rules for typed filter
## What changes were proposed in this pull request?

This PR adds 3 optimizer rules for typed filter:

1. push typed filter down through `SerializeFromObject` and eliminate the deserialization in filter condition.
2. pull typed filter up through `SerializeFromObject` and eliminate the deserialization in filter condition.
3. combine adjacent typed filters and share the deserialized object among all the condition expressions.

This PR also adds `TypedFilter` logical plan, to separate it from normal filter, so that the concept is more clear and it's easier to write optimizer rules.

## How was this patch tested?

`TypedFilterOptimizationSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13846 from cloud-fan/filter.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala1
3 files changed, 5 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index a6581eb563..e64669a19c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1997,11 +1997,7 @@ class Dataset[T] private[sql](
*/
@Experimental
def filter(func: T => Boolean): Dataset[T] = {
- val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer)
- val function = Literal.create(func, ObjectType(classOf[T => Boolean]))
- val condition = Invoke(function, "apply", BooleanType, deserializer :: Nil)
- val filter = Filter(condition, logicalPlan)
- withTypedPlan(filter)
+ withTypedPlan(TypedFilter(func, logicalPlan))
}
/**
@@ -2014,11 +2010,7 @@ class Dataset[T] private[sql](
*/
@Experimental
def filter(func: FilterFunction[T]): Dataset[T] = {
- val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer)
- val function = Literal.create(func, ObjectType(classOf[FilterFunction[T]]))
- val condition = Invoke(function, "call", BooleanType, deserializer :: Nil)
- val filter = Filter(condition, logicalPlan)
- withTypedPlan(filter)
+ withTypedPlan(TypedFilter(func, logicalPlan))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index b619d4edc3..5e643ea75a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -385,6 +385,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.ProjectExec(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
execution.FilterExec(condition, planLater(child)) :: Nil
+ case f: logical.TypedFilter =>
+ execution.FilterExec(f.typedCondition(f.deserializer), planLater(f.child)) :: Nil
case e @ logical.Expand(_, _, child) =>
execution.ExpandExec(e.projections, e.output, planLater(child)) :: Nil
case logical.Window(windowExprs, partitionSpec, orderSpec, child) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index b15f38c2a7..ab505139a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -238,6 +238,7 @@ abstract class QueryTest extends PlanTest {
case _: ObjectConsumer => return
case _: ObjectProducer => return
case _: AppendColumns => return
+ case _: TypedFilter => return
case _: LogicalRelation => return
case p if p.getClass.getSimpleName == "MetastoreRelation" => return
case _: MemoryPlan => return