diff options
7 files changed, 40 insertions, 30 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 3f0d77ad63..2d1fa106a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -53,14 +53,11 @@ class Analyzer(catalog: Catalog, val extendedRules: Seq[Rule[LogicalPlan]] = Nil lazy val batches: Seq[Batch] = Seq( - Batch("MultiInstanceRelations", Once, - NewRelationInstances), Batch("Resolution", fixedPoint, - ResolveReferences :: ResolveRelations :: + ResolveReferences :: ResolveGroupingAnalytics :: ResolveSortReferences :: - NewRelationInstances :: ImplicitGenerate :: ResolveFunctions :: GlobalAggregates :: @@ -285,6 +282,27 @@ class Analyzer(catalog: Catalog, } ) + // Special handling for cases when self-join introduce duplicate expression ids. + case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty => + val conflictingAttributes = left.outputSet.intersect(right.outputSet) + + val (oldRelation, newRelation, attributeRewrites) = right.collect { + case oldVersion: MultiInstanceRelation + if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty => + val newVersion = oldVersion.newInstance() + val newAttributes = AttributeMap(oldVersion.output.zip(newVersion.output)) + (oldVersion, newVersion, newAttributes) + }.head // Only handle first case found, others will be fixed on the next pass. + + val newRight = right transformUp { + case r if r == oldRelation => newRelation + case other => other transformExpressions { + case a: Attribute => attributeRewrites.get(a).getOrElse(a) + } + } + + j.copy(right = newRight) + case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString}") q transformExpressionsUp { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala index 4c5fb3f45b..894c3500cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala @@ -26,28 +26,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan * produced by distinct operators in a query tree as this breaks the guarantee that expression * ids, which are used to differentiate attributes, are unique. * - * Before analysis, all operators that include this trait will be asked to produce a new version + * During analysis, operators that include this trait may be asked to produce a new version * of itself with globally unique expression ids. */ trait MultiInstanceRelation { def newInstance(): this.type } - -/** - * If any MultiInstanceRelation appears more than once in the query plan then the plan is updated so - * that each instance has unique expression ids for the attributes produced. - */ -object NewRelationInstances extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = { - val localRelations = plan collect { case l: MultiInstanceRelation => l} - val multiAppearance = localRelations - .groupBy(identity[MultiInstanceRelation]) - .filter { case (_, ls) => ls.size > 1 } - .map(_._1) - .toSet - - plan transform { - case l: MultiInstanceRelation if multiAppearance.contains(l) => l.newInstance() - } - } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala index c4a1f899d8..7d609b9138 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala @@ -33,11 +33,9 @@ class PlanTest extends FunSuite { * we must normalize them to check if two different queries are identical. */ protected def normalizeExprIds(plan: LogicalPlan) = { - val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id)) - val minId = if (list.isEmpty) 0 else list.min plan transformAllExpressions { case a: AttributeReference => - AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId)) + AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 05ac1623d7..fd121ce056 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -122,6 +122,7 @@ class SQLContext(@transient val sparkContext: SparkContext) case _ => } + @transient protected[sql] val cacheManager = new CacheManager(this) /** @@ -159,6 +160,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * DataTypes.StringType); * }}} */ + @transient val udf: UDFRegistration = new UDFRegistration(this) /** Returns true if the table is currently cached in-memory. */ diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties index fbed0a782d..28e90b9520 100644 --- a/sql/core/src/test/resources/log4j.properties +++ b/sql/core/src/test/resources/log4j.properties @@ -39,6 +39,9 @@ log4j.appender.FA.Threshold = INFO log4j.additivity.parquet.hadoop.ParquetRecordReader=false log4j.logger.parquet.hadoop.ParquetRecordReader=OFF +log4j.additivity.parquet.hadoop.ParquetOutputCommitter=false +log4j.logger.parquet.hadoop.ParquetOutputCommitter=OFF + log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 02623f73c7..7be9215a44 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.test.TestSQLContext.logicalPlanToSparkQuery import org.apache.spark.sql.test.TestSQLContext.implicits._ +import org.apache.spark.sql.test.TestSQLContext.sql class DataFrameSuite extends QueryTest { @@ -88,6 +89,15 @@ class DataFrameSuite extends QueryTest { testData.collect().toSeq) } + test("self join") { + val df1 = testData.select(testData("key")).as('df1) + val df2 = testData.select(testData("key")).as('df2) + + checkAnswer( + df1.join(df2, $"df1.key" === $"df2.key"), + sql("SELECT a.key, b.key FROM testData a JOIN testData b ON a.key = b.key").collect().toSeq) + } + test("selectExpr") { checkAnswer( testData.selectExpr("abs(key)", "value"), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala index 081d94b6fc..44ee5ab597 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala @@ -35,11 +35,9 @@ class PlanTest extends FunSuite { * we must normalize them to check if two different queries are identical. */ protected def normalizeExprIds(plan: LogicalPlan) = { - val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id)) - val minId = if (list.isEmpty) 0 else list.min plan transformAllExpressions { case a: AttributeReference => - AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId)) + AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0)) } } |