path: root/sql/core
author: gatorsmile <gatorsmile@gmail.com>  2016-09-14 23:10:20 +0800
committer: Wenchen Fan <wenchen@databricks.com>  2016-09-14 23:10:20 +0800
commit: 52738d4e099a19466ef909b77c24cab109548706 (patch)
tree: 600b0753c5129b8f7588dd3dc011ae187081d910 /sql/core
parent: dc0a4c916151c795dc41b5714e9d23b4937f4636 (diff)
[SPARK-17409][SQL] Do Not Optimize Query in CTAS More Than Once
### What changes were proposed in this pull request?

As explained in https://github.com/apache/spark/pull/14797:

> Some analyzer rules make assumptions about logical plans, and the optimizer may break those assumptions. We should not pass an optimized query plan into QueryExecution (it will be analyzed again), otherwise we may hit some weird bugs. For example, we have a rule for decimal calculation that promotes the precision before binary operations, using PromotePrecision as a placeholder to indicate that the rule should not be applied twice. But an Optimizer rule removes this placeholder, which breaks the assumption; the rule is then applied twice and produces a wrong result.

We should not optimize the query in CTAS more than once. For example,

```scala
spark.range(99, 101).createOrReplaceTempView("tab1")
val sqlStmt = "SELECT id, cast(id as long) * cast('1.0' as decimal(38, 18)) as num FROM tab1"
sql(s"CREATE TABLE tab2 USING PARQUET AS $sqlStmt")
checkAnswer(spark.table("tab2"), sql(sqlStmt))
```

Before this PR, the results do not match:

```
== Results ==
!== Correct Answer - 2 ==       == Spark Answer - 2 ==
![100,100.000000000000000000]   [100,null]
 [99,99.000000000000000000]     [99,99.000000000000000000]
```

After this PR, the results match:

```
+---+----------------------+
|id |num                   |
+---+----------------------+
|99 |99.000000000000000000 |
|100|100.000000000000000000|
+---+----------------------+
```

In this PR, we no longer treat the `query` in CTAS as a child. Thus, the `query` is not optimized when the CTAS statement is optimized. However, we still need to analyze it so that the CTAS can be normalized and verified in the Analyzer. We do that in the analyzer rule `PreprocessDDL` (renamed to `AnalyzeCreateTable` in this PR), because so far only this rule needs the analyzed plan of the `query`.

### How was this patch tested?

Added a test.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15048 from gatorsmile/ctasOptimized.
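To make the mechanism concrete: Catalyst's `transform` only recurses into a node's `children`, while `innerChildren` is used for plan presentation and is skipped by tree rewrites. The toy sketch below (plain Scala, not Spark's actual `TreeNode`/`QueryPlan` API; all class names here are made up for illustration) shows why moving the CTAS `query` from `children` to `innerChildren` keeps optimizer rules from rewriting it a second time.

```scala
// Toy model of the children vs. innerChildren distinction (illustration only;
// these are not Spark's real classes). A transform that walks `children`
// rewrites anything reachable through them, but never touches nodes that are
// exposed only via `innerChildren`.
sealed trait Node {
  def children: Seq[Node]
  def innerChildren: Seq[Node] = Nil
  def mapChildren(f: Node => Node): Node
  // Simplified top-down transform: apply the rule here, then recurse into children.
  def transform(rule: PartialFunction[Node, Node]): Node =
    rule.applyOrElse(this, identity[Node]).mapChildren(_.transform(rule))
}

case class Leaf(tag: String) extends Node {
  def children: Seq[Node] = Nil
  def mapChildren(f: Node => Node): Node = this
}

// Old shape: the CTAS query is a regular child, so optimizer rules reach it
// and can strip analysis-only placeholders such as PromotePrecision.
case class CtasWithChild(query: Node) extends Node {
  def children: Seq[Node] = Seq(query)
  def mapChildren(f: Node => Node): Node = copy(query = f(query))
}

// New shape: the query is only an innerChild, so the transform skips it; it is
// analyzed once up front and optimized later by its own QueryExecution.
case class CtasWithInnerChild(query: Node) extends Node {
  def children: Seq[Node] = Nil
  override def innerChildren: Seq[Node] = Seq(query)
  def mapChildren(f: Node => Node): Node = this
}

object Demo extends App {
  val dropPlaceholder: PartialFunction[Node, Node] = {
    case Leaf("PromotePrecision") => Leaf("stripped")
  }
  println(CtasWithChild(Leaf("PromotePrecision")).transform(dropPlaceholder))
  // CtasWithChild(Leaf(stripped))              -- the inner query was rewritten
  println(CtasWithInnerChild(Leaf("PromotePrecision")).transform(dropPlaceholder))
  // CtasWithInnerChild(Leaf(PromotePrecision)) -- the inner query is untouched
}
```

In the real patch the inner query is analyzed exactly once by `AnalyzeCreateTable` and later planned through its own `QueryExecution`, so the optimizer runs over it only a single time.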
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala  |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala  |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala  |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala  |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala  |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala  | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala  | 24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala  |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala  | 12
9 files changed, 38 insertions, 33 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index b0e2d03af0..af6def52d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -129,6 +129,4 @@ case object ResetCommand extends RunnableCommand with Logging {
sparkSession.sessionState.conf.clear()
Seq.empty[Row]
}
-
- override val output: Seq[Attribute] = Seq.empty
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 697e2ff211..c31f4dc9ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -47,8 +46,6 @@ case class CacheTableCommand(
Seq.empty[Row]
}
-
- override def output: Seq[Attribute] = Seq.empty
}
@@ -58,8 +55,6 @@ case class UncacheTableCommand(tableIdent: TableIdentifier) extends RunnableComm
sparkSession.catalog.uncacheTable(tableIdent.quotedString)
Seq.empty[Row]
}
-
- override def output: Seq[Attribute] = Seq.empty
}
/**
@@ -71,6 +66,4 @@ case object ClearCacheCommand extends RunnableCommand {
sparkSession.catalog.clearCache()
Seq.empty[Row]
}
-
- override def output: Seq[Attribute] = Seq.empty
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 424a962b5e..698c625d61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -35,9 +35,7 @@ import org.apache.spark.sql.types._
* A logical command that is executed for its side-effects. `RunnableCommand`s are
* wrapped in `ExecutedCommand` during execution.
*/
-trait RunnableCommand extends LogicalPlan with logical.Command {
- override def output: Seq[Attribute] = Seq.empty
- final override def children: Seq[LogicalPlan] = Seq.empty
+trait RunnableCommand extends logical.Command {
def run(sparkSession: SparkSession): Seq[Row]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
index 597ec27ce6..e5a6a5f60b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
@@ -59,6 +59,4 @@ case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {
sparkSession.sessionState.catalog.setCurrentDatabase(databaseName)
Seq.empty[Row]
}
-
- override val output: Seq[Attribute] = Seq.empty
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index bc1c4f85e3..dcda2f8d1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -70,8 +70,6 @@ case class CreateDatabaseCommand(
ifNotExists)
Seq.empty[Row]
}
-
- override val output: Seq[Attribute] = Seq.empty
}
@@ -101,8 +99,6 @@ case class DropDatabaseCommand(
sparkSession.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade)
Seq.empty[Row]
}
-
- override val output: Seq[Attribute] = Seq.empty
}
/**
@@ -126,8 +122,6 @@ case class AlterDatabasePropertiesCommand(
Seq.empty[Row]
}
-
- override val output: Seq[Attribute] = Seq.empty
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 1b1e2123b7..fa95af2648 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -20,13 +20,15 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.types._
-case class CreateTable(tableDesc: CatalogTable, mode: SaveMode, query: Option[LogicalPlan])
- extends LogicalPlan with Command {
+case class CreateTable(
+ tableDesc: CatalogTable,
+ mode: SaveMode,
+ query: Option[LogicalPlan]) extends Command {
assert(tableDesc.provider.isDefined, "The table to be created must have a provider.")
if (query.isEmpty) {
@@ -35,9 +37,7 @@ case class CreateTable(tableDesc: CatalogTable, mode: SaveMode, query: Option[Lo
"create table without data insertion can only use ErrorIfExists or Ignore as SaveMode.")
}
- override def output: Seq[Attribute] = Seq.empty[Attribute]
-
- override def children: Seq[LogicalPlan] = query.toSeq
+ override def innerChildren: Seq[QueryPlan[_]] = query.toSeq
}
case class CreateTempViewUsing(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index fbf4063ff6..bd6eb6e053 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -66,9 +66,10 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
}
/**
- * Preprocess some DDL plans, e.g. [[CreateTable]], to do some normalization and checking.
+ * Analyze [[CreateTable]] and do some normalization and checking.
+ * For CREATE TABLE AS SELECT, the SELECT query is also analyzed.
*/
-case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
+case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// When we CREATE TABLE without specifying the table schema, we should fail the query if
@@ -95,9 +96,19 @@ case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
// * can't use all table columns as partition columns.
// * partition columns' type must be AtomicType.
// * sort columns' type must be orderable.
- case c @ CreateTable(tableDesc, mode, query) if c.childrenResolved =>
- val schema = if (query.isDefined) query.get.schema else tableDesc.schema
- val columnNames = if (conf.caseSensitiveAnalysis) {
+ case c @ CreateTable(tableDesc, mode, query) =>
+ val analyzedQuery = query.map { q =>
+ // Analyze the query in CTAS and then we can do the normalization and checking.
+ val qe = sparkSession.sessionState.executePlan(q)
+ qe.assertAnalyzed()
+ qe.analyzed
+ }
+ val schema = if (analyzedQuery.isDefined) {
+ analyzedQuery.get.schema
+ } else {
+ tableDesc.schema
+ }
+ val columnNames = if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
schema.map(_.name)
} else {
schema.map(_.name.toLowerCase)
@@ -106,7 +117,7 @@ case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
val partitionColsChecked = checkPartitionColumns(schema, tableDesc)
val bucketColsChecked = checkBucketColumns(schema, partitionColsChecked)
- c.copy(tableDesc = bucketColsChecked)
+ c.copy(tableDesc = bucketColsChecked, query = analyzedQuery)
}
private def checkPartitionColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
@@ -176,6 +187,7 @@ case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
colName: String,
colType: String): String = {
val tableCols = schema.map(_.name)
+ val conf = sparkSession.sessionState.conf
tableCols.find(conf.resolver(_, colName)).getOrElse {
failAnalysis(s"$colType column $colName is not defined in table $tableIdent, " +
s"defined table columns are: ${tableCols.mkString(", ")}")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 8fdbd0f2c6..c899773b6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -111,7 +111,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
lazy val analyzer: Analyzer = {
new Analyzer(catalog, conf) {
override val extendedResolutionRules =
- PreprocessDDL(conf) ::
+ AnalyzeCreateTable(sparkSession) ::
PreprocessTableInsertion(conf) ::
new FindDataSourceTable(sparkSession) ::
DataSourceAnalysis(conf) ::
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 729c9fdda5..344d4aa6cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -236,4 +236,16 @@ class CreateTableAsSelectSuite
assert(e.contains("Expected positive number of buckets, but got `0`"))
}
}
+
+ test("CTAS of decimal calculation") {
+ withTable("tab2") {
+ withTempView("tab1") {
+ spark.range(99, 101).createOrReplaceTempView("tab1")
+ val sqlStmt =
+ "SELECT id, cast(id as long) * cast('1.0' as decimal(38, 18)) as num FROM tab1"
+ sql(s"CREATE TABLE tab2 USING PARQUET AS $sqlStmt")
+ checkAnswer(spark.table("tab2"), sql(sqlStmt))
+ }
+ }
+ }
}
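For quick manual verification outside the test suite, the scenario from the PR description can also be run in spark-shell. This only mirrors the new test above; no APIs beyond those already used in this patch are assumed.

```scala
// Mirrors the new "CTAS of decimal calculation" test, run interactively.
spark.range(99, 101).createOrReplaceTempView("tab1")
val sqlStmt =
  "SELECT id, cast(id as long) * cast('1.0' as decimal(38, 18)) as num FROM tab1"
sql(s"CREATE TABLE tab2 USING PARQUET AS $sqlStmt")

// Before this patch the CTAS copy returned null for id = 100 because the query
// was optimized twice; with the patch both outputs agree.
spark.table("tab2").show(truncate = false)
sql(sqlStmt).show(truncate = false)
```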