From cc0caa690b32246b076c699ea3f8d8a84797fb94 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 26 Aug 2016 21:41:58 -0700
Subject: [SPARK-17270][SQL] Move object optimization rules into its own file

## What changes were proposed in this pull request?
As part of breaking Optimizer.scala apart, this patch moves various Dataset object optimization rules into a single file. I'm submitting separate pull requests so we can more easily merge this in branch-2.0 to simplify optimizer backports.

## How was this patch tested?
This should be covered by existing tests.

Author: Reynold Xin

Closes #14839 from rxin/SPARK-17270.
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala  | 71 ----------------
 .../spark/sql/catalyst/optimizer/objects.scala    | 98 ++++++++++++++++++++++
 2 files changed, 98 insertions(+), 71 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 5c8316189d..7bbcd742b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -201,43 +201,6 @@ object RemoveAliasOnlyProject extends Rule[LogicalPlan] {
   }
 }
 
-/**
- * Removes cases where we are unnecessarily going between the object and serialized (InternalRow)
- * representation of data item. For example back to back map operations.
- */
-object EliminateSerialization extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case d @ DeserializeToObject(_, _, s: SerializeFromObject)
-        if d.outputObjAttr.dataType == s.inputObjAttr.dataType =>
-      // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`.
-      // We will remove it later in RemoveAliasOnlyProject rule.
-      val objAttr = Alias(s.inputObjAttr, s.inputObjAttr.name)(exprId = d.outputObjAttr.exprId)
-      Project(objAttr :: Nil, s.child)
-
-    case a @ AppendColumns(_, _, _, _, _, s: SerializeFromObject)
-        if a.deserializer.dataType == s.inputObjAttr.dataType =>
-      AppendColumnsWithObject(a.func, s.serializer, a.serializer, s.child)
-
-    // If there is a `SerializeFromObject` under typed filter and its input object type is same with
-    // the typed filter's deserializer, we can convert typed filter to normal filter without
-    // deserialization in condition, and push it down through `SerializeFromObject`.
-    // e.g. `ds.map(...).filter(...)` can be optimized by this rule to save extra deserialization,
-    // but `ds.map(...).as[AnotherType].filter(...)` can not be optimized.
-    case f @ TypedFilter(_, _, _, _, s: SerializeFromObject)
-        if f.deserializer.dataType == s.inputObjAttr.dataType =>
-      s.copy(child = f.withObjectProducerChild(s.child))
-
-    // If there is a `DeserializeToObject` upon typed filter and its output object type is same with
-    // the typed filter's deserializer, we can convert typed filter to normal filter without
-    // deserialization in condition, and pull it up through `DeserializeToObject`.
-    // e.g. `ds.filter(...).map(...)` can be optimized by this rule to save extra deserialization,
-    // but `ds.filter(...).as[AnotherType].map(...)` can not be optimized.
-    case d @ DeserializeToObject(_, _, f: TypedFilter)
-        if d.outputObjAttr.dataType == f.deserializer.dataType =>
-      f.withObjectProducerChild(d.copy(child = f.child))
-  }
-}
-
 /**
  * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
  */
@@ -1713,40 +1676,6 @@ case class GetCurrentDatabase(sessionCatalog: SessionCatalog) extends Rule[Logic
   }
 }
 
-/**
- * Combines two adjacent [[TypedFilter]]s, which operate on same type object in condition, into one,
- * mering the filter functions into one conjunctive function.
- */
-object CombineTypedFilters extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case t1 @ TypedFilter(_, _, _, _, t2 @ TypedFilter(_, _, _, _, child))
-        if t1.deserializer.dataType == t2.deserializer.dataType =>
-      TypedFilter(
-        combineFilterFunction(t2.func, t1.func),
-        t1.argumentClass,
-        t1.argumentSchema,
-        t1.deserializer,
-        child)
-  }
-
-  private def combineFilterFunction(func1: AnyRef, func2: AnyRef): Any => Boolean = {
-    (func1, func2) match {
-      case (f1: FilterFunction[_], f2: FilterFunction[_]) =>
-        input => f1.asInstanceOf[FilterFunction[Any]].call(input) &&
-          f2.asInstanceOf[FilterFunction[Any]].call(input)
-      case (f1: FilterFunction[_], f2) =>
-        input => f1.asInstanceOf[FilterFunction[Any]].call(input) &&
-          f2.asInstanceOf[Any => Boolean](input)
-      case (f1, f2: FilterFunction[_]) =>
-        input => f1.asInstanceOf[Any => Boolean].apply(input) &&
-          f2.asInstanceOf[FilterFunction[Any]].call(input)
-      case (f1, f2) =>
-        input => f1.asInstanceOf[Any => Boolean].apply(input) &&
-          f2.asInstanceOf[Any => Boolean].apply(input)
-    }
-  }
-}
-
 /**
  * This rule rewrites predicate sub-queries into left semi/anti joins. The following predicates
  * are supported:

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
new file mode 100644
index 0000000000..174d546e22
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.api.java.function.FilterFunction
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+/*
+ * This file defines optimization rules related to object manipulation (for the Dataset API).
+ */
+
+/**
+ * Removes cases where we are unnecessarily going between the object and serialized (InternalRow)
+ * representation of data item. For example back to back map operations.
+ */
+object EliminateSerialization extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case d @ DeserializeToObject(_, _, s: SerializeFromObject)
+        if d.outputObjAttr.dataType == s.inputObjAttr.dataType =>
+      // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`.
+      // We will remove it later in RemoveAliasOnlyProject rule.
+      val objAttr = Alias(s.inputObjAttr, s.inputObjAttr.name)(exprId = d.outputObjAttr.exprId)
+      Project(objAttr :: Nil, s.child)
+
+    case a @ AppendColumns(_, _, _, _, _, s: SerializeFromObject)
+        if a.deserializer.dataType == s.inputObjAttr.dataType =>
+      AppendColumnsWithObject(a.func, s.serializer, a.serializer, s.child)
+
+    // If there is a `SerializeFromObject` under typed filter and its input object type is same with
+    // the typed filter's deserializer, we can convert typed filter to normal filter without
+    // deserialization in condition, and push it down through `SerializeFromObject`.
+    // e.g. `ds.map(...).filter(...)` can be optimized by this rule to save extra deserialization,
+    // but `ds.map(...).as[AnotherType].filter(...)` can not be optimized.
+    case f @ TypedFilter(_, _, _, _, s: SerializeFromObject)
+        if f.deserializer.dataType == s.inputObjAttr.dataType =>
+      s.copy(child = f.withObjectProducerChild(s.child))
+
+    // If there is a `DeserializeToObject` upon typed filter and its output object type is same with
+    // the typed filter's deserializer, we can convert typed filter to normal filter without
+    // deserialization in condition, and pull it up through `DeserializeToObject`.
+    // e.g. `ds.filter(...).map(...)` can be optimized by this rule to save extra deserialization,
+    // but `ds.filter(...).as[AnotherType].map(...)` can not be optimized.
+    case d @ DeserializeToObject(_, _, f: TypedFilter)
+        if d.outputObjAttr.dataType == f.deserializer.dataType =>
+      f.withObjectProducerChild(d.copy(child = f.child))
+  }
+}
+
+/**
+ * Combines two adjacent [[TypedFilter]]s, which operate on same type object in condition, into one,
+ * mering the filter functions into one conjunctive function.
+ */
+object CombineTypedFilters extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case t1 @ TypedFilter(_, _, _, _, t2 @ TypedFilter(_, _, _, _, child))
+        if t1.deserializer.dataType == t2.deserializer.dataType =>
+      TypedFilter(
+        combineFilterFunction(t2.func, t1.func),
+        t1.argumentClass,
+        t1.argumentSchema,
+        t1.deserializer,
+        child)
+  }
+
+  private def combineFilterFunction(func1: AnyRef, func2: AnyRef): Any => Boolean = {
+    (func1, func2) match {
+      case (f1: FilterFunction[_], f2: FilterFunction[_]) =>
+        input => f1.asInstanceOf[FilterFunction[Any]].call(input) &&
+          f2.asInstanceOf[FilterFunction[Any]].call(input)
+      case (f1: FilterFunction[_], f2) =>
+        input => f1.asInstanceOf[FilterFunction[Any]].call(input) &&
+          f2.asInstanceOf[Any => Boolean](input)
+      case (f1, f2: FilterFunction[_]) =>
+        input => f1.asInstanceOf[Any => Boolean].apply(input) &&
+          f2.asInstanceOf[FilterFunction[Any]].call(input)
+      case (f1, f2) =>
+        input => f1.asInstanceOf[Any => Boolean].apply(input) &&
+          f2.asInstanceOf[Any => Boolean].apply(input)
    }
  }
}
-- 
cgit v1.2.3
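
Editorial note (not part of the patch above): the sketch below is a minimal, standalone Scala illustration of the predicate-merging idea behind `CombineTypedFilters#combineFilterFunction`, and of why chains like `ds.filter(...).filter(...)` benefit from it. The `Person` class and the `isAdult`/`hasName` predicates are hypothetical names invented for the example; the real rule works on `TypedFilter` plan nodes, not plain collections.

```scala
// Illustrative sketch only: merge two filter predicates into one conjunctive
// predicate, so each object is produced once and both conditions are checked
// in a single pass, mirroring what the optimizer rule does on the logical plan.
object CombinedFilterSketch {
  final case class Person(name: String, age: Int)

  // Two typed filter predicates, as a Dataset user might write them.
  val isAdult: Person => Boolean = _.age >= 18
  val hasName: Person => Boolean = _.name.nonEmpty

  // Combine the two predicates into a single conjunctive predicate.
  def combine[T](f1: T => Boolean, f2: T => Boolean): T => Boolean =
    t => f1(t) && f2(t)

  def main(args: Array[String]): Unit = {
    val people = Seq(Person("alice", 30), Person("", 40), Person("bob", 10))
    // Behaves like people.filter(isAdult).filter(hasName), but evaluates one merged predicate.
    println(people.filter(combine(isAdult, hasName)))  // List(Person(alice,30))
  }
}
```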