aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala10
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala11
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala8
4 files changed, 27 insertions, 4 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 44eceb0b37..ba1ac141b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -252,7 +252,15 @@ class Analyzer(catalog: Catalog,
case oldVersion @ Aggregate(_, aggregateExpressions, _)
if findAliases(aggregateExpressions).intersect(conflictingAttributes).nonEmpty =>
(oldVersion, oldVersion.copy(aggregateExpressions = newAliases(aggregateExpressions)))
- }.head // Only handle first case found, others will be fixed on the next pass.
+ }.headOption.getOrElse { // Only handle first case, others will be fixed on the next pass.
+ sys.error(
+ s"""
+ |Failure when resolving conflicting references in Join:
+ |$plan
+ |
+ |Conflicting attributes: ${conflictingAttributes.mkString(",")}
+ """.stripMargin)
+ }
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
val newRight = right transformUp {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
index 894c3500cf..35b74024a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
@@ -30,5 +30,5 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
* of itself with globally unique expression ids.
*/
trait MultiInstanceRelation {
- def newInstance(): this.type
+ def newInstance(): LogicalPlan // LogicalPlan, not this.type: the copy may be another node type.
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d1a99555e9..203164ea84 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark.Logging
import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext}
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Catalog, OverrideCatalog}
+import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException, Catalog, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
@@ -697,7 +697,7 @@ private[hive] case class MetastoreRelation
(databaseName: String, tableName: String, alias: Option[String])
(val table: TTable, val partitions: Seq[TPartition])
(@transient sqlContext: SQLContext)
- extends LeafNode {
+ extends LeafNode with MultiInstanceRelation {
self: Product =>
@@ -778,6 +778,13 @@ private[hive] case class MetastoreRelation
/** An attribute map for determining the ordinal for non-partition columns. */
val columnOrdinals = AttributeMap(attributes.zipWithIndex)
+
+ override def newInstance() = {
+ val newCopy = MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
+ // newCopy is rebuilt from the same constructor arguments. The Project over its own output is
+ // an ugly hack for MetastoreRelation's broken equals method; remove it when SPARK-6555 is fixed.
+ Project(newCopy.output, newCopy)
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index aad48ada52..fa8e11ffec 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hive
+import org.apache.spark.sql.hive.test.TestHive
import org.scalatest.FunSuite
import org.apache.spark.sql.test.ExamplePointUDT
@@ -36,4 +37,11 @@ class HiveMetastoreCatalogSuite extends FunSuite {
assert(HiveMetastoreTypes.toMetastoreType(udt) ===
HiveMetastoreTypes.toMetastoreType(udt.sqlType))
}
+
+ test("duplicated metastore relations") {
+ import TestHive.implicits._
+ val df = TestHive.sql("SELECT * FROM src")
+ println(df.queryExecution) // NOTE(review): debug output only; nothing is asserted on it.
+ df.as('a).join(df.as('b), $"a.key" === $"b.key") // Self-join; no assert, presumably passes if nothing throws.
+ }
}