author     Sean Zhong <seanzhong@databricks.com>    2016-06-02 14:11:01 -0700
committer  Andrew Or <andrew@databricks.com>        2016-06-02 14:11:01 -0700
commit     d109a1beeef5bca1e683247e0a5db4ec841bf3ba (patch)
tree       0453e640b07028edb5d3b1adc49962529e0bca17
parent     9aff6f3b1915523432b1921fdd30fa015ed5d670 (diff)
[SPARK-15711][SQL] Ban CREATE TEMPORARY TABLE USING AS SELECT
## What changes were proposed in this pull request?

This PR bans syntax like `CREATE TEMPORARY TABLE ... USING ... AS SELECT`.

`CREATE TEMPORARY TABLE ... USING ... AS ...` is not properly implemented: the temporary data is not cleaned up when the session exits. Until there is a full fix, we should ban this syntax.

This PR only impacts syntax like `CREATE TEMPORARY TABLE ... USING ... AS ...`. Other syntax, such as `CREATE TEMPORARY TABLE ... USING ...` and `CREATE TABLE ... USING ...`, is not affected.

## How was this patch tested?

Unit test.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13451 from clockfly/ban_create_temp_table_using_as.
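To make the change concrete, here is a minimal sketch of the new behavior — not taken from the patch itself; the path `/tmp/t` and the table names are hypothetical, `spark` is a `SparkSession`, and `jt` is an existing temporary view, as in the test suite below:

```scala
// Hedged sketch of the behavior after this patch; path and names are
// hypothetical, not from the patch.

// Banned by this patch: the parser now rejects it with a ParseException
// ("Operation not allowed: CREATE TEMPORARY TABLE ... USING ... AS query").
spark.sql(
  """CREATE TEMPORARY TABLE t USING PARQUET
    |OPTIONS (PATH '/tmp/t')
    |AS SELECT a, b FROM jt
  """.stripMargin)

// Unaffected: CTAS into a regular, persisted table ...
spark.sql(
  """CREATE TABLE t USING PARQUET
    |OPTIONS (PATH '/tmp/t')
    |AS SELECT a, b FROM jt
  """.stripMargin)

// ... and a temporary table over existing data, without AS SELECT.
spark.sql(
  """CREATE TEMPORARY TABLE t2 USING PARQUET
    |OPTIONS (PATH '/tmp/t')
  """.stripMargin)
```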
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                      1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala             9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala           10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala           32
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala   265
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala        46
6 files changed, 142 insertions(+), 221 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 25678e938d..50ae9667f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -561,7 +561,6 @@ final class DataFrameWriter private[sql](df: DataFrame) {
CreateTableUsingAsSelect(
tableIdent,
source,
- temporary = false,
partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
getBucketSpec,
mode,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 01409c6a77..8ffc55668a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -317,17 +317,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
// Get the backing query.
val query = plan(ctx.query)
+ if (temp) {
+ throw operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)
+ }
+
// Determine the storage mode.
val mode = if (ifNotExists) {
SaveMode.Ignore
- } else if (temp) {
- SaveMode.Overwrite
} else {
SaveMode.ErrorIfExists
}
CreateTableUsingAsSelect(
- table, provider, temp, partitionColumnNames, bucketSpec, mode, options, query)
+ table, provider, partitionColumnNames, bucketSpec, mode, options, query)
} else {
val struct = Option(ctx.colTypeList()).map(createStructType)
CreateTableUsing(
@@ -960,7 +962,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
CreateTableUsingAsSelect(
tableIdent = tableDesc.identifier,
provider = conf.defaultDataSourceName,
- temporary = false,
partitionColumns = tableDesc.partitionColumnNames.toArray,
bucketSpec = None,
mode = mode,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 9610506e13..b20897e2d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -397,15 +397,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
throw new AnalysisException(
"allowExisting should be set to false when creating a temporary table.")
- case c: CreateTableUsingAsSelect if c.temporary && c.partitionColumns.nonEmpty =>
- sys.error("Cannot create temporary partitioned table.")
-
- case c: CreateTableUsingAsSelect if c.temporary =>
- val cmd = CreateTempTableUsingAsSelectCommand(
- c.tableIdent, c.provider, Array.empty[String], c.mode, c.options, c.child)
- ExecutedCommandExec(cmd) :: Nil
-
- case c: CreateTableUsingAsSelect if !c.temporary =>
+ case c: CreateTableUsingAsSelect =>
val cmd =
CreateDataSourceTableAsSelectCommand(
c.tableIdent,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index edbccde214..bf272e3c06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -56,7 +56,6 @@ case class CreateTableUsing(
case class CreateTableUsingAsSelect(
tableIdent: TableIdentifier,
provider: String,
- temporary: Boolean,
partitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
mode: SaveMode,
@@ -91,37 +90,6 @@ case class CreateTempTableUsing(
}
}
-case class CreateTempTableUsingAsSelectCommand(
- tableIdent: TableIdentifier,
- provider: String,
- partitionColumns: Array[String],
- mode: SaveMode,
- options: Map[String, String],
- query: LogicalPlan) extends RunnableCommand {
-
- if (tableIdent.database.isDefined) {
- throw new AnalysisException(
- s"Temporary table '$tableIdent' should not have specified a database")
- }
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- val df = Dataset.ofRows(sparkSession, query)
- val dataSource = DataSource(
- sparkSession,
- className = provider,
- partitionColumns = partitionColumns,
- bucketSpec = None,
- options = options)
- val result = dataSource.write(mode, df)
- sparkSession.sessionState.catalog.createTempView(
- tableIdent.table,
- Dataset.ofRows(sparkSession, LogicalRelation(result)).logicalPlan,
- overrideIfExists = true)
-
- Seq.empty[Row]
- }
-}
-
case class RefreshTable(tableIdent: TableIdentifier)
extends RunnableCommand {
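For reference, the deleted `CreateTempTableUsingAsSelectCommand` above wrote the query result through the data source and then registered a temporary view over it. A hedged, user-level sketch of that same pattern, for sessions that relied on the banned syntax — path and names are hypothetical, and this is an assumed replacement pattern, not part of this patch:

```scala
import org.apache.spark.sql.SaveMode

// Materialize the query result, then register a session-scoped temporary
// view over it -- roughly what the removed command did internally
// (it used SaveMode.Overwrite and overrideIfExists = true).
val df = spark.sql("SELECT a, b FROM jt")
df.write.mode(SaveMode.Overwrite).parquet("/tmp/t")
spark.read.parquet("/tmp/t").createOrReplaceTempView("t")
```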
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index cbddb0643b..f9a07dbdf0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -17,16 +17,20 @@
package org.apache.spark.sql.sources
-import java.io.{File, IOException}
+import java.io.File
import org.scalatest.BeforeAndAfter
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter {
+
protected override lazy val sql = spark.sql _
private var path: File = null
@@ -40,172 +44,175 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with
override def afterAll(): Unit = {
try {
spark.catalog.dropTempView("jt")
+ if (path.exists()) {
+ Utils.deleteRecursively(path)
+ }
} finally {
super.afterAll()
}
}
- after {
- Utils.deleteRecursively(path)
+ before {
+ if (path.exists()) {
+ Utils.deleteRecursively(path)
+ }
}
- test("CREATE TEMPORARY TABLE AS SELECT") {
- sql(
- s"""
- |CREATE TEMPORARY TABLE jsonTable
- |USING json
- |OPTIONS (
- | path '${path.toString}'
- |) AS
- |SELECT a, b FROM jt
- """.stripMargin)
-
- checkAnswer(
- sql("SELECT a, b FROM jsonTable"),
- sql("SELECT a, b FROM jt").collect())
-
- spark.catalog.dropTempView("jsonTable")
+ test("CREATE TABLE USING AS SELECT") {
+ withTable("jsonTable") {
+ sql(
+ s"""
+ |CREATE TABLE jsonTable
+ |USING json
+ |OPTIONS (
+ | path '${path.toString}'
+ |) AS
+ |SELECT a, b FROM jt
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT a, b FROM jsonTable"),
+ sql("SELECT a, b FROM jt"))
+ }
}
- test("CREATE TEMPORARY TABLE AS SELECT based on the file without write permission") {
+ test("CREATE TABLE USING AS SELECT based on the file without write permission") {
val childPath = new File(path.toString, "child")
path.mkdir()
- childPath.createNewFile()
path.setWritable(false)
- val e = intercept[IOException] {
+ val e = intercept[SparkException] {
sql(
s"""
- |CREATE TEMPORARY TABLE jsonTable
+ |CREATE TABLE jsonTable
|USING json
|OPTIONS (
- | path '${path.toString}'
+ | path '${childPath.toString}'
|) AS
|SELECT a, b FROM jt
- """.stripMargin)
+ """.stripMargin)
sql("SELECT a, b FROM jsonTable").collect()
}
- assert(e.getMessage().contains("Unable to clear output directory"))
+ assert(e.getMessage().contains("Job aborted"))
path.setWritable(true)
}
test("create a table, drop it and create another one with the same name") {
- sql(
- s"""
- |CREATE TEMPORARY TABLE jsonTable
- |USING json
- |OPTIONS (
- | path '${path.toString}'
- |) AS
- |SELECT a, b FROM jt
- """.stripMargin)
-
- checkAnswer(
- sql("SELECT a, b FROM jsonTable"),
- sql("SELECT a, b FROM jt").collect())
-
- val message = intercept[ParseException]{
+ withTable("jsonTable") {
sql(
s"""
- |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
- |USING json
- |OPTIONS (
- | path '${path.toString}'
- |) AS
- |SELECT a * 4 FROM jt
- """.stripMargin)
- }.getMessage
- assert(message.toLowerCase.contains("operation not allowed"))
-
- // Overwrite the temporary table.
- sql(
- s"""
- |CREATE TEMPORARY TABLE jsonTable
- |USING json
- |OPTIONS (
- | path '${path.toString}'
- |) AS
- |SELECT a * 4 FROM jt
- """.stripMargin)
- checkAnswer(
- sql("SELECT * FROM jsonTable"),
- sql("SELECT a * 4 FROM jt").collect())
-
- spark.catalog.dropTempView("jsonTable")
- // Explicitly delete the data.
- if (path.exists()) Utils.deleteRecursively(path)
-
- sql(
- s"""
- |CREATE TEMPORARY TABLE jsonTable
- |USING json
- |OPTIONS (
- | path '${path.toString}'
- |) AS
- |SELECT b FROM jt
- """.stripMargin)
-
- checkAnswer(
- sql("SELECT * FROM jsonTable"),
- sql("SELECT b FROM jt").collect())
-
- spark.catalog.dropTempView("jsonTable")
- }
+ |CREATE TABLE jsonTable
+ |USING json
+ |OPTIONS (
+ | path '${path.toString}'
+ |) AS
+ |SELECT a, b FROM jt
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT a, b FROM jsonTable"),
+ sql("SELECT a, b FROM jt"))
+
+      // Creates a table of the same name with the "if not exists" flag; nothing happens.
+ sql(
+ s"""
+ |CREATE TABLE IF NOT EXISTS jsonTable
+ |USING json
+ |OPTIONS (
+ | path '${path.toString}'
+ |) AS
+ |SELECT a * 4 FROM jt
+ """.stripMargin)
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ sql("SELECT a, b FROM jt"))
+
+ // Explicitly drops the table and deletes the underlying data.
+ sql("DROP TABLE jsonTable")
+ if (path.exists()) Utils.deleteRecursively(path)
- test("CREATE TEMPORARY TABLE AS SELECT with IF NOT EXISTS is not allowed") {
- val message = intercept[ParseException]{
+    // Creates a table of the same name again; this time it succeeds.
sql(
s"""
- |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
- |USING json
- |OPTIONS (
- | path '${path.toString}'
- |) AS
- |SELECT b FROM jt
- """.stripMargin)
- }.getMessage
- assert(message.toLowerCase.contains("operation not allowed"))
+ |CREATE TABLE jsonTable
+ |USING json
+ |OPTIONS (
+ | path '${path.toString}'
+ |) AS
+ |SELECT b FROM jt
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM jsonTable"),
+ sql("SELECT b FROM jt"))
+ }
+ }
+
+ test("disallows CREATE TEMPORARY TABLE ... USING ... AS query") {
+ withTable("t") {
+ val error = intercept[ParseException] {
+ sql(
+ s"""
+ |CREATE TEMPORARY TABLE t USING PARQUET
+ |OPTIONS (PATH '${path.toString}')
+ |PARTITIONED BY (a)
+ |AS SELECT 1 AS a, 2 AS b
+ """.stripMargin
+ )
+ }.getMessage
+ assert(error.contains("Operation not allowed") &&
+ error.contains("CREATE TEMPORARY TABLE ... USING ... AS query"))
+ }
}
- test("a CTAS statement with column definitions is not allowed") {
- intercept[AnalysisException]{
+ test("disallows CREATE EXTERNAL TABLE ... USING ... AS query") {
+ withTable("t") {
+ val error = intercept[ParseException] {
+ sql(
+ s"""
+ |CREATE EXTERNAL TABLE t USING PARQUET
+ |OPTIONS (PATH '${path.toString}')
+ |AS SELECT 1 AS a, 2 AS b
+ """.stripMargin
+ )
+ }.getMessage
+
+ assert(error.contains("Operation not allowed") &&
+ error.contains("CREATE EXTERNAL TABLE ... USING"))
+ }
+ }
+
+ test("create table using as select - with partitioned by") {
+ val catalog = spark.sessionState.catalog
+ withTable("t") {
sql(
s"""
- |CREATE TEMPORARY TABLE jsonTable (a int, b string)
- |USING json
- |OPTIONS (
- | path '${path.toString}'
- |) AS
- |SELECT a, b FROM jt
- """.stripMargin)
+ |CREATE TABLE t USING PARQUET
+ |OPTIONS (PATH '${path.toString}')
+ |PARTITIONED BY (a)
+ |AS SELECT 1 AS a, 2 AS b
+ """.stripMargin
+ )
+ val table = catalog.getTableMetadata(TableIdentifier("t"))
+ assert(DDLUtils.getPartitionColumnsFromTableProperties(table) == Seq("a"))
}
}
- test("it is not allowed to write to a table while querying it.") {
- sql(
- s"""
- |CREATE TEMPORARY TABLE jsonTable
- |USING json
- |OPTIONS (
- | path '${path.toString}'
- |) AS
- |SELECT a, b FROM jt
- """.stripMargin)
-
- val message = intercept[AnalysisException] {
+ test("create table using as select - with bucket") {
+ val catalog = spark.sessionState.catalog
+ withTable("t") {
sql(
s"""
- |CREATE TEMPORARY TABLE jsonTable
- |USING json
- |OPTIONS (
- | path '${path.toString}'
- |) AS
- |SELECT a, b FROM jsonTable
- """.stripMargin)
- }.getMessage
- assert(
- message.contains("Cannot overwrite table "),
- "Writing to a table while querying it should not be allowed.")
+ |CREATE TABLE t USING PARQUET
+ |OPTIONS (PATH '${path.toString}')
+ |CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS
+ |AS SELECT 1 AS a, 2 AS b
+ """.stripMargin
+ )
+ val table = catalog.getTableMetadata(TableIdentifier("t"))
+ assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
+ Some(BucketSpec(5, Seq("a"), Seq("b"))))
+ }
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 24de223cf8..499819f32b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1506,52 +1506,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
- test(
- "SPARK-14488 \"CREATE TEMPORARY TABLE ... USING ... AS SELECT ...\" " +
- "shouldn't create persisted table"
- ) {
- withTempPath { dir =>
- withTempTable("t1", "t2") {
- val path = dir.getCanonicalPath
- val ds = spark.range(10)
- ds.createOrReplaceTempView("t1")
-
- sql(
- s"""CREATE TEMPORARY TABLE t2
- |USING PARQUET
- |OPTIONS (PATH '$path')
- |AS SELECT * FROM t1
- """.stripMargin)
-
- checkAnswer(
- spark.sql("SHOW TABLES").select('isTemporary).filter('tableName === "t2"),
- Row(true)
- )
-
- checkAnswer(table("t2"), table("t1"))
- }
- }
- }
-
- test(
- "SPARK-14493 \"CREATE TEMPORARY TABLE ... USING ... AS SELECT ...\" " +
- "shouldn always be used together with PATH data source option"
- ) {
- withTempTable("t") {
- spark.range(10).createOrReplaceTempView("t")
-
- val message = intercept[IllegalArgumentException] {
- sql(
- s"""CREATE TEMPORARY TABLE t1
- |USING PARQUET
- |AS SELECT * FROM t
- """.stripMargin)
- }.getMessage
-
- assert(message == "'path' is not specified")
- }
- }
-
test("derived from Hive query file: drop_database_removes_partition_dirs.q") {
// This test verifies that if a partition exists outside a table's current location when the
// database is dropped the partition's location is dropped as well.