diff options
author | Wenchen Fan <cloud0fan@outlook.com> | 2015-07-17 16:03:33 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-07-17 16:03:33 -0700 |
commit | fd6b3101fbb0a8c3ebcf89ce9b4e8664406d9869 (patch) | |
tree | 1fc5ce8296eee546c5a3364617fd66e662f66237 | |
parent | 15fc2ffe5530c43c64cfc37f2d1ce83f04ce3bd9 (diff) | |
download | spark-fd6b3101fbb0a8c3ebcf89ce9b4e8664406d9869.tar.gz spark-fd6b3101fbb0a8c3ebcf89ce9b4e8664406d9869.tar.bz2 spark-fd6b3101fbb0a8c3ebcf89ce9b4e8664406d9869.zip |
[SPARK-9113] [SQL] enable analysis check code for self join
The check was unreachable before, as `case operator: LogicalPlan` catches everything already.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes #7449 from cloud-fan/tmp and squashes the following commits:
2bb6637 [Wenchen Fan] add test
5493aea [Wenchen Fan] add the check back
27221a7 [Wenchen Fan] remove unnecessary analysis check code for self join
4 files changed, 29 insertions, 21 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index df8e7f2381..e58f3f6494 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -316,7 +316,7 @@ class Analyzer( ) // Special handling for cases when self-join introduce duplicate expression ids. - case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty => + case j @ Join(left, right, _, _) if !j.selfJoinResolved => val conflictingAttributes = left.outputSet.intersect(right.outputSet) logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} in $j") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 476ac2b7cb..c7f9713344 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -109,29 +109,27 @@ trait CheckAnalysis { s"resolved attribute(s) $missingAttributes missing from $input " + s"in operator ${operator.simpleString}") - case o if !o.resolved => - failAnalysis( - s"unresolved operator ${operator.simpleString}") - case p @ Project(exprs, _) if containsMultipleGenerators(exprs) => failAnalysis( s"""Only a single table generating function is allowed in a SELECT clause, found: | ${exprs.map(_.prettyString).mkString(",")}""".stripMargin) + // Special handling for cases when self-join introduce duplicate expression ids. + case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty => + val conflictingAttributes = left.outputSet.intersect(right.outputSet) + failAnalysis( + s""" + |Failure when resolving conflicting references in Join: + |$plan + |Conflicting attributes: ${conflictingAttributes.mkString(",")} + |""".stripMargin) + + case o if !o.resolved => + failAnalysis( + s"unresolved operator ${operator.simpleString}") case _ => // Analysis successful! } - - // Special handling for cases when self-join introduce duplicate expression ids. - case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty => - val conflictingAttributes = left.outputSet.intersect(right.outputSet) - failAnalysis( - s""" - |Failure when resolving conflicting references in Join: - |$plan - |Conflicting attributes: ${conflictingAttributes.mkString(",")} - |""".stripMargin) - } extendedCheckRules.foreach(_(plan)) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index fbe104db01..17a9124732 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -123,11 +123,11 @@ case class Join( } } - private def selfJoinResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty + def selfJoinResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty - // Joins are only resolved if they don't introduce ambiguious expression ids. + // Joins are only resolved if they don't introduce ambiguous expression ids. override lazy val resolved: Boolean = { - childrenResolved && !expressions.exists(!_.resolved) && selfJoinResolved + childrenResolved && expressions.forall(_.resolved) && selfJoinResolved } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index f0f1710399..2147d07e09 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -23,10 +23,11 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.types._ -import org.apache.spark.sql.catalyst.{InternalRow, SimpleCatalystConf} +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.types._ case class TestFunction( children: Seq[Expression], @@ -164,4 +165,13 @@ class AnalysisErrorSuite extends SparkFunSuite with BeforeAndAfter { assert(message.contains("resolved attribute(s) a#1 missing from a#2")) } + + test("error test for self-join") { + val join = Join(testRelation, testRelation, Inner, None) + val error = intercept[AnalysisException] { + SimpleAnalyzer.checkAnalysis(join) + } + error.message.contains("Failure when resolving conflicting references in Join") + error.message.contains("Conflicting attributes") + } } |