aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2014-10-03 12:34:27 -0700
committerMichael Armbrust <michael@databricks.com>2014-10-03 12:34:27 -0700
commit6a1d48f4f02c4498b64439c3dd5f671286a90e30 (patch)
tree0b22a278418d9f1d8a6decf3f15aafab0de3dd84 /sql/catalyst
parentbec0d0eaa33811fde72b84f7d53a6f6031e7b5d3 (diff)
downloadspark-6a1d48f4f02c4498b64439c3dd5f671286a90e30.tar.gz
spark-6a1d48f4f02c4498b64439c3dd5f671286a90e30.tar.bz2
spark-6a1d48f4f02c4498b64439c3dd5f671286a90e30.zip
[SPARK-3212][SQL] Use logical plan matching instead of temporary tables for table caching
_Also addresses: SPARK-1671, SPARK-1379 and SPARK-3641_ This PR introduces a new trait, `CacheManager`, which replaces the previous temporary table based caching system. Instead of creating a temporary table that shadows an existing table with an equivalent cached representation, the cache manager maintains a separate list of logical plans and their cached data. After optimization, this list is searched for any matching plan fragments. When a matching plan fragment is found it is replaced with the cached data. There are several advantages to this approach: - Calling .cache() on a SchemaRDD now works as you would expect, and uses the more efficient columnar representation. - It's now possible to provide a list of temporary tables, without having to decide if a given table is actually just a cached persistent table. (To be done in a follow-up PR) - In some cases it is possible that cached data will be used, even if a cached table was not explicitly requested. This is because we now look at the logical structure instead of the table name. - We now correctly invalidate the cache when data is inserted into a hive table. Author: Michael Armbrust <michael@databricks.com> Closes #2501 from marmbrus/caching and squashes the following commits: 63fbc2c [Michael Armbrust] Merge remote-tracking branch 'origin/master' into caching. 0ea889e [Michael Armbrust] Address comments. 1e23287 [Michael Armbrust] Add support for cache invalidation for hive inserts. 65ed04a [Michael Armbrust] fix tests. bdf9a3f [Michael Armbrust] Merge remote-tracking branch 'origin/master' into caching b4b77f2 [Michael Armbrust] Address comments 6923c9d [Michael Armbrust] More comments / tests 80f26ac [Michael Armbrust] First draft of improved semantics for Spark SQL caching.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala3
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala42
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala6
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala4
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala62
6 files changed, 117 insertions, 4 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 71810b798b..fe83eb1250 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -93,6 +93,9 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
*/
object ResolveRelations extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case i @ InsertIntoTable(UnresolvedRelation(databaseName, name, alias), _, _, _) =>
+ i.copy(
+ table = EliminateAnalysisOperators(catalog.lookupRelation(databaseName, name, alias)))
case UnresolvedRelation(databaseName, name, alias) =>
catalog.lookupRelation(databaseName, name, alias)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 59fb0311a9..e5a958d599 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -62,7 +62,7 @@ abstract class Attribute extends NamedExpression {
def withName(newName: String): Attribute
def toAttribute = this
- def newInstance: Attribute
+ def newInstance(): Attribute
}
@@ -131,7 +131,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
h
}
- override def newInstance = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)
+ override def newInstance() = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)
/**
* Returns a copy of this [[AttributeReference]] with changed nullability.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 28d863e58b..4f8ad8a7e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.types.StructType
import org.apache.spark.sql.catalyst.trees
@@ -73,6 +74,47 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
def childrenResolved: Boolean = !children.exists(!_.resolved)
/**
+ * Returns true when the given logical plan will return the same results as this logical plan.
+ *
+ * Since it's likely undecidable to generally determine if two given plans will produce the same
+ * results, it is okay for this function to return false, even if the results are actually
+ * the same. Such behavior will not affect correctness, only the application of performance
+ * enhancements like caching. However, it is not acceptable to return true if the results could
+ * possibly be different.
+ *
+ * By default this function performs a modified version of equality that is tolerant of cosmetic
+ * differences like attribute naming and/or expression id differences. Logical operators that
+ * can do better should override this function.
+ */
+ def sameResult(plan: LogicalPlan): Boolean = {
+ plan.getClass == this.getClass &&
+ plan.children.size == children.size && {
+ logDebug(s"[${cleanArgs.mkString(", ")}] == [${plan.cleanArgs.mkString(", ")}]")
+ cleanArgs == plan.cleanArgs
+ } &&
+ (plan.children, children).zipped.forall(_ sameResult _)
+ }
+
+ /** Args cleaned so that differences in expression id do not affect equality. */
+ protected lazy val cleanArgs: Seq[Any] = {
+ val input = children.flatMap(_.output)
+ productIterator.map {
+ // Children are checked using sameResult above.
+ case tn: TreeNode[_] if children contains tn => null
+ case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+ case s: Option[_] => s.map {
+ case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+ case other => other
+ }
+ case s: Seq[_] => s.map {
+ case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+ case other => other
+ }
+ case other => other
+ }.toSeq
+ }
+
+ /**
* Optionally resolves the given string to a [[NamedExpression]] using the input from all child
* nodes of this LogicalPlan. The attribute is expressed as
* as string in the following form: `[scope].AttributeName.[nested].[fields]...`.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala
index f8fe558511..19769986ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala
@@ -41,4 +41,10 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[Product] = Nil)
}
override protected def stringArgs = Iterator(output)
+
+ override def sameResult(plan: LogicalPlan): Boolean = plan match {
+ case LocalRelation(otherOutput, otherData) =>
+ otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data
+ case _ => false
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 391508279b..f8e9930ac2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -105,8 +105,8 @@ case class InsertIntoTable(
child: LogicalPlan,
overwrite: Boolean)
extends LogicalPlan {
- // The table being inserted into is a child for the purposes of transformations.
- override def children = table :: child :: Nil
+
+ override def children = child :: Nil
override def output = child.output
override lazy val resolved = childrenResolved && child.output.zip(table.output).forall {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala
new file mode 100644
index 0000000000..e8a793d107
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{ExprId, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.util._
+
+/**
+ * Tests for `LogicalPlan.sameResult`, comparing analyzed plans built from equivalent relations.
+ */
+class SameResultSuite extends FunSuite {
+ val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+ val testRelation2 = LocalRelation('a.int, 'b.int, 'c.int)
+
+ def assertSameResult(a: LogicalPlan, b: LogicalPlan, result: Boolean = true) = {
+ val aAnalyzed = a.analyze
+ val bAnalyzed = b.analyze
+
+ if (aAnalyzed.sameResult(bAnalyzed) != result) {
+ val comparison = sideBySide(aAnalyzed.toString, bAnalyzed.toString).mkString("\n")
+ fail(s"Plans should return sameResult = $result\n$comparison")
+ }
+ }
+
+ test("relations") {
+ assertSameResult(testRelation, testRelation2)
+ }
+
+ test("projections") {
+ assertSameResult(testRelation.select('a), testRelation2.select('a))
+ assertSameResult(testRelation.select('b), testRelation2.select('b))
+ assertSameResult(testRelation.select('a, 'b), testRelation2.select('a, 'b))
+ assertSameResult(testRelation.select('b, 'a), testRelation2.select('b, 'a))
+
+ assertSameResult(testRelation, testRelation2.select('a), false)
+ assertSameResult(testRelation.select('b, 'a), testRelation2.select('a, 'b), false)
+ }
+
+ test("filters") {
+ assertSameResult(testRelation.where('a === 'b), testRelation2.where('a === 'b))
+ }
+}