aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2016-05-05 14:34:24 -0700
committerAndrew Or <andrew@databricks.com>2016-05-05 14:34:24 -0700
commit8cba57a75cf9e29b54d97366a039a97a2f305d5d (patch)
treeb9cb57154348cd0242deeff9a78e346e7b04aaa6 /sql
parent63db2bd283a430971d85f2a7b06dac77723c56fa (diff)
downloadspark-8cba57a75cf9e29b54d97366a039a97a2f305d5d.tar.gz
spark-8cba57a75cf9e29b54d97366a039a97a2f305d5d.tar.bz2
spark-8cba57a75cf9e29b54d97366a039a97a2f305d5d.zip
[SPARK-14124][SQL][FOLLOWUP] Implement Database-related DDL Commands
#### What changes were proposed in this pull request? First, a few test cases failed in Mac OS X because the property value of `java.io.tmpdir` does not include a trailing slash on some platforms. Hive always removes the last trailing slash. For example, what I found on the web: ``` Win NT --> C:\TEMP\ Win XP --> C:\TEMP Solaris --> /var/tmp/ Linux --> /var/tmp ``` Second, a couple of test cases were added to verify that the commands work properly. #### How was this patch tested? Added a test case for it and corrected the previous test cases. Author: gatorsmile <gatorsmile@gmail.com> Author: xiaoli <lixiao1983@gmail.com> Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local> Closes #12081 from gatorsmile/mkdir.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala5
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala249
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala150
4 files changed, 311 insertions, 97 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index ff6303471e..eff420eb4c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -146,6 +146,10 @@ class SessionCatalog(
currentDb = db
}
+ /**
+ * Get the path for creating a non-default database when database location is not provided
+ * by users.
+ */
def getDefaultDBPath(db: String): String = {
val database = if (conf.caseSensitiveAnalysis) db else db.toLowerCase
new Path(new Path(conf.warehousePath), database + ".db").toString
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index aa06c014fb..085bdaff4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -40,7 +40,10 @@ import org.apache.spark.sql.types._
* unless 'ifNotExists' is true.
* The syntax of using this command in SQL is:
* {{{
- * CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name
+ * CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name
+ * [COMMENT database_comment]
+ * [LOCATION database_directory]
+ * [WITH DBPROPERTIES (property_name=property_value, ...)];
* }}}
*/
case class CreateDatabase(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 0ae099ecc2..6085098a70 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -95,49 +95,81 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
}
+ private def appendTrailingSlash(path: String): String = {
+ if (!path.endsWith(File.separator)) path + File.separator else path
+ }
+
test("the qualified path of a database is stored in the catalog") {
val catalog = sqlContext.sessionState.catalog
- val path = System.getProperty("java.io.tmpdir")
- // The generated temp path is not qualified.
- assert(!path.startsWith("file:/"))
- sql(s"CREATE DATABASE db1 LOCATION '$path'")
- val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri
- assert("file" === pathInCatalog.getScheme)
- assert(path === pathInCatalog.getPath)
-
- withSQLConf(
- SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir"))) {
- sql(s"CREATE DATABASE db2")
- val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
+ withTempDir { tmpDir =>
+ val path = tmpDir.toString
+ // The generated temp path is not qualified.
+ assert(!path.startsWith("file:/"))
+ sql(s"CREATE DATABASE db1 LOCATION '$path'")
+ val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri
assert("file" === pathInCatalog.getScheme)
- assert(s"${sqlContext.conf.warehousePath}/db2.db" === pathInCatalog.getPath)
- }
+ val expectedPath = if (path.endsWith(File.separator)) path.dropRight(1) else path
+ assert(expectedPath === pathInCatalog.getPath)
+
+ withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
+ sql(s"CREATE DATABASE db2")
+ val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
+ assert("file" === pathInCatalog.getScheme)
+ val expectedPath = appendTrailingSlash(sqlContext.conf.warehousePath) + "db2.db"
+ assert(expectedPath === pathInCatalog.getPath)
+ }
- sql("DROP DATABASE db1")
- sql("DROP DATABASE db2")
+ sql("DROP DATABASE db1")
+ sql("DROP DATABASE db2")
+ }
}
test("Create/Drop Database") {
- withSQLConf(
- SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
- val catalog = sqlContext.sessionState.catalog
+ withTempDir { tmpDir =>
+ val path = tmpDir.toString
+ withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
+ val catalog = sqlContext.sessionState.catalog
+ val databaseNames = Seq("db1", "`database`")
+
+ databaseNames.foreach { dbName =>
+ try {
+ val dbNameWithoutBackTicks = cleanIdentifier(dbName)
- val databaseNames = Seq("db1", "`database`")
+ sql(s"CREATE DATABASE $dbName")
+ val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+ val expectedLocation =
+ "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
+ assert(db1 == CatalogDatabase(
+ dbNameWithoutBackTicks,
+ "",
+ expectedLocation,
+ Map.empty))
+ sql(s"DROP DATABASE $dbName CASCADE")
+ assert(!catalog.databaseExists(dbNameWithoutBackTicks))
+ } finally {
+ catalog.reset()
+ }
+ }
+ }
+ }
+ }
+ test("Create/Drop Database - location") {
+ val catalog = sqlContext.sessionState.catalog
+ val databaseNames = Seq("db1", "`database`")
+ withTempDir { tmpDir =>
+ val path = tmpDir.toString
+ val dbPath = "file:" + path
databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
-
- sql(s"CREATE DATABASE $dbName")
+ sql(s"CREATE DATABASE $dbName Location '$path'")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
- val expectedLocation =
- "file:" + System.getProperty("java.io.tmpdir") +
- File.separator + s"$dbNameWithoutBackTicks.db"
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
- expectedLocation,
+ if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath,
Map.empty))
sql(s"DROP DATABASE $dbName CASCADE")
assert(!catalog.databaseExists(dbNameWithoutBackTicks))
@@ -149,77 +181,78 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
test("Create Database - database already exists") {
- withSQLConf(
- SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
- val catalog = sqlContext.sessionState.catalog
- val databaseNames = Seq("db1", "`database`")
-
- databaseNames.foreach { dbName =>
- try {
- val dbNameWithoutBackTicks = cleanIdentifier(dbName)
- sql(s"CREATE DATABASE $dbName")
- val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
- val expectedLocation =
- "file:" + System.getProperty("java.io.tmpdir") +
- File.separator + s"$dbNameWithoutBackTicks.db"
- assert(db1 == CatalogDatabase(
- dbNameWithoutBackTicks,
- "",
- expectedLocation,
- Map.empty))
-
- val message = intercept[AnalysisException] {
+ withTempDir { tmpDir =>
+ val path = tmpDir.toString
+ withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
+ val catalog = sqlContext.sessionState.catalog
+ val databaseNames = Seq("db1", "`database`")
+
+ databaseNames.foreach { dbName =>
+ try {
+ val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
- }.getMessage
- assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
- } finally {
- catalog.reset()
+ val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+ val expectedLocation =
+ "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
+ assert(db1 == CatalogDatabase(
+ dbNameWithoutBackTicks,
+ "",
+ expectedLocation,
+ Map.empty))
+
+ val message = intercept[AnalysisException] {
+ sql(s"CREATE DATABASE $dbName")
+ }.getMessage
+ assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
+ } finally {
+ catalog.reset()
+ }
}
}
}
}
test("Alter/Describe Database") {
- withSQLConf(
- SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
- val catalog = sqlContext.sessionState.catalog
- val databaseNames = Seq("db1", "`database`")
+ withTempDir { tmpDir =>
+ val path = tmpDir.toString
+ withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
+ val catalog = sqlContext.sessionState.catalog
+ val databaseNames = Seq("db1", "`database`")
- databaseNames.foreach { dbName =>
- try {
- val dbNameWithoutBackTicks = cleanIdentifier(dbName)
- val location =
- "file:" + System.getProperty("java.io.tmpdir") +
- File.separator + s"$dbNameWithoutBackTicks.db"
-
- sql(s"CREATE DATABASE $dbName")
-
- checkAnswer(
- sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
- Row("Database Name", dbNameWithoutBackTicks) ::
- Row("Description", "") ::
- Row("Location", location) ::
- Row("Properties", "") :: Nil)
-
- sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
-
- checkAnswer(
- sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
- Row("Database Name", dbNameWithoutBackTicks) ::
- Row("Description", "") ::
- Row("Location", location) ::
- Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)
-
- sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
-
- checkAnswer(
- sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
- Row("Database Name", dbNameWithoutBackTicks) ::
- Row("Description", "") ::
- Row("Location", location) ::
- Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
- } finally {
- catalog.reset()
+ databaseNames.foreach { dbName =>
+ try {
+ val dbNameWithoutBackTicks = cleanIdentifier(dbName)
+ val location = "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
+
+ sql(s"CREATE DATABASE $dbName")
+
+ checkAnswer(
+ sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+ Row("Database Name", dbNameWithoutBackTicks) ::
+ Row("Description", "") ::
+ Row("Location", location) ::
+ Row("Properties", "") :: Nil)
+
+ sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
+
+ checkAnswer(
+ sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+ Row("Database Name", dbNameWithoutBackTicks) ::
+ Row("Description", "") ::
+ Row("Location", location) ::
+ Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)
+
+ sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
+
+ checkAnswer(
+ sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+ Row("Database Name", dbNameWithoutBackTicks) ::
+ Row("Description", "") ::
+ Row("Location", location) ::
+ Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
+ } finally {
+ catalog.reset()
+ }
}
}
}
@@ -251,7 +284,43 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- // TODO: test drop database in restrict mode
+ test("drop non-empty database in restrict mode") {
+ val catalog = sqlContext.sessionState.catalog
+ val dbName = "db1"
+ sql(s"CREATE DATABASE $dbName")
+
+ // create a table in database
+ val tableIdent1 = TableIdentifier("tab1", Some(dbName))
+ createTable(catalog, tableIdent1)
+
+ // drop a non-empty database in Restrict mode
+ val message = intercept[AnalysisException] {
+ sql(s"DROP DATABASE $dbName RESTRICT")
+ }.getMessage
+ assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
+
+ catalog.dropTable(tableIdent1, ignoreIfNotExists = false)
+
+ assert(catalog.listDatabases().contains(dbName))
+ sql(s"DROP DATABASE $dbName RESTRICT")
+ assert(!catalog.listDatabases().contains(dbName))
+ }
+
+ test("drop non-empty database in cascade mode") {
+ val catalog = sqlContext.sessionState.catalog
+ val dbName = "db1"
+ sql(s"CREATE DATABASE $dbName")
+
+ // create a table in database
+ val tableIdent1 = TableIdentifier("tab1", Some(dbName))
+ createTable(catalog, tableIdent1)
+
+ // drop a non-empty database in CASCADE mode
+ assert(catalog.listTables(dbName).contains(tableIdent1))
+ assert(catalog.listDatabases().contains(dbName))
+ sql(s"DROP DATABASE $dbName CASCADE")
+ assert(!catalog.listDatabases().contains(dbName))
+ }
test("create table in default db") {
val catalog = sqlContext.sessionState.catalog
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 373d1a1e0e..d55ddb251d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -20,21 +20,37 @@ package org.apache.spark.sql.hive.execution
import java.io.File
import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
-class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+class HiveDDLSuite
+ extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
import hiveContext.implicits._
+ override def afterEach(): Unit = {
+ try {
+ // drop all databases, tables and functions after each test
+ sqlContext.sessionState.catalog.reset()
+ } finally {
+ super.afterEach()
+ }
+ }
// check if the directory for recording the data of the table exists.
- private def tableDirectoryExists(tableIdentifier: TableIdentifier): Boolean = {
+ private def tableDirectoryExists(
+ tableIdentifier: TableIdentifier,
+ dbPath: Option[String] = None): Boolean = {
val expectedTablePath =
- hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)
+ if (dbPath.isEmpty) {
+ hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)
+ } else {
+ new Path(new Path(dbPath.get), tableIdentifier.table).toString
+ }
val filesystemPath = new Path(expectedTablePath)
val fs = filesystemPath.getFileSystem(hiveContext.sessionState.newHadoopConf())
fs.exists(filesystemPath)
@@ -56,7 +72,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
- test("drop managed tables") {
+ test("drop managed tables in default database") {
withTempDir { tmpDir =>
val tabName = "tab1"
withTable(tabName) {
@@ -83,7 +99,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
- test("drop external data source table") {
+ test("drop external data source table in default database") {
withTempDir { tmpDir =>
val tabName = "tab1"
withTable(tabName) {
@@ -365,4 +381,126 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
.exists(_.getString(0) == "# Detailed Table Information"))
}
}
+
+ private def createDatabaseWithLocation(tmpDir: File, dirExists: Boolean): Unit = {
+ val catalog = sqlContext.sessionState.catalog
+ val dbName = "db1"
+ val tabName = "tab1"
+ val fs = new Path(tmpDir.toString).getFileSystem(hiveContext.sessionState.newHadoopConf())
+ withTable(tabName) {
+ if (dirExists) {
+ assert(tmpDir.listFiles.isEmpty)
+ } else {
+ assert(!fs.exists(new Path(tmpDir.toString)))
+ }
+ sql(s"CREATE DATABASE $dbName Location '$tmpDir'")
+ val db1 = catalog.getDatabaseMetadata(dbName)
+ val dbPath = "file:" + tmpDir
+ assert(db1 == CatalogDatabase(
+ dbName,
+ "",
+ if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath,
+ Map.empty))
+ sql("USE db1")
+
+ sql(s"CREATE TABLE $tabName as SELECT 1")
+ assert(tableDirectoryExists(TableIdentifier(tabName), Option(tmpDir.toString)))
+
+ assert(tmpDir.listFiles.nonEmpty)
+ sql(s"DROP TABLE $tabName")
+
+ assert(tmpDir.listFiles.isEmpty)
+ sql(s"DROP DATABASE $dbName")
+ assert(!fs.exists(new Path(tmpDir.toString)))
+ }
+ }
+
+ test("create/drop database - location without pre-created directory") {
+ withTempPath { tmpDir =>
+ createDatabaseWithLocation(tmpDir, dirExists = false)
+ }
+ }
+
+ test("create/drop database - location with pre-created directory") {
+ withTempDir { tmpDir =>
+ createDatabaseWithLocation(tmpDir, dirExists = true)
+ }
+ }
+
+ private def appendTrailingSlash(path: String): String = {
+ if (!path.endsWith(File.separator)) path + File.separator else path
+ }
+
+ private def dropDatabase(cascade: Boolean, tableExists: Boolean): Unit = {
+ withTempPath { tmpDir =>
+ val path = tmpDir.toString
+ withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
+ val dbName = "db1"
+ val fs = new Path(path).getFileSystem(hiveContext.sessionState.newHadoopConf())
+ val dbPath = new Path(path)
+ // the database directory does not exist
+ assert(!fs.exists(dbPath))
+
+ sql(s"CREATE DATABASE $dbName")
+ val catalog = sqlContext.sessionState.catalog
+ val expectedDBLocation = "file:" + appendTrailingSlash(dbPath.toString) + s"$dbName.db"
+ val db1 = catalog.getDatabaseMetadata(dbName)
+ assert(db1 == CatalogDatabase(
+ dbName,
+ "",
+ expectedDBLocation,
+ Map.empty))
+ // the database directory was created
+ assert(fs.exists(dbPath) && fs.isDirectory(dbPath))
+ sql(s"USE $dbName")
+
+ val tabName = "tab1"
+ assert(!tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
+ sql(s"CREATE TABLE $tabName as SELECT 1")
+ assert(tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
+
+ if (!tableExists) {
+ sql(s"DROP TABLE $tabName")
+ assert(!tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
+ }
+
+ val sqlDropDatabase = s"DROP DATABASE $dbName ${if (cascade) "CASCADE" else "RESTRICT"}"
+ if (tableExists && !cascade) {
+ val message = intercept[AnalysisException] {
+ sql(sqlDropDatabase)
+ }.getMessage
+ assert(message.contains(s"Database $dbName is not empty. One or more tables exist."))
+ // the database directory was not removed
+ assert(fs.exists(new Path(expectedDBLocation)))
+ } else {
+ sql(sqlDropDatabase)
+ // the database directory was removed and the inclusive table directories are also removed
+ assert(!fs.exists(new Path(expectedDBLocation)))
+ }
+ }
+ }
+ }
+
+ test("drop database containing tables - CASCADE") {
+ dropDatabase(cascade = true, tableExists = true)
+ }
+
+ test("drop an empty database - CASCADE") {
+ dropDatabase(cascade = true, tableExists = false)
+ }
+
+ test("drop database containing tables - RESTRICT") {
+ dropDatabase(cascade = false, tableExists = true)
+ }
+
+ test("drop an empty database - RESTRICT") {
+ dropDatabase(cascade = false, tableExists = false)
+ }
+
+ test("drop default database") {
+ val message = intercept[AnalysisException] {
+ sql("DROP DATABASE default")
+ }.getMessage
+ assert(message.contains("Can not drop default database"))
+ }
}