aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWenchen Fan <cloud0fan@outlook.com>2015-07-17 16:03:33 -0700
committerMichael Armbrust <michael@databricks.com>2015-07-17 16:03:33 -0700
commitfd6b3101fbb0a8c3ebcf89ce9b4e8664406d9869 (patch)
tree1fc5ce8296eee546c5a3364617fd66e662f66237
parent15fc2ffe5530c43c64cfc37f2d1ce83f04ce3bd9 (diff)
downloadspark-fd6b3101fbb0a8c3ebcf89ce9b4e8664406d9869.tar.gz
spark-fd6b3101fbb0a8c3ebcf89ce9b4e8664406d9869.tar.bz2
spark-fd6b3101fbb0a8c3ebcf89ce9b4e8664406d9869.zip
[SPARK-9113] [SQL] enable analysis check code for self join
The check was unreachable before, as `case operator: LogicalPlan` catches everything already.

Author: Wenchen Fan <cloud0fan@outlook.com>

Closes #7449 from cloud-fan/tmp and squashes the following commits:

2bb6637 [Wenchen Fan] add test
5493aea [Wenchen Fan] add the check back
27221a7 [Wenchen Fan] remove unnecessary analysis check code for self join
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala28
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala6
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala14
4 files changed, 29 insertions, 21 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index df8e7f2381..e58f3f6494 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -316,7 +316,7 @@ class Analyzer(
)
// Special handling for cases when self-join introduce duplicate expression ids.
- case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
+ case j @ Join(left, right, _, _) if !j.selfJoinResolved =>
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} in $j")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 476ac2b7cb..c7f9713344 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -109,29 +109,27 @@ trait CheckAnalysis {
s"resolved attribute(s) $missingAttributes missing from $input " +
s"in operator ${operator.simpleString}")
- case o if !o.resolved =>
- failAnalysis(
- s"unresolved operator ${operator.simpleString}")
-
case p @ Project(exprs, _) if containsMultipleGenerators(exprs) =>
failAnalysis(
s"""Only a single table generating function is allowed in a SELECT clause, found:
| ${exprs.map(_.prettyString).mkString(",")}""".stripMargin)
+ // Special handling for cases when a self-join introduces duplicate expression ids.
+ case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
+ val conflictingAttributes = left.outputSet.intersect(right.outputSet)
+ failAnalysis(
+ s"""
+ |Failure when resolving conflicting references in Join:
+ |$plan
+ |Conflicting attributes: ${conflictingAttributes.mkString(",")}
+ |""".stripMargin)
+
+ case o if !o.resolved =>
+ failAnalysis(
+ s"unresolved operator ${operator.simpleString}")
case _ => // Analysis successful!
}
-
- // Special handling for cases when self-join introduce duplicate expression ids.
- case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
- val conflictingAttributes = left.outputSet.intersect(right.outputSet)
- failAnalysis(
- s"""
- |Failure when resolving conflicting references in Join:
- |$plan
- |Conflicting attributes: ${conflictingAttributes.mkString(",")}
- |""".stripMargin)
-
}
extendedCheckRules.foreach(_(plan))
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index fbe104db01..17a9124732 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -123,11 +123,11 @@ case class Join(
}
}
- private def selfJoinResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
+ def selfJoinResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
- // Joins are only resolved if they don't introduce ambiguious expression ids.
+ // Joins are only resolved if they don't introduce ambiguous expression ids.
override lazy val resolved: Boolean = {
- childrenResolved && !expressions.exists(!_.resolved) && selfJoinResolved
+ childrenResolved && expressions.forall(_.resolved) && selfJoinResolved
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index f0f1710399..2147d07e09 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -23,10 +23,11 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.catalyst.{InternalRow, SimpleCatalystConf}
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.types._
case class TestFunction(
children: Seq[Expression],
@@ -164,4 +165,13 @@ class AnalysisErrorSuite extends SparkFunSuite with BeforeAndAfter {
assert(message.contains("resolved attribute(s) a#1 missing from a#2"))
}
+
+ // Joining a relation with itself gives both sides the same output attributes,
+ // so checkAnalysis is expected to fail with the "conflicting references" error.
+ test("error test for self-join") {
+   val join = Join(testRelation, testRelation, Inner, None)
+   val error = intercept[AnalysisException] {
+     SimpleAnalyzer.checkAnalysis(join)
+   }
+   // These must be wrapped in assert: a bare `contains` call is a discarded Boolean
+   // and would let the test pass regardless of the error message.
+   assert(error.message.contains("Failure when resolving conflicting references in Join"))
+   assert(error.message.contains("Conflicting attributes"))
+ }
}