 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala              | 26
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala | 21
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala                 |  4
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                                  |  2
 sql/core/src/test/resources/log4j.properties                                                   |  3
 sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala                              | 10
 sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala                     |  4
 7 files changed, 40 insertions(+), 30 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3f0d77ad63..2d1fa106a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -53,14 +53,11 @@ class Analyzer(catalog: Catalog,
   val extendedRules: Seq[Rule[LogicalPlan]] = Nil
 
   lazy val batches: Seq[Batch] = Seq(
-    Batch("MultiInstanceRelations", Once,
-      NewRelationInstances),
     Batch("Resolution", fixedPoint,
-      ResolveReferences ::
       ResolveRelations ::
+      ResolveReferences ::
       ResolveGroupingAnalytics ::
       ResolveSortReferences ::
-      NewRelationInstances ::
       ImplicitGenerate ::
       ResolveFunctions ::
       GlobalAggregates ::
@@ -285,6 +282,27 @@ class Analyzer(catalog: Catalog,
         }
       )
 
+      // Special handling for cases when a self-join introduces duplicate expression ids.
+      case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
+        val conflictingAttributes = left.outputSet.intersect(right.outputSet)
+
+        val (oldRelation, newRelation, attributeRewrites) = right.collect {
+          case oldVersion: MultiInstanceRelation
+              if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
+            val newVersion = oldVersion.newInstance()
+            val newAttributes = AttributeMap(oldVersion.output.zip(newVersion.output))
+            (oldVersion, newVersion, newAttributes)
+        }.head // Only handle the first case found; others will be fixed on the next pass.
+
+        val newRight = right transformUp {
+          case r if r == oldRelation => newRelation
+          case other => other transformExpressions {
+            case a: Attribute => attributeRewrites.get(a).getOrElse(a)
+          }
+        }
+
+        j.copy(right = newRight)
+
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve ${q.simpleString}")
         q transformExpressionsUp {
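
The new case fires only when the two sides of a join share attribute ids, which happens when the
same relation appears on both sides. A rough sketch of the user-visible effect, using the
testData table from the test suite below (the ids shown are illustrative):

    // Both branches start from the same logical relation, so before this change
    // key#1 appeared on both sides and the join condition was self-referential.
    val df = testData.select($"key")
    // The analyzer now re-instances the right-hand MultiInstanceRelation, so the
    // condition effectively compares key#1 (left) against a fresh key#2 (right).
    df.as('l).join(df.as('r), $"l.key" === $"r.key")
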
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
index 4c5fb3f45b..894c3500cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
@@ -26,28 +26,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  * produced by distinct operators in a query tree as this breaks the guarantee that expression
  * ids, which are used to differentiate attributes, are unique.
  *
- * Before analysis, all operators that include this trait will be asked to produce a new version
+ * During analysis, an operator that includes this trait may be asked to produce a new version
  * of itself with globally unique expression ids.
  */
 trait MultiInstanceRelation {
   def newInstance(): this.type
 }
-
-/**
- * If any MultiInstanceRelation appears more than once in the query plan then the plan is updated so
- * that each instance has unique expression ids for the attributes produced.
- */
-object NewRelationInstances extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = {
-    val localRelations = plan collect { case l: MultiInstanceRelation => l}
-    val multiAppearance = localRelations
-      .groupBy(identity[MultiInstanceRelation])
-      .filter { case (_, ls) => ls.size > 1 }
-      .map(_._1)
-      .toSet
-
-    plan transform {
-      case l: MultiInstanceRelation if multiAppearance.contains(l) => l.newInstance()
-    }
-  }
-}
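
With the eager NewRelationInstances rule gone, newInstance() is now called on demand by the
self-join case added to the Analyzer above. A minimal sketch of an implementor, modeled on the
way leaf relations such as LocalRelation typically satisfy the contract (ExampleRelation is
hypothetical):

    // newInstance() must preserve names and types but mint fresh expression ids;
    // Attribute.newInstance() does the per-attribute id minting.
    case class ExampleRelation(output: Seq[Attribute])
      extends LeafNode with MultiInstanceRelation {
      override def newInstance(): this.type =
        ExampleRelation(output.map(_.newInstance())).asInstanceOf[this.type]
    }
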
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index c4a1f899d8..7d609b9138 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -33,11 +33,9 @@ class PlanTest extends FunSuite {
    * we must normalize them to check if two different queries are identical.
    */
   protected def normalizeExprIds(plan: LogicalPlan) = {
-    val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id))
-    val minId = if (list.isEmpty) 0 else list.min
     plan transformAllExpressions {
       case a: AttributeReference =>
-        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
+        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0))
     }
   }
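
Mapping every id to ExprId(0) is simpler and more robust than the old min-shifting scheme, which
could normalize two equivalent plans differently whenever their ids were not allocated
contiguously, as now happens after a self-join rewrite. The intended use is unchanged, assuming
the comparePlans helper defined elsewhere in this suite:

    // comparePlans normalizes both sides, so plans that differ only in
    // expression-id allocation compare equal.
    comparePlans(actualPlan, expectedPlan)
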
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 05ac1623d7..fd121ce056 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -122,6 +122,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     case _ =>
   }
 
+  @transient
   protected[sql] val cacheManager = new CacheManager(this)
 
   /**
@@ -159,6 +160,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * DataTypes.StringType);
    * }}}
    */
+  @transient
   val udf: UDFRegistration = new UDFRegistration(this)
 
   /** Returns true if the table is currently cached in-memory. */
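
The two @transient annotations presumably keep a SQLContext captured in a serialized closure
from dragging these driver-side members along. A minimal sketch of the mechanics (Holder is
illustrative, not from this patch):

    // A @transient val is skipped by Java serialization and comes back null,
    // which is acceptable for driver-only state such as caches and registries.
    class Holder extends Serializable {
      @transient val driverOnlyState = new java.util.HashMap[String, String]()
    }
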
diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties
index fbed0a782d..28e90b9520 100644
--- a/sql/core/src/test/resources/log4j.properties
+++ b/sql/core/src/test/resources/log4j.properties
@@ -39,6 +39,9 @@ log4j.appender.FA.Threshold = INFO
 log4j.additivity.parquet.hadoop.ParquetRecordReader=false
 log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
 
+log4j.additivity.parquet.hadoop.ParquetOutputCommitter=false
+log4j.logger.parquet.hadoop.ParquetOutputCommitter=OFF
+
 log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
 log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
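
The new entries follow the ParquetRecordReader pattern directly above: the logger is switched
off and additivity is disabled so ParquetOutputCommitter output cannot reach parent appenders
during the test run.
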
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 02623f73c7..7be9215a44 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext.logicalPlanToSparkQuery
 import org.apache.spark.sql.test.TestSQLContext.implicits._
+import org.apache.spark.sql.test.TestSQLContext.sql
 
 class DataFrameSuite extends QueryTest {
@@ -88,6 +89,15 @@ class DataFrameSuite extends QueryTest {
       testData.collect().toSeq)
   }
 
+  test("self join") {
+    val df1 = testData.select(testData("key")).as('df1)
+    val df2 = testData.select(testData("key")).as('df2)
+
+    checkAnswer(
+      df1.join(df2, $"df1.key" === $"df2.key"),
+      sql("SELECT a.key, b.key FROM testData a JOIN testData b ON a.key = b.key").collect().toSeq)
+  }
+
   test("selectExpr") {
     checkAnswer(
       testData.selectExpr("abs(key)", "value"),
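
The new test exercises exactly the path fixed in the Analyzer: both frames derive from the same
testData relation, the as('df1) / as('df2) aliases let the condition name each side
unambiguously, and the DSL result is checked against the equivalent SQL self-join (hence the new
TestSQLContext.sql import above).
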
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 081d94b6fc..44ee5ab597 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -35,11 +35,9 @@ class PlanTest extends FunSuite {
    * we must normalize them to check if two different queries are identical.
    */
   protected def normalizeExprIds(plan: LogicalPlan) = {
-    val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id))
-    val minId = if (list.isEmpty) 0 else list.min
     plan transformAllExpressions {
       case a: AttributeReference =>
-        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
+        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0))
     }
   }