author     Michael Armbrust <michael@databricks.com>  2015-02-23 17:34:54 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-02-23 17:34:54 -0800
commit     1ed57086d402c38d95cda6c3d9d7aea806609bf9 (patch)
tree       fb92a551881535edd2bb9c8c234d901d81e10876 /sql/catalyst
parent     48376bfe9c97bf31279918def6c6615849c88f4d (diff)
[SPARK-5873][SQL] Allow viewing of partially analyzed plans in queryExecution
Author: Michael Armbrust <michael@databricks.com>

Closes #4684 from marmbrus/explainAnalysis and squashes the following commits:

afbaa19 [Michael Armbrust] fix python
d93278c [Michael Armbrust] fix hive
e5fa0a4 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into explainAnalysis
52119f2 [Michael Armbrust] more tests
82a5431 [Michael Armbrust] fix tests
25753d2 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into explainAnalysis
aee1e6a [Michael Armbrust] fix hive
b23a844 [Michael Armbrust] newline
de8dc51 [Michael Armbrust] more comments
acf620a [Michael Armbrust] [SPARK-5873][SQL] Show partially analyzed plans in query execution
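For context, here is a hedged catalyst-level sketch of what the split enables; the relation, the unresolved column, and the wiring (which mirrors AnalysisSuite further down) are illustrative, not part of the commit:

    import org.apache.spark.sql.catalyst.analysis._
    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
    import org.apache.spark.sql.types.IntegerType

    val relation = LocalRelation(AttributeReference("a", IntegerType)())
    val analyzer =
      new Analyzer(new SimpleCatalog(true), EmptyFunctionRegistry, caseSensitive = true)
    val checkAnalysis = new CheckAnalysis

    // Resolution is now best-effort: it no longer throws on the first failure,
    // so a partially analyzed plan can be inspected (e.g. via queryExecution).
    val partial = analyzer(relation.select('missing))
    println(partial)        // still contains the unresolved 'missing attribute

    // Validation is a separate step that raises the user-facing error.
    checkAnalysis(partial)  // throws AnalysisException: cannot resolve 'missing' ...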
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala              |   2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala      |  83
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 105
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala |  35
4 files changed, 128 insertions(+), 97 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 124f083669..b16aff99af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -78,6 +78,7 @@ class SqlParser extends AbstractSparkSQLParser {
protected val IF = Keyword("IF")
protected val IN = Keyword("IN")
protected val INNER = Keyword("INNER")
+ protected val INT = Keyword("INT")
protected val INSERT = Keyword("INSERT")
protected val INTERSECT = Keyword("INTERSECT")
protected val INTO = Keyword("INTO")
@@ -394,6 +395,7 @@ class SqlParser extends AbstractSparkSQLParser {
| fixedDecimalType
| DECIMAL ^^^ DecimalType.Unlimited
| DATE ^^^ DateType
+ | INT ^^^ IntegerType
)
protected lazy val fixedDecimalType: Parser[DataType] =
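The INT keyword addition means the parser's type production now accepts INT (mapped to IntegerType) in casts. A hedged sketch; the table t and column a are hypothetical:

    import org.apache.spark.sql.catalyst.SqlParser

    // Before this change the dataType production above had no INT alternative,
    // so a statement like this failed to parse.
    val parser = new SqlParser
    val plan = parser("SELECT CAST(a AS INT) FROM t")  // Cast(..., IntegerType)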
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fc37b8cde0..e4e542562f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -52,12 +52,6 @@ class Analyzer(catalog: Catalog,
*/
val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil
- /**
- * Override to provide additional rules for the "Check Analysis" batch.
- * These rules will be evaluated after our built-in check rules.
- */
- val extendedCheckRules: Seq[Rule[LogicalPlan]] = Nil
-
lazy val batches: Seq[Batch] = Seq(
Batch("Resolution", fixedPoint,
ResolveRelations ::
@@ -71,88 +65,11 @@ class Analyzer(catalog: Catalog,
TrimGroupingAliases ::
typeCoercionRules ++
extendedResolutionRules : _*),
- Batch("Check Analysis", Once,
- CheckResolution +:
- extendedCheckRules: _*),
Batch("Remove SubQueries", fixedPoint,
EliminateSubQueries)
)
/**
- * Makes sure all attributes and logical plans have been resolved.
- */
- object CheckResolution extends Rule[LogicalPlan] {
- def failAnalysis(msg: String) = { throw new AnalysisException(msg) }
-
- def apply(plan: LogicalPlan): LogicalPlan = {
- // We transform up and order the rules so as to catch the first possible failure instead
- // of the result of cascading resolution failures.
- plan.foreachUp {
- case operator: LogicalPlan =>
- operator transformExpressionsUp {
- case a: Attribute if !a.resolved =>
- val from = operator.inputSet.map(_.name).mkString(", ")
- a.failAnalysis(s"cannot resolve '${a.prettyString}' given input columns $from")
-
- case c: Cast if !c.resolved =>
- failAnalysis(
- s"invalid cast from ${c.child.dataType.simpleString} to ${c.dataType.simpleString}")
-
- case b: BinaryExpression if !b.resolved =>
- failAnalysis(
- s"invalid expression ${b.prettyString} " +
- s"between ${b.left.simpleString} and ${b.right.simpleString}")
- }
-
- operator match {
- case f: Filter if f.condition.dataType != BooleanType =>
- failAnalysis(
- s"filter expression '${f.condition.prettyString}' " +
- s"of type ${f.condition.dataType.simpleString} is not a boolean.")
-
- case aggregatePlan @ Aggregate(groupingExprs, aggregateExprs, child) =>
- def checkValidAggregateExpression(expr: Expression): Unit = expr match {
- case _: AggregateExpression => // OK
- case e: Attribute if !groupingExprs.contains(e) =>
- failAnalysis(
- s"expression '${e.prettyString}' is neither present in the group by, " +
- s"nor is it an aggregate function. " +
- "Add to group by or wrap in first() if you don't care which value you get.")
- case e if groupingExprs.contains(e) => // OK
- case e if e.references.isEmpty => // OK
- case e => e.children.foreach(checkValidAggregateExpression)
- }
-
- val cleaned = aggregateExprs.map(_.transform {
- // Should trim aliases around `GetField`s. These aliases are introduced while
- // resolving struct field accesses, because `GetField` is not a `NamedExpression`.
- // (Should we just turn `GetField` into a `NamedExpression`?)
- case Alias(g, _) => g
- })
-
- cleaned.foreach(checkValidAggregateExpression)
-
- case o if o.children.nonEmpty &&
- !o.references.filter(_.name != "grouping__id").subsetOf(o.inputSet) =>
- val missingAttributes = (o.references -- o.inputSet).map(_.prettyString).mkString(",")
- val input = o.inputSet.map(_.prettyString).mkString(",")
-
- failAnalysis(s"resolved attributes $missingAttributes missing from $input")
-
- // Catch all
- case o if !o.resolved =>
- failAnalysis(
- s"unresolved operator ${operator.simpleString}")
-
- case _ => // Analysis successful!
- }
- }
-
- plan
- }
- }
-
- /**
* Removes no-op Alias expressions from the plan.
*/
object TrimGroupingAliases extends Rule[LogicalPlan] {
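Note the shape change in the extension point removed above: extendedCheckRules was a Seq[Rule[LogicalPlan]] on the Analyzer, while its replacement on the new CheckAnalysis class (next file) is a Seq[LogicalPlan => Unit], since checks only validate and never rewrite. A hedged migration sketch; PostHocChecks and its policy are hypothetical:

    import org.apache.spark.sql.catalyst.analysis.CheckAnalysis
    import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}

    // Extended checks run after the built-in checks and signal failure by
    // throwing (via failAnalysis); they no longer return a transformed plan.
    class PostHocChecks extends CheckAnalysis {
      override val extendedCheckRules: Seq[LogicalPlan => Unit] = Seq(
        plan => plan.foreachUp {
          case agg: Aggregate if agg.groupingExpressions.isEmpty =>
            failAnalysis("global aggregation is disabled (hypothetical policy)")
          case _ => // OK
        }
      )
    }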
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
new file mode 100644
index 0000000000..4e8fc892f3
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
+
+/**
+ * Throws user facing errors when passed invalid queries that fail to analyze.
+ */
+class CheckAnalysis {
+
+ /**
+ * Override to provide additional checks for correct analysis.
+ * These rules will be evaluated after our built-in check rules.
+ */
+ val extendedCheckRules: Seq[LogicalPlan => Unit] = Nil
+
+ def failAnalysis(msg: String) = {
+ throw new AnalysisException(msg)
+ }
+
+ def apply(plan: LogicalPlan): Unit = {
+ // We transform up and order the rules so as to catch the first possible failure instead
+ // of the result of cascading resolution failures.
+ plan.foreachUp {
+ case operator: LogicalPlan =>
+ operator transformExpressionsUp {
+ case a: Attribute if !a.resolved =>
+ val from = operator.inputSet.map(_.name).mkString(", ")
+ a.failAnalysis(s"cannot resolve '${a.prettyString}' given input columns $from")
+
+ case c: Cast if !c.resolved =>
+ failAnalysis(
+ s"invalid cast from ${c.child.dataType.simpleString} to ${c.dataType.simpleString}")
+
+ case b: BinaryExpression if !b.resolved =>
+ failAnalysis(
+ s"invalid expression ${b.prettyString} " +
+ s"between ${b.left.simpleString} and ${b.right.simpleString}")
+ }
+
+ operator match {
+ case f: Filter if f.condition.dataType != BooleanType =>
+ failAnalysis(
+ s"filter expression '${f.condition.prettyString}' " +
+ s"of type ${f.condition.dataType.simpleString} is not a boolean.")
+
+ case aggregatePlan@Aggregate(groupingExprs, aggregateExprs, child) =>
+ def checkValidAggregateExpression(expr: Expression): Unit = expr match {
+ case _: AggregateExpression => // OK
+ case e: Attribute if !groupingExprs.contains(e) =>
+ failAnalysis(
+ s"expression '${e.prettyString}' is neither present in the group by, " +
+ s"nor is it an aggregate function. " +
+ "Add to group by or wrap in first() if you don't care which value you get.")
+ case e if groupingExprs.contains(e) => // OK
+ case e if e.references.isEmpty => // OK
+ case e => e.children.foreach(checkValidAggregateExpression)
+ }
+
+ val cleaned = aggregateExprs.map(_.transform {
+ // Should trim aliases around `GetField`s. These aliases are introduced while
+ // resolving struct field accesses, because `GetField` is not a `NamedExpression`.
+ // (Should we just turn `GetField` into a `NamedExpression`?)
+ case Alias(g, _) => g
+ })
+
+ cleaned.foreach(checkValidAggregateExpression)
+
+ case o if o.children.nonEmpty &&
+ !o.references.filter(_.name != "grouping__id").subsetOf(o.inputSet) =>
+ val missingAttributes = (o.references -- o.inputSet).map(_.prettyString).mkString(",")
+ val input = o.inputSet.map(_.prettyString).mkString(",")
+
+ failAnalysis(s"resolved attributes $missingAttributes missing from $input")
+
+ // Catch all
+ case o if !o.resolved =>
+ failAnalysis(
+ s"unresolved operator ${operator.simpleString}")
+
+ case _ => // Analysis successful!
+ }
+ }
+ extendedCheckRules.foreach(_(plan))
+ }
+}
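The foreachUp/transformExpressionsUp ordering above is what surfaces the root cause rather than a cascading symptom. A hedged illustration, reusing the relation/analyzer/checkAnalysis names from the sketch near the top of this page:

    // The unresolved attribute inside the condition is reported first; without
    // the bottom-up ordering, the complaint would be the downstream symptom
    // that the Filter condition's type is not boolean.
    val bad = relation.where('missing > 1)
    checkAnalysis(analyzer(bad))
    // => AnalysisException: cannot resolve 'missing' given input columns a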
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index aec7847356..c1dd5aa913 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -30,11 +30,21 @@ import org.apache.spark.sql.catalyst.dsl.plans._
class AnalysisSuite extends FunSuite with BeforeAndAfter {
val caseSensitiveCatalog = new SimpleCatalog(true)
val caseInsensitiveCatalog = new SimpleCatalog(false)
- val caseSensitiveAnalyze =
+
+ val caseSensitiveAnalyzer =
new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true)
- val caseInsensitiveAnalyze =
+ val caseInsensitiveAnalyzer =
new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false)
+ val checkAnalysis = new CheckAnalysis
+
+
+ def caseSensitiveAnalyze(plan: LogicalPlan) =
+ checkAnalysis(caseSensitiveAnalyzer(plan))
+
+ def caseInsensitiveAnalyze(plan: LogicalPlan) =
+ checkAnalysis(caseInsensitiveAnalyzer(plan))
+
val testRelation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
val testRelation2 = LocalRelation(
AttributeReference("a", StringType)(),
@@ -55,7 +65,7 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
a.select(UnresolvedStar(None)).select('a).unionAll(b.select(UnresolvedStar(None)))
}
- assert(caseInsensitiveAnalyze(plan).resolved)
+ assert(caseInsensitiveAnalyzer(plan).resolved)
}
test("check project's resolved") {
@@ -71,11 +81,11 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
test("analyze project") {
assert(
- caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("a")), testRelation)) ===
+ caseSensitiveAnalyzer(Project(Seq(UnresolvedAttribute("a")), testRelation)) ===
Project(testRelation.output, testRelation))
assert(
- caseSensitiveAnalyze(
+ caseSensitiveAnalyzer(
Project(Seq(UnresolvedAttribute("TbL.a")),
UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
Project(testRelation.output, testRelation))
@@ -88,13 +98,13 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
assert(e.getMessage().toLowerCase.contains("cannot resolve"))
assert(
- caseInsensitiveAnalyze(
+ caseInsensitiveAnalyzer(
Project(Seq(UnresolvedAttribute("TbL.a")),
UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
Project(testRelation.output, testRelation))
assert(
- caseInsensitiveAnalyze(
+ caseInsensitiveAnalyzer(
Project(Seq(UnresolvedAttribute("tBl.a")),
UnresolvedRelation(Seq("TaBlE"), Some("TbL")))) ===
Project(testRelation.output, testRelation))
@@ -107,16 +117,13 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
assert(e.getMessage == "Table Not Found: tAbLe")
assert(
- caseSensitiveAnalyze(UnresolvedRelation(Seq("TaBlE"), None)) ===
- testRelation)
+ caseSensitiveAnalyzer(UnresolvedRelation(Seq("TaBlE"), None)) === testRelation)
assert(
- caseInsensitiveAnalyze(UnresolvedRelation(Seq("tAbLe"), None)) ===
- testRelation)
+ caseInsensitiveAnalyzer(UnresolvedRelation(Seq("tAbLe"), None)) === testRelation)
assert(
- caseInsensitiveAnalyze(UnresolvedRelation(Seq("TaBlE"), None)) ===
- testRelation)
+ caseInsensitiveAnalyzer(UnresolvedRelation(Seq("TaBlE"), None)) === testRelation)
}
def errorTest(
@@ -177,7 +184,7 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
AttributeReference("d", DecimalType.Unlimited)(),
AttributeReference("e", ShortType)())
- val plan = caseInsensitiveAnalyze(
+ val plan = caseInsensitiveAnalyzer(
testRelation2.select(
'a / Literal(2) as 'div1,
'a / 'b as 'div2,