author     Davies Liu <davies@databricks.com>   2016-09-02 15:10:12 -0700
committer  Davies Liu <davies.liu@gmail.com>    2016-09-02 15:10:12 -0700
commit     ed9c884dcf925500ceb388b06b33bd2c95cd2ada (patch)
tree       7394f4a30e5994193b575817c8d768276ea33541 /sql
parent     eac1d0e921345b5d15aa35d8c565140292ab2af3 (diff)
[SPARK-17230] [SQL] Should not pass optimized query into QueryExecution in DataFrameWriter
## What changes were proposed in this pull request?

Some analyzer rules make assumptions about logical plans, and the optimizer may break those assumptions, so we should not pass an optimized query plan into QueryExecution (where it will be analyzed again); otherwise we may hit weird bugs. For example, there is a rule for decimal calculation that promotes the precision before binary operations and uses PromotePrecision as a placeholder to indicate that the rule should not be applied twice. But an optimizer rule removes this placeholder, which breaks the assumption; the rule is then applied twice and produces wrong results.

Ideally, we should make all the analyzer rules idempotent, but that would require a lot of effort to double-check them one by one (and may not be easy). An easier approach is to never feed an optimized plan into the Analyzer. This PR fixes the case for RunnableCommand: such commands are optimized, and during execution the passed `query` is passed into QueryExecution again. This PR makes these `query` plans no longer part of the children, so they are not optimized and analyzed again.

Right now we cannot tell whether a logical plan has been optimized or not; we could introduce a flag for that and make sure an optimized logical plan is never analyzed again.

## How was this patch tested?

Added regression tests.

Author: Davies Liu <davies@databricks.com>

Closes #14797 from davies/fix_writer.
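For context, a minimal, self-contained sketch of the reproduction, mirroring the regression test added to DataFrameReaderWriterSuite in this patch; the object name, master setting, and output path are illustrative assumptions, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession

object Spark17230Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")            // assumed local run, not from the patch
      .appName("spark-17230-repro")
      .getOrCreate()
    val dir = "/tmp/spark-17230-repro"   // hypothetical output path

    // A decimal multiplication whose precision is promoted by the analyzer
    // (same expression as the regression test below).
    val df = spark.range(99, 101)
      .selectExpr("id", "cast(id as long) * cast('1.0' as decimal(38, 18)) as num")

    // Before the fix, DataFrameWriter fed the already-optimized plan back into
    // QueryExecution, so the precision-promotion rule could run a second time
    // and the written values of `num` could be wrong.
    df.write.mode("overwrite").parquet(dir)
    spark.read.parquet(dir).show()

    spark.stop()
  }
}
```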
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala                               2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala                 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala                        13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala                 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala   2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala                           8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala             2
7 files changed, 25 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index cce1489abd..424a962b5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.types._
*/
trait RunnableCommand extends LogicalPlan with logical.Command {
override def output: Seq[Attribute] = Seq.empty
- override def children: Seq[LogicalPlan] = Seq.empty
+ final override def children: Seq[LogicalPlan] = Seq.empty
def run(sparkSession: SparkSession): Seq[Row]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index da3f6c600a..c7e3279061 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -113,7 +113,7 @@ case class CreateDataSourceTableAsSelectCommand(
query: LogicalPlan)
extends RunnableCommand {
- override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+ override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
override def run(sparkSession: SparkSession): Seq[Row] = {
assert(table.tableType != CatalogTableType.VIEW)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 348530888d..5968db84cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -479,13 +480,23 @@ case class DataSource(
}
}
+ // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
+ // not need to have the query as child, to avoid to analyze an optimized query,
+ // because InsertIntoHadoopFsRelationCommand will be optimized first.
+ val columns = partitionColumns.map { name =>
+ val plan = data.logicalPlan
+ plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+ throw new AnalysisException(
+ s"Unable to resolve ${name} given [${plan.output.map(_.name).mkString(", ")}]")
+ }.asInstanceOf[Attribute]
+ }
// For partitioned relation r, r.schema's column ordering can be different from the column
// ordering of data.logicalPlan (partition columns are all moved after data column). This
// will be adjusted within InsertIntoHadoopFsRelation.
val plan =
InsertIntoHadoopFsRelationCommand(
outputPath,
- partitionColumns.map(UnresolvedAttribute.quoted),
+ columns,
bucketSpec,
format,
() => Unit, // No existing table needs to be refreshed.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index a6621054fc..8286467e96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -181,7 +181,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
InsertIntoHadoopFsRelationCommand(
outputPath,
- t.partitionSchema.fields.map(_.name).map(UnresolvedAttribute(_)),
+ query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
t.bucketSpec,
t.fileFormat,
() => t.refresh(),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index de822180ab..02ce7fab64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -66,7 +66,7 @@ case class InsertIntoHadoopFsRelationCommand(
mode: SaveMode)
extends RunnableCommand {
- override def children: Seq[LogicalPlan] = query :: Nil
+ override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
override def run(sparkSession: SparkSession): Seq[Row] = {
// Most formats don't do well with duplicate columns, so lets not allow that
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 05935cec4b..63b0e4588e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -449,6 +449,14 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
}
}
+ test("SPARK-17230: write out results of decimal calculation") {
+ val df = spark.range(99, 101)
+ .selectExpr("id", "cast(id as long) * cast('1.0' as decimal(38, 18)) as num")
+ df.write.mode(SaveMode.Overwrite).parquet(dir)
+ val df2 = spark.read.parquet(dir)
+ checkAnswer(df2, df)
+ }
+
private def testRead(
df: => DataFrame,
expectedResult: Seq[String],
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 6e6b1c2a2b..ef5a5a001f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -42,7 +42,7 @@ case class CreateHiveTableAsSelectCommand(
private val tableIdentifier = tableDesc.identifier
- override def children: Seq[LogicalPlan] = Seq(query)
+ override def innerChildren: Seq[LogicalPlan] = Seq(query)
override def run(sparkSession: SparkSession): Seq[Row] = {
lazy val metastoreRelation: MetastoreRelation = {