author     Zhenhua Wang <wzh_zju@163.com>          2016-09-22 14:48:49 +0800
committer  Wenchen Fan <wenchen@databricks.com>    2016-09-22 14:48:49 +0800
commit     de7df7defc99e04fefd990974151a701f64b75b4 (patch)
tree       86bfbd8a9c2cb94a025b481da031d6135834feb6
parent     3a80f92f8f4b91d0a85724bca7d81c6f5bbb78fd (diff)
[SPARK-17625][SQL] set expectedOutputAttributes when converting SimpleCatalogRelation to LogicalRelation
## What changes were proposed in this pull request?

We should set expectedOutputAttributes when converting a SimpleCatalogRelation to a LogicalRelation; otherwise the outputs of the LogicalRelation differ from the outputs of the SimpleCatalogRelation, because they carry different exprIds.

## How was this patch tested?

Added a test case.

Author: Zhenhua Wang <wzh_zju@163.com>

Closes #15182 from wzhfy/expectedAttributes.
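For context, a note that is not part of the patch: Catalyst identifies attributes by exprId, not by name, so a rule that re-creates a plan's output attributes breaks any references already bound to the old ones. A minimal sketch, assuming the Spark 2.0-era Catalyst API:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.LongType

// Two attributes with the same name but freshly generated exprIds.
val a = AttributeReference("i", LongType)()
val b = AttributeReference("i", LongType)()

assert(a.name == b.name)     // same name...
assert(a.exprId != b.exprId) // ...but different identities: a reference bound
                             // to `a` will not resolve against a plan whose
                             // output contains only `b`
```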
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala  10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala                             14
2 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index c8ad5b3034..63f01c5bb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -197,7 +197,10 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
  * source information.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = {
+  private def readDataSourceTable(
+      sparkSession: SparkSession,
+      simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
+    val table = simpleCatalogRelation.catalogTable
     val dataSource =
       DataSource(
         sparkSession,
@@ -209,16 +212,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     LogicalRelation(
       dataSource.resolveRelation(),
+      expectedOutputAttributes = Some(simpleCatalogRelation.output),
       catalogTable = Some(table))
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case i @ logical.InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
         if DDLUtils.isDatasourceTable(s.metadata) =>
-      i.copy(table = readDataSourceTable(sparkSession, s.metadata))
+      i.copy(table = readDataSourceTable(sparkSession, s))
 
     case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-      readDataSourceTable(sparkSession, s.metadata)
+      readDataSourceTable(sparkSession, s)
   }
 }
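Why passing the old output is sufficient: LogicalRelation derives its attributes from the relation's schema, which would mint fresh exprIds, and expectedOutputAttributes tells it to reuse the exprIds of the plan being replaced. A paraphrased sketch of that derivation (see LogicalRelation.scala in the same tree; the exact code may differ):

```scala
// Paraphrase of LogicalRelation's output derivation, not a verbatim copy.
val output: Seq[AttributeReference] = {
  val attrs = relation.schema.toAttributes  // fresh exprIds by default
  expectedOutputAttributes.map { expected =>
    assert(expected.length == attrs.length)
    // Keep the names reported by the data source, but adopt the exprIds of
    // the plan being replaced, so existing references keep resolving.
    attrs.zip(expected).map { case (attr, exp) => attr.withExprId(exp.exprId) }
  }.getOrElse(attrs)
}
```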
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index c2d256bdd3..2c60a7dd92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -26,7 +26,8 @@ import scala.util.Random
 import org.scalatest.Matchers._
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project, Union}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange}
@@ -1585,4 +1586,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val d = sampleDf.withColumn("c", monotonically_increasing_id).select($"c").collect
     assert(d.size == d.distinct.size)
   }
+
+  test("SPARK-17625: data source table in InMemoryCatalog should guarantee output consistency") {
+    val tableName = "tbl"
+    withTable(tableName) {
+      spark.range(10).select('id as 'i, 'id as 'j).write.saveAsTable(tableName)
+      val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName))
+      val expr = relation.resolve("i")
+      val qe = spark.sessionState.executePlan(Project(Seq(expr), relation))
+      qe.assertAnalyzed()
+    }
+  }
 }
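Without the fix, the Project above fails to analyze: `expr` carries the exprId of the SimpleCatalogRelation returned by lookupRelation, while FindDataSourceTable replaces that relation with a LogicalRelation whose attributes get fresh exprIds, leaving the reference unresolved. Illustrative plan shapes (the exprId numbers are hypothetical):

```
pre-patch:   Project [i#1]                    <- bound to the catalog relation
             +- LogicalRelation [i#3, j#4]    <- fresh exprIds, so i#1 is unresolved

post-patch:  Project [i#1]
             +- LogicalRelation [i#1, j#2]    <- exprIds reused, analysis succeeds
```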