-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala  7
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala  12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala  24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala  12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala  6
13 files changed, 49 insertions, 42 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
index 75a5b10d9e..64f57835c8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
@@ -17,9 +17,14 @@
package org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
/**
* A logical node that represents a non-query command to be executed by the system. For example,
* commands can be used by parsers to represent DDL operations. Commands, unlike queries, are
* eagerly executed.
*/
-trait Command
+trait Command extends LeafNode {
+ final override def children: Seq[LogicalPlan] = Seq.empty
+ override def output: Seq[Attribute] = Seq.empty
+}
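
With this change, Command is a leaf node with no children and an empty output by default, so concrete commands only override output when they actually return rows. A minimal sketch under those assumptions (ShowGreetingCommand is a hypothetical name, not part of the patch):

    import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
    import org.apache.spark.sql.catalyst.plans.logical.Command
    import org.apache.spark.sql.types.StringType

    // Hypothetical command: `children` is inherited (always empty), and
    // `output` is overridden only because this command produces a row schema.
    case class ShowGreetingCommand() extends Command {
      override def output: Seq[Attribute] =
        Seq(AttributeReference("greeting", StringType, nullable = false)())
    }
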
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 6df47acaba..ff1bb126f4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -31,10 +31,7 @@ import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.IntegerType
/** A dummy command for testing unsupported operations. */
-case class DummyCommand() extends LogicalPlan with Command {
- override def output: Seq[Attribute] = Nil
- override def children: Seq[LogicalPlan] = Nil
-}
+case class DummyCommand() extends Command
class UnsupportedOperationsSuite extends SparkFunSuite {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index b0e2d03af0..af6def52d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -129,6 +129,4 @@ case object ResetCommand extends RunnableCommand with Logging {
sparkSession.sessionState.conf.clear()
Seq.empty[Row]
}
-
- override val output: Seq[Attribute] = Seq.empty
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 697e2ff211..c31f4dc9ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -47,8 +46,6 @@ case class CacheTableCommand(
Seq.empty[Row]
}
-
- override def output: Seq[Attribute] = Seq.empty
}
@@ -58,8 +55,6 @@ case class UncacheTableCommand(tableIdent: TableIdentifier) extends RunnableComm
sparkSession.catalog.uncacheTable(tableIdent.quotedString)
Seq.empty[Row]
}
-
- override def output: Seq[Attribute] = Seq.empty
}
/**
@@ -71,6 +66,4 @@ case object ClearCacheCommand extends RunnableCommand {
sparkSession.catalog.clearCache()
Seq.empty[Row]
}
-
- override def output: Seq[Attribute] = Seq.empty
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 424a962b5e..698c625d61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -35,9 +35,7 @@ import org.apache.spark.sql.types._
* A logical command that is executed for its side-effects. `RunnableCommand`s are
* wrapped in `ExecutedCommand` during execution.
*/
-trait RunnableCommand extends LogicalPlan with logical.Command {
- override def output: Seq[Attribute] = Seq.empty
- final override def children: Seq[LogicalPlan] = Seq.empty
+trait RunnableCommand extends logical.Command {
def run(sparkSession: SparkSession): Seq[Row]
}
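
Since RunnableCommand now inherits the empty output and children from Command, an implementation reduces to a run method. A sketch of a side-effect-only command under this trait (TouchMarkerCommand and its path handling are hypothetical):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.execution.command.RunnableCommand

    // Hypothetical command executed purely for its side effect; it returns
    // no rows, so the default empty `output` from Command is all it needs.
    case class TouchMarkerCommand(path: String) extends RunnableCommand {
      override def run(sparkSession: SparkSession): Seq[Row] = {
        new java.io.File(path).createNewFile()
        Seq.empty[Row]
      }
    }
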
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
index 597ec27ce6..e5a6a5f60b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
@@ -59,6 +59,4 @@ case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {
sparkSession.sessionState.catalog.setCurrentDatabase(databaseName)
Seq.empty[Row]
}
-
- override val output: Seq[Attribute] = Seq.empty
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index bc1c4f85e3..dcda2f8d1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -70,8 +70,6 @@ case class CreateDatabaseCommand(
ifNotExists)
Seq.empty[Row]
}
-
- override val output: Seq[Attribute] = Seq.empty
}
@@ -101,8 +99,6 @@ case class DropDatabaseCommand(
sparkSession.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade)
Seq.empty[Row]
}
-
- override val output: Seq[Attribute] = Seq.empty
}
/**
@@ -126,8 +122,6 @@ case class AlterDatabasePropertiesCommand(
Seq.empty[Row]
}
-
- override val output: Seq[Attribute] = Seq.empty
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 1b1e2123b7..fa95af2648 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -20,13 +20,15 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.types._
-case class CreateTable(tableDesc: CatalogTable, mode: SaveMode, query: Option[LogicalPlan])
- extends LogicalPlan with Command {
+case class CreateTable(
+ tableDesc: CatalogTable,
+ mode: SaveMode,
+ query: Option[LogicalPlan]) extends Command {
assert(tableDesc.provider.isDefined, "The table to be created must have a provider.")
if (query.isEmpty) {
@@ -35,9 +37,7 @@ case class CreateTable(tableDesc: CatalogTable, mode: SaveMode, query: Option[Lo
"create table without data insertion can only use ErrorIfExists or Ignore as SaveMode.")
}
- override def output: Seq[Attribute] = Seq.empty[Attribute]
-
- override def children: Seq[LogicalPlan] = query.toSeq
+ override def innerChildren: Seq[QueryPlan[_]] = query.toSeq
}
case class CreateTempViewUsing(
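
Moving the CTAS query from children to innerChildren is the core of the fix: analyzer and optimizer rules no longer recurse into the query (a Command is a leaf node), yet treeString and EXPLAIN still render it. A rough illustration of the intended behavior, assuming a resolved CatalogTable value desc and an analyzed query plan q:

    // Sketch only: `desc` and `q` are assumed to exist.
    val ct = CreateTable(desc, SaveMode.ErrorIfExists, Some(q))

    // As a leaf node, CreateTable exposes no children for rules to rewrite:
    assert(ct.children.isEmpty)

    // ...but the query is still attached for plan display purposes:
    assert(ct.innerChildren == Seq(q))
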
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index fbf4063ff6..bd6eb6e053 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -66,9 +66,10 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
}
/**
- * Preprocess some DDL plans, e.g. [[CreateTable]], to do some normalization and checking.
+ * Analyze [[CreateTable]] and do some normalization and checking.
+ * For CREATE TABLE AS SELECT, the SELECT query is also analyzed.
*/
-case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
+case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// When we CREATE TABLE without specifying the table schema, we should fail the query if
@@ -95,9 +96,19 @@ case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
// * can't use all table columns as partition columns.
// * partition columns' type must be AtomicType.
// * sort columns' type must be orderable.
- case c @ CreateTable(tableDesc, mode, query) if c.childrenResolved =>
- val schema = if (query.isDefined) query.get.schema else tableDesc.schema
- val columnNames = if (conf.caseSensitiveAnalysis) {
+ case c @ CreateTable(tableDesc, mode, query) =>
+ val analyzedQuery = query.map { q =>
+ // Analyze the query in CTAS and then we can do the normalization and checking.
+ val qe = sparkSession.sessionState.executePlan(q)
+ qe.assertAnalyzed()
+ qe.analyzed
+ }
+ val schema = if (analyzedQuery.isDefined) {
+ analyzedQuery.get.schema
+ } else {
+ tableDesc.schema
+ }
+ val columnNames = if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
schema.map(_.name)
} else {
schema.map(_.name.toLowerCase)
@@ -106,7 +117,7 @@ case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
val partitionColsChecked = checkPartitionColumns(schema, tableDesc)
val bucketColsChecked = checkBucketColumns(schema, partitionColsChecked)
- c.copy(tableDesc = bucketColsChecked)
+ c.copy(tableDesc = bucketColsChecked, query = analyzedQuery)
}
private def checkPartitionColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
@@ -176,6 +187,7 @@ case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
colName: String,
colType: String): String = {
val tableCols = schema.map(_.name)
+ val conf = sparkSession.sessionState.conf
tableCols.find(conf.resolver(_, colName)).getOrElse {
failAnalysis(s"$colType column $colName is not defined in table $tableIdent, " +
s"defined table columns are: ${tableCols.mkString(", ")}")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 8fdbd0f2c6..c899773b6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -111,7 +111,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
lazy val analyzer: Analyzer = {
new Analyzer(catalog, conf) {
override val extendedResolutionRules =
- PreprocessDDL(conf) ::
+ AnalyzeCreateTable(sparkSession) ::
PreprocessTableInsertion(conf) ::
new FindDataSourceTable(sparkSession) ::
DataSourceAnalysis(conf) ::
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 729c9fdda5..344d4aa6cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -236,4 +236,16 @@ class CreateTableAsSelectSuite
assert(e.contains("Expected positive number of buckets, but got `0`"))
}
}
+
+ test("CTAS of decimal calculation") {
+ withTable("tab2") {
+ withTempView("tab1") {
+ spark.range(99, 101).createOrReplaceTempView("tab1")
+ val sqlStmt =
+ "SELECT id, cast(id as long) * cast('1.0' as decimal(38, 18)) as num FROM tab1"
+ sql(s"CREATE TABLE tab2 USING PARQUET AS $sqlStmt")
+ checkAnswer(spark.table("tab2"), sql(sqlStmt))
+ }
+ }
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 15e1255653..eb10c11382 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -60,7 +60,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
override val extendedResolutionRules =
catalog.ParquetConversions ::
catalog.OrcConversions ::
- PreprocessDDL(conf) ::
+ AnalyzeCreateTable(sparkSession) ::
PreprocessTableInsertion(conf) ::
DataSourceAnalysis(conf) ::
(if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index 98afd99a20..f9751e3d5f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -77,7 +77,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
"src")
}
- test("SPARK-6212: The EXPLAIN output of CTAS only shows the analyzed plan") {
+ test("SPARK-17409: The EXPLAIN output of CTAS only shows the analyzed plan") {
withTempView("jt") {
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
spark.read.json(rdd).createOrReplaceTempView("jt")
@@ -98,8 +98,8 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
}
val physicalIndex = outputs.indexOf("== Physical Plan ==")
- assert(!outputs.substring(physicalIndex).contains("Subquery"),
- "Physical Plan should not contain Subquery since it's eliminated by optimizer")
+ assert(outputs.substring(physicalIndex).contains("Subquery"),
+ "Physical Plan should contain SubqueryAlias since the query should not be optimized")
}
}
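
With the query held in innerChildren, it is no longer optimized as part of the command's plan, so the flipped assertion expects SubqueryAlias to survive into the printed physical plan. A usage sketch of the behavior this test locks down (table and view names are illustrative):

    spark.range(10).createOrReplaceTempView("src")
    // After this patch, the explained CTAS shows the analyzed query, so
    // aliases such as SubqueryAlias are still visible in the output.
    spark.sql("EXPLAIN CREATE TABLE t USING parquet AS SELECT * FROM src")
      .show(truncate = false)
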