about | summary | refs | log | tree | commit | diff
path: root/sql
diff options
context:
space:
mode:
author Santiago M. Mola <smola@stratio.com> 2015-06-24 12:29:07 -0700
committer Michael Armbrust <michael@databricks.com> 2015-06-24 12:29:07 -0700
commit b84d4b4dfe8ced1b96a0c74ef968a20a1bba8231 (patch)
tree dd8ee160bb7989cd39b010ed2124e576ebeae6c7 /sql
parent 43e66192f45a23f7232116e9f664158862df5015 (diff)
download spark-b84d4b4dfe8ced1b96a0c74ef968a20a1bba8231.tar.gz
spark-b84d4b4dfe8ced1b96a0c74ef968a20a1bba8231.tar.bz2
spark-b84d4b4dfe8ced1b96a0c74ef968a20a1bba8231.zip
[SPARK-7088] [SQL] Fix analysis for 3rd party logical plan.
The ResolveReferences analysis rule no longer throws when it cannot resolve conflicting references in a self-join.

Author: Santiago M. Mola <smola@stratio.com>

Closes #6853 from smola/SPARK-7088 and squashes the following commits:

af71ac7 [Santiago M. Mola] [SPARK-7088] Fix analysis for 3rd party logical plan.
Diffstat (limited to 'sql')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala 38
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala 12
2 files changed, 32 insertions, 18 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0a3f5a7b5c..b06759f144 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -283,7 +283,7 @@ class Analyzer(
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} in $j")
- val (oldRelation, newRelation) = right.collect {
+ right.collect {
// Handle base relations that might appear more than once.
case oldVersion: MultiInstanceRelation
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
@@ -308,25 +308,27 @@ class Analyzer(
if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
.nonEmpty =>
(oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
- }.headOption.getOrElse { // Only handle first case, others will be fixed on the next pass.
- sys.error(
- s"""
- |Failure when resolving conflicting references in Join:
- |$plan
- |
- |Conflicting attributes: ${conflictingAttributes.mkString(",")}
- """.stripMargin)
}
-
- val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
- val newRight = right transformUp {
- case r if r == oldRelation => newRelation
- } transformUp {
- case other => other transformExpressions {
- case a: Attribute => attributeRewrites.get(a).getOrElse(a)
- }
+ // Only handle first case, others will be fixed on the next pass.
+ .headOption match {
+ case None =>
+ /*
+ * No result implies that there is a logical plan node that produces new references
+ * that this rule cannot handle. When that is the case, there must be another rule
+ * that resolves these conflicts. Otherwise, the analysis will fail.
+ */
+ j
+ case Some((oldRelation, newRelation)) =>
+ val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
+ val newRight = right transformUp {
+ case r if r == oldRelation => newRelation
+ } transformUp {
+ case other => other transformExpressions {
+ case a: Attribute => attributeRewrites.get(a).getOrElse(a)
+ }
+ }
+ j.copy(right = newRight)
}
- j.copy(right = newRight)
// When resolve `SortOrder`s in Sort based on child, don't report errors as
// we still have chance to resolve it based on grandchild
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index c5a1437be6..a069b4710f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -48,6 +48,7 @@ trait CheckAnalysis {
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
plan.foreachUp {
+
case operator: LogicalPlan =>
operator transformExpressionsUp {
case a: Attribute if !a.resolved =>
@@ -121,6 +122,17 @@ trait CheckAnalysis {
case _ => // Analysis successful!
}
+
+ // Special handling for cases when self-join introduce duplicate expression ids.
+ case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
+ val conflictingAttributes = left.outputSet.intersect(right.outputSet)
+ failAnalysis(
+ s"""
+ |Failure when resolving conflicting references in Join:
+ |$plan
+ |Conflicting attributes: ${conflictingAttributes.mkString(",")}
+ |""".stripMargin)
+
}
extendedCheckRules.foreach(_(plan))
}