From ad3cc1312db3b5667cea134940a09896a4609b74 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 7 Apr 2017 15:58:50 +0800
Subject: [SPARK-20245][SQL][MINOR] pass output to LogicalRelation directly

## What changes were proposed in this pull request?

Currently `LogicalRelation` has an `expectedOutputAttributes` parameter, which makes it hard to reason about what the actual output is. Like other leaf nodes, `LogicalRelation` should also take `output` as a parameter, to simplify the logic.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan

Closes #17552 from cloud-fan/minor.
---
 .../execution/datasources/DataSourceStrategy.scala | 15 +++++----
 .../execution/datasources/LogicalRelation.scala    | 39 ++++++++--------------
 .../datasources/PruneFileSourcePartitions.scala    |  4 +--
 3 files changed, 22 insertions(+), 36 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e5c7c383d7..2d83d512e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -231,16 +231,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
             options = table.storage.properties ++ pathOption,
             catalogTable = Some(table))
 
-          LogicalRelation(
-            dataSource.resolveRelation(checkFilesExist = false),
-            catalogTable = Some(table))
+          LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
         }
       }).asInstanceOf[LogicalRelation]
 
-    // It's possible that the table schema is empty and need to be inferred at runtime. We should
-    // not specify expected outputs for this case.
-    val expectedOutputs = if (r.output.isEmpty) None else Some(r.output)
-    plan.copy(expectedOutputAttributes = expectedOutputs)
+    if (r.output.isEmpty) {
+      // It's possible that the table schema is empty and needs to be inferred at runtime. For
+      // this case, we don't need to change the output of the cached plan.
+      plan
+    } else {
+      plan.copy(output = r.output)
+    }
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 3b14b794fd..4215203960 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.BaseRelation
@@ -26,31 +26,13 @@ import org.apache.spark.util.Utils
 
 /**
  * Used to link a [[BaseRelation]] in to a logical query plan.
- *
- * Note that sometimes we need to use `LogicalRelation` to replace an existing leaf node without
- * changing the output attributes' IDs. The `expectedOutputAttributes` parameter is used for
- * this purpose. See https://issues.apache.org/jira/browse/SPARK-10741 for more details.
  */
 case class LogicalRelation(
     relation: BaseRelation,
-    expectedOutputAttributes: Option[Seq[Attribute]] = None,
-    catalogTable: Option[CatalogTable] = None)
+    output: Seq[AttributeReference],
+    catalogTable: Option[CatalogTable])
   extends LeafNode with MultiInstanceRelation {
 
-  override val output: Seq[AttributeReference] = {
-    val attrs = relation.schema.toAttributes
-    expectedOutputAttributes.map { expectedAttrs =>
-      assert(expectedAttrs.length == attrs.length)
-      attrs.zip(expectedAttrs).map {
-        // We should respect the attribute names provided by base relation and only use the
-        // exprId in `expectedOutputAttributes`.
-        // The reason is that, some relations(like parquet) will reconcile attribute names to
-        // workaround case insensitivity issue.
-        case (attr, expected) => attr.withExprId(expected.exprId)
-      }
-    }.getOrElse(attrs)
-  }
-
   // Logical Relations are distinct if they have different output for the sake of transformations.
   override def equals(other: Any): Boolean = other match {
     case l @ LogicalRelation(otherRelation, _, _) => relation == otherRelation && output == l.output
@@ -87,11 +69,8 @@ case class LogicalRelation(
    * unique expression ids. We respect the `expectedOutputAttributes` and create
    * new instances of attributes in it.
    */
-  override def newInstance(): this.type = {
-    LogicalRelation(
-      relation,
-      expectedOutputAttributes.map(_.map(_.newInstance())),
-      catalogTable).asInstanceOf[this.type]
+  override def newInstance(): LogicalRelation = {
+    this.copy(output = output.map(_.newInstance()))
   }
 
   override def refresh(): Unit = relation match {
@@ -101,3 +80,11 @@ case class LogicalRelation(
 
   override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
 }
+
+object LogicalRelation {
+  def apply(relation: BaseRelation): LogicalRelation =
+    LogicalRelation(relation, relation.schema.toAttributes, None)
+
+  def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation =
+    LogicalRelation(relation, relation.schema.toAttributes, Some(table))
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 8566a80610..905b8683e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -59,9 +59,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
         val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
         val prunedFsRelation =
           fsRelation.copy(location = prunedFileIndex)(sparkSession)
-        val prunedLogicalRelation = logicalRelation.copy(
-          relation = prunedFsRelation,
-          expectedOutputAttributes = Some(logicalRelation.output))
+        val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
 
         // Keep partition-pruning predicates so that they are visible in physical planning
         val filterExpression = filters.reduceLeft(And)
--
cgit v1.2.3
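For context on the new API surface, here is a minimal sketch of how the two companion-object `apply` overloads replace the old `expectedOutputAttributes` plumbing. `DummyRelation`, the local `SparkSession`, and the `main` wrapper are illustration-only scaffolding (not part of this patch), assuming a build of this branch on the classpath:

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object LogicalRelationSketch {
  // A bare-bones BaseRelation: just a schema, no scan logic. That is enough
  // to build the logical plan node this patch changes. Hypothetical, for
  // illustration only.
  class DummyRelation(val sqlContext: SQLContext) extends BaseRelation {
    override def schema: StructType = StructType(StructField("id", LongType) :: Nil)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("sketch").getOrCreate()
    val rel = new DummyRelation(spark.sqlContext)

    // After this patch the companion object derives `output` eagerly from the
    // relation's schema, instead of the node computing it lazily from the
    // optional `expectedOutputAttributes`.
    val plan = LogicalRelation(rel) // output == rel.schema.toAttributes

    // newInstance() now just re-mints expression IDs on the explicit output:
    // same attribute names, fresh exprIds.
    val fresh = plan.newInstance()
    assert(fresh.output.map(_.name) == plan.output.map(_.name))
    assert(fresh.output.map(_.exprId) != plan.output.map(_.exprId))

    spark.stop()
  }
}
```

The design point: `output` is fixed at construction time, so callers like `PruneFileSourcePartitions` can `copy` the node without re-threading the old attributes through `expectedOutputAttributes`, and `newInstance()` reduces to a one-line `copy`.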