aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-08-09 10:04:36 -0700
committerDavies Liu <davies.liu@gmail.com>2016-08-09 10:04:36 -0700
commit92da22878bac07545cd946911dcb39a6bb2ee7e8 (patch)
treec725d376d3551e788235f690c8938d8bd285e397 /sql/core/src/test/scala
parent29081b587f3423bf5a3e0066357884d0c26a04bf (diff)
downloadspark-92da22878bac07545cd946911dcb39a6bb2ee7e8.tar.gz
spark-92da22878bac07545cd946911dcb39a6bb2ee7e8.tar.bz2
spark-92da22878bac07545cd946911dcb39a6bb2ee7e8.zip
[SPARK-16905] SQL DDL: MSCK REPAIR TABLE
## What changes were proposed in this pull request? MSCK REPAIR TABLE can be used to recover the partitions in the external catalog based on the partitions present in the file system. An equivalent syntax is: ALTER TABLE table RECOVER PARTITIONS. The implementation in this PR only lists partition directories (not the files within a partition) in the driver (in parallel if needed). ## How was this patch tested? Added unit tests for it and a Hive compatibility test suite. Author: Davies Liu <davies@databricks.com> Closes #14500 from davies/repair_table.
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala8
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala49
2 files changed, 57 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 044fa5fb9a..be1bccbd99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -540,6 +540,14 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed2, expected2)
}
+ test("alter table: recover partitions") {
+ val sql = "ALTER TABLE table_name RECOVER PARTITIONS"
+ val parsed = parser.parsePlan(sql)
+ val expected = AlterTableRecoverPartitionsCommand(
+ TableIdentifier("table_name", None))
+ comparePlans(parsed, expected)
+ }
+
test("alter view: add partition (not supported)") {
assertUnsupported(
"""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index ca9b210125..53376c56f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -864,6 +864,55 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testAddPartitions(isDatasourceTable = true)
}
+ test("alter table: recover partitions (sequential)") {
+ withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+ testRecoverPartitions()
+ }
+ }
+
+ test("alter table: recover partition (parallel)") {
+ withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
+ testRecoverPartitions()
+ }
+ }
+
+ private def testRecoverPartitions() {
+ val catalog = spark.sessionState.catalog
+ // table to alter does not exist
+ intercept[AnalysisException] {
+ sql("ALTER TABLE does_not_exist RECOVER PARTITIONS")
+ }
+
+ val tableIdent = TableIdentifier("tab1")
+ createTable(catalog, tableIdent)
+ val part1 = Map("a" -> "1", "b" -> "5")
+ createTablePartition(catalog, part1, tableIdent)
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+
+ val part2 = Map("a" -> "2", "b" -> "6")
+ val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+ val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+ // valid
+ fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+ fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+ // invalid
+ fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
+ fs.mkdirs(new Path(new Path(root, "b=1"), "a=1")) // wrong order
+ fs.mkdirs(new Path(root, "a=4")) // not enough columns
+ fs.createNewFile(new Path(new Path(root, "a=1"), "b=4")) // file
+ fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS")) // _SUCCESS
+ fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary")) // _temporary
+ fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4")) // start with .
+
+ try {
+ sql("ALTER TABLE tab1 RECOVER PARTITIONS")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2))
+ } finally {
+ fs.delete(root, true)
+ }
+ }
+
test("alter table: add partition is not supported for views") {
assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2')")
}