From 6a1d48f4f02c4498b64439c3dd5f671286a90e30 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Fri, 3 Oct 2014 12:34:27 -0700
Subject: [SPARK-3212][SQL] Use logical plan matching instead of temporary
 tables for table caching

_Also addresses: SPARK-1671, SPARK-1379 and SPARK-3641_

This PR introduces a new trait, `CacheManager`, which replaces the previous
temporary-table-based caching system. Instead of creating a temporary table
that shadows an existing table with an equivalent cached representation, the
cache manager maintains a separate list of logical plans and their cached
data. After optimization, this list is searched for any matching plan
fragments. When a matching plan fragment is found, it is replaced with the
cached data.

There are several advantages to this approach:

- Calling .cache() on a SchemaRDD now works as you would expect, and uses
  the more efficient columnar representation.
- It is now possible to provide a list of temporary tables without having to
  decide whether a given table is actually just a cached persistent table.
  (To be done in a follow-up PR.)
- In some cases cached data will be used even when a cached table was not
  explicitly requested, because we now match on the logical structure of the
  plan instead of the table name.
- We now correctly invalidate the cache when data is inserted into a Hive
  table.

Author: Michael Armbrust

Closes #2501 from marmbrus/caching and squashes the following commits:

63fbc2c [Michael Armbrust] Merge remote-tracking branch 'origin/master' into caching.
0ea889e [Michael Armbrust] Address comments.
1e23287 [Michael Armbrust] Add support for cache invalidation for hive inserts.
65ed04a [Michael Armbrust] fix tests.
bdf9a3f [Michael Armbrust] Merge remote-tracking branch 'origin/master' into caching
b4b77f2 [Michael Armbrust] Address comments
6923c9d [Michael Armbrust] More comments / tests
80f26ac [Michael Armbrust] First draft of improved semantics for Spark SQL caching.
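To make the lookup-and-replace mechanism concrete, here is a minimal Scala
sketch of the idea (illustrative only: `SimpleCacheManager`, `CachedData`,
`cacheQuery` and `useCachedData` are hypothetical names, not the API this
patch actually introduces):

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Pairs a logical plan with its cached representation.
    case class CachedData(plan: LogicalPlan, cachedRepresentation: LogicalPlan)

    class SimpleCacheManager {
      private val cachedData = ArrayBuffer.empty[CachedData]

      /** Remembers that `plan` has a cached (e.g. columnar) representation. */
      def cacheQuery(plan: LogicalPlan, cached: LogicalPlan): Unit =
        cachedData += CachedData(plan, cached)

      /**
       * Rewrites `plan`, substituting the cached representation for any
       * fragment that returns the same results as a cached plan. Matching
       * uses the sameResult check added in the diff below, so it tolerates
       * cosmetic differences such as expression ids.
       */
      def useCachedData(plan: LogicalPlan): LogicalPlan = plan transform {
        case fragment =>
          cachedData.find(cd => fragment.sameResult(cd.plan))
            .map(_.cachedRepresentation)
            .getOrElse(fragment)
      }
    }

Because the rewrite runs after optimization and matches on plan structure
rather than on a table name, a fragment can hit the cache even when the
query never names a cached table explicitly.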
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 ++
 .../catalyst/expressions/namedExpressions.scala    |  4 +--
 .../sql/catalyst/plans/logical/LogicalPlan.scala   | 42 ++++++++++++++++++++++
 .../sql/catalyst/plans/logical/TestRelation.scala  |  6 ++++
 .../catalyst/plans/logical/basicOperators.scala    |  4 +--
 5 files changed, 55 insertions(+), 4 deletions(-)
(limited to 'sql/catalyst/src/main/scala/org/apache')

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 71810b798b..fe83eb1250 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -93,6 +93,9 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
    */
   object ResolveRelations extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      case i @ InsertIntoTable(UnresolvedRelation(databaseName, name, alias), _, _, _) =>
+        i.copy(
+          table = EliminateAnalysisOperators(catalog.lookupRelation(databaseName, name, alias)))
       case UnresolvedRelation(databaseName, name, alias) =>
         catalog.lookupRelation(databaseName, name, alias)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 59fb0311a9..e5a958d599 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -62,7 +62,7 @@ abstract class Attribute extends NamedExpression {
   def withName(newName: String): Attribute
 
   def toAttribute = this
-  def newInstance: Attribute
+  def newInstance(): Attribute
 
 }
 
@@ -131,7 +131,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
     h
   }
 
-  override def newInstance = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)
+  override def newInstance() = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)
 
   /**
    * Returns a copy of this [[AttributeReference]] with changed nullability.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 28d863e58b..4f8ad8a7e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.types.StructType
 import org.apache.spark.sql.catalyst.trees
 
@@ -72,6 +73,47 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
    */
   def childrenResolved: Boolean = !children.exists(!_.resolved)
 
+  /**
+   * Returns true when the given logical plan will return the same results as this logical plan.
+   *
+   * Since it is likely undecidable in general whether two given plans will produce the same
+   * results, it is okay for this function to return false even if the results are actually
+   * the same. Such behavior will not affect correctness, only the application of performance
+   * enhancements like caching. However, it is not acceptable to return true if the results
+   * could possibly be different.
+   *
+   * By default this function performs a modified version of equality that is tolerant of
+   * cosmetic differences like attribute naming and/or expression id differences. Logical
+   * operators that can do better should override this function.
+   */
+  def sameResult(plan: LogicalPlan): Boolean = {
+    plan.getClass == this.getClass &&
+    plan.children.size == children.size && {
+      logDebug(s"[${cleanArgs.mkString(", ")}] == [${plan.cleanArgs.mkString(", ")}]")
+      cleanArgs == plan.cleanArgs
+    } &&
+    (plan.children, children).zipped.forall(_ sameResult _)
+  }
+
+  /** Args that have been cleaned such that differences in expression id should not affect equality */
+  protected lazy val cleanArgs: Seq[Any] = {
+    val input = children.flatMap(_.output)
+    productIterator.map {
+      // Children are checked using sameResult above.
+      case tn: TreeNode[_] if children contains tn => null
+      case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+      case s: Option[_] => s.map {
+        case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+        case other => other
+      }
+      case s: Seq[_] => s.map {
+        case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+        case other => other
+      }
+      case other => other
+    }.toSeq
+  }
+
   /**
    * Optionally resolves the given string to a [[NamedExpression]] using the input from all child
    * nodes of this LogicalPlan. The attribute is expressed as
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala
index f8fe558511..19769986ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala
@@ -41,4 +41,10 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[Product] = Nil)
   }
 
   override protected def stringArgs = Iterator(output)
+
+  override def sameResult(plan: LogicalPlan): Boolean = plan match {
+    case LocalRelation(otherOutput, otherData) =>
+      otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data
+    case _ => false
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 391508279b..f8e9930ac2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -105,8 +105,8 @@ case class InsertIntoTable(
     child: LogicalPlan,
     overwrite: Boolean)
   extends LogicalPlan {
-  // The table being inserted into is a child for the purposes of transformations.
-  override def children = table :: child :: Nil
+
+  override def children = child :: Nil
   override def output = child.output
 
   override lazy val resolved = childrenResolved && child.output.zip(table.output).forall {
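As a hypothetical REPL-style illustration of the intended semantics (this
snippet is not part of the patch; the attributes and plans are made up for
the example), sameResult should treat plans that differ only in expression
ids as equivalent, which is exactly what plain case-class equality does not:

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
    import org.apache.spark.sql.catalyst.types.IntegerType

    // Analyzing "the same" query twice yields attributes with distinct
    // expression ids, so the resulting plans are not equal as case classes.
    val a1 = AttributeReference("a", IntegerType, nullable = true)()
    val a2 = AttributeReference("a", IntegerType, nullable = true)()

    val plan1 = Project(Seq(a1), LocalRelation(Seq(a1)))
    val plan2 = Project(Seq(a2), LocalRelation(Seq(a2)))

    plan1 == plan2          // false: the expression ids differ
    plan1 sameResult plan2  // true: cleanArgs rebinds expressions by position

One plausible reading of the InsertIntoTable change above is related: once
plan fragments can be rewritten by structural matching, keeping the target
table out of children avoids rewrites (such as cache substitution) being
applied to the table being inserted into rather than to the data being
inserted.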