path: root/sql/core/src/main
author     Wenchen Fan <wenchen@databricks.com>  2017-04-07 15:58:50 +0800
committer  Wenchen Fan <wenchen@databricks.com>  2017-04-07 15:58:50 +0800
commit     ad3cc1312db3b5667cea134940a09896a4609b74 (patch)
tree       6589f0edb7c12c972d02fab83b25b7c226290dee /sql/core/src/main
parent     626b4cafce7d2dca186144336939d4d993b6f878 (diff)
[SPARK-20245][SQL][MINOR] pass output to LogicalRelation directly
## What changes were proposed in this pull request?

Currently `LogicalRelation` has an `expectedOutputAttributes` parameter, which makes it hard to reason about what the actual output is. Like other leaf nodes, `LogicalRelation` should also take `output` as a parameter, to simplify the logic.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17552 from cloud-fan/minor.
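Put differently, the output of a `LogicalRelation` is now simply whatever was passed to its constructor. A minimal sketch of the difference at the call site (the `relationWithSchemaOutput` helper is illustrative, not part of this patch):

```scala
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.sources.BaseRelation

// Before: LogicalRelation(rel, expectedOutputAttributes = Some(attrs), catalogTable = None)
//         -- the node re-derived its output from rel.schema and only borrowed exprIds from attrs.
// After:  LogicalRelation(rel, attrs, None)
//         -- attrs IS the output; nothing is re-derived inside the node.
def relationWithSchemaOutput(rel: BaseRelation): LogicalRelation =
  LogicalRelation(rel, rel.schema.toAttributes, None)
```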
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala        | 15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala           | 39
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala |  4
3 files changed, 22 insertions, 36 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e5c7c383d7..2d83d512e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -231,16 +231,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
options = table.storage.properties ++ pathOption,
catalogTable = Some(table))
- LogicalRelation(
- dataSource.resolveRelation(checkFilesExist = false),
- catalogTable = Some(table))
+ LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
}
}).asInstanceOf[LogicalRelation]
- // It's possible that the table schema is empty and need to be inferred at runtime. We should
- // not specify expected outputs for this case.
- val expectedOutputs = if (r.output.isEmpty) None else Some(r.output)
- plan.copy(expectedOutputAttributes = expectedOutputs)
+ if (r.output.isEmpty) {
+ // It's possible that the table schema is empty and needs to be inferred at runtime. For this
+ // case, we don't need to change the output of the cached plan.
+ plan
+ } else {
+ plan.copy(output = r.output)
+ }
}
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
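For context, the branch added above aligns the cached relation with the output of the plan node it replaces. A hedged, standalone sketch of that step (`alignOutput` is an illustrative helper, not in the patch; `cached` plays the role of `plan` and `existingOutput` the role of `r.output` in the hunk above):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.datasources.LogicalRelation

// If the existing node has no output yet (schema still to be inferred at runtime),
// keep the cached plan unchanged; otherwise reuse it with this query's attribute ids,
// so that references elsewhere in the plan still resolve after the substitution.
def alignOutput(cached: LogicalRelation, existingOutput: Seq[AttributeReference]): LogicalRelation =
  if (existingOutput.isEmpty) cached
  else cached.copy(output = existingOutput)
```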
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 3b14b794fd..4215203960 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.BaseRelation
@@ -26,31 +26,13 @@ import org.apache.spark.util.Utils
/**
* Used to link a [[BaseRelation]] into a logical query plan.
- *
- * Note that sometimes we need to use `LogicalRelation` to replace an existing leaf node without
- * changing the output attributes' IDs. The `expectedOutputAttributes` parameter is used for
- * this purpose. See https://issues.apache.org/jira/browse/SPARK-10741 for more details.
*/
case class LogicalRelation(
relation: BaseRelation,
- expectedOutputAttributes: Option[Seq[Attribute]] = None,
- catalogTable: Option[CatalogTable] = None)
+ output: Seq[AttributeReference],
+ catalogTable: Option[CatalogTable])
extends LeafNode with MultiInstanceRelation {
- override val output: Seq[AttributeReference] = {
- val attrs = relation.schema.toAttributes
- expectedOutputAttributes.map { expectedAttrs =>
- assert(expectedAttrs.length == attrs.length)
- attrs.zip(expectedAttrs).map {
- // We should respect the attribute names provided by base relation and only use the
- // exprId in `expectedOutputAttributes`.
- // The reason is that, some relations(like parquet) will reconcile attribute names to
- // workaround case insensitivity issue.
- case (attr, expected) => attr.withExprId(expected.exprId)
- }
- }.getOrElse(attrs)
- }
-
// Logical Relations are distinct if they have different output for the sake of transformations.
override def equals(other: Any): Boolean = other match {
case l @ LogicalRelation(otherRelation, _, _) => relation == otherRelation && output == l.output
@@ -87,11 +69,8 @@ case class LogicalRelation(
* unique expression ids. We respect the `expectedOutputAttributes` and create
* new instances of attributes in it.
*/
- override def newInstance(): this.type = {
- LogicalRelation(
- relation,
- expectedOutputAttributes.map(_.map(_.newInstance())),
- catalogTable).asInstanceOf[this.type]
+ override def newInstance(): LogicalRelation = {
+ this.copy(output = output.map(_.newInstance()))
}
override def refresh(): Unit = relation match {
@@ -101,3 +80,11 @@ case class LogicalRelation(
override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
}
+
+object LogicalRelation {
+ def apply(relation: BaseRelation): LogicalRelation =
+ LogicalRelation(relation, relation.schema.toAttributes, None)
+
+ def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation =
+ LogicalRelation(relation, relation.schema.toAttributes, Some(table))
+}
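The new companion `apply` overloads keep existing call sites concise: both derive the output from `relation.schema.toAttributes`, so callers only spell out the `output` parameter when they need specific attribute ids. A hedged usage sketch (the `toLogicalPlan` helper and its arguments are illustrative, not part of the patch):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.sources.BaseRelation

// Both overloads derive the output from relation.schema.toAttributes,
// so the explicit `output` constructor argument stays an internal detail here.
def toLogicalPlan(relation: BaseRelation, table: Option[CatalogTable]): LogicalRelation =
  table match {
    case Some(t) => LogicalRelation(relation, t)
    case None    => LogicalRelation(relation)
  }
```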
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 8566a80610..905b8683e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -59,9 +59,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
val prunedFsRelation =
fsRelation.copy(location = prunedFileIndex)(sparkSession)
- val prunedLogicalRelation = logicalRelation.copy(
- relation = prunedFsRelation,
- expectedOutputAttributes = Some(logicalRelation.output))
+ val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
// Keep partition-pruning predicates so that they are visible in physical planning
val filterExpression = filters.reduceLeft(And)
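Because `output` is now a constructor field, `copy(relation = prunedFsRelation)` carries the original attributes, and their expression ids, over to the pruned relation automatically, which is what `expectedOutputAttributes = Some(logicalRelation.output)` did explicitly before. A hedged illustration (the `withPrunedRelation` helper is not part of the patch):

```scala
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// copy() only swaps the relation; the `output` field (and its exprIds) is untouched,
// so filters and projections above the relation keep resolving against the same attributes.
def withPrunedRelation(lr: LogicalRelation, prunedFsRelation: HadoopFsRelation): LogicalRelation = {
  val pruned = lr.copy(relation = prunedFsRelation)
  assert(pruned.output == lr.output)  // preserved by copy
  pruned
}
```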