path: root/sql/core/src
author     Davies Liu <davies@databricks.com>    2016-08-09 10:04:36 -0700
committer  Davies Liu <davies.liu@gmail.com>     2016-08-09 10:04:36 -0700
commit     92da22878bac07545cd946911dcb39a6bb2ee7e8 (patch)
tree       c725d376d3551e788235f690c8938d8bd285e397 /sql/core/src
parent     29081b587f3423bf5a3e0066357884d0c26a04bf (diff)
download   spark-92da22878bac07545cd946911dcb39a6bb2ee7e8.tar.gz
           spark-92da22878bac07545cd946911dcb39a6bb2ee7e8.tar.bz2
           spark-92da22878bac07545cd946911dcb39a6bb2ee7e8.zip
[SPARK-16905] SQL DDL: MSCK REPAIR TABLE
## What changes were proposed in this pull request?

MSCK REPAIR TABLE can be used to recover the partitions of an external table in the catalog based on the partition directories found in the file system. An alternative syntax is:

ALTER TABLE table RECOVER PARTITIONS

The implementation in this PR only lists the partition directories (not the files within a partition) on the driver, in parallel if needed.

## How was this patch tested?

Added unit tests for it and a Hive compatibility test suite.

Author: Davies Liu <davies@databricks.com>

Closes #14500 from davies/repair_table.
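As a quick illustration of the new DDL (not part of the patch itself), the sketch below shows how the two syntaxes might be invoked from a Scala application; the table name, schema, and location are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object RepairTableExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("msck-repair-example")
      .enableHiveSupport()
      .getOrCreate()

    // A hypothetical external, partitioned table whose partition directories
    // (e.g. /tmp/logs/ds=2016-08-09/) were written directly to the file system.
    spark.sql(
      """CREATE EXTERNAL TABLE IF NOT EXISTS logs (line STRING)
        |PARTITIONED BY (ds STRING)
        |LOCATION '/tmp/logs'""".stripMargin)

    // Either statement asks the driver to list the partition directories under
    // the table location and register the missing partitions in the catalog.
    spark.sql("MSCK REPAIR TABLE logs")
    // spark.sql("ALTER TABLE logs RECOVER PARTITIONS")

    spark.sql("SHOW PARTITIONS logs").show()
    spark.stop()
  }
}
```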
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala           27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala              118
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala           2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala   8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala          49
5 files changed, 199 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 2a452f4379..9da2b5a254 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -415,6 +415,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
+ * Create an [[AlterTableRecoverPartitionsCommand]] command.
+ *
+ * For example:
+ * {{{
+ * MSCK REPAIR TABLE tablename
+ * }}}
+ */
+ override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
+ AlterTableRecoverPartitionsCommand(
+ visitTableIdentifier(ctx.tableIdentifier),
+ "MSCK REPAIR TABLE")
+ }
+
+ /**
* Convert a table property list into a key-value map.
* This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
*/
@@ -785,6 +799,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
+ * Create an [[AlterTableRecoverPartitionsCommand]] command
+ *
+ * For example:
+ * {{{
+ * ALTER TABLE table RECOVER PARTITIONS;
+ * }}}
+ */
+ override def visitRecoverPartitions(
+ ctx: RecoverPartitionsContext): LogicalPlan = withOrigin(ctx) {
+ AlterTableRecoverPartitionsCommand(visitTableIdentifier(ctx.tableIdentifier))
+ }
+
+ /**
* Create an [[AlterTableSetLocationCommand]] command
*
* For example:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index f0e49e65c4..8fa7615b97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -17,18 +17,23 @@
package org.apache.spark.sql.execution.command
+import scala.collection.GenSeq
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.control.NonFatal
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes._
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._
-
// Note: The definition of these commands are based on the ones described in
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
}
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partitions in the directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ * ALTER TABLE table RECOVER PARTITIONS;
+ * MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+ tableName: TableIdentifier,
+ cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
+ override def run(spark: SparkSession): Seq[Row] = {
+ val catalog = spark.sessionState.catalog
+ if (!catalog.tableExists(tableName)) {
+ throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
+ }
+ val table = catalog.getTableMetadata(tableName)
+ if (catalog.isTemporaryTable(tableName)) {
+ throw new AnalysisException(
+ s"Operation not allowed: $cmd on temporary tables: $tableName")
+ }
+ if (DDLUtils.isDatasourceTable(table)) {
+ throw new AnalysisException(
+ s"Operation not allowed: $cmd on datasource tables: $tableName")
+ }
+ if (table.tableType != CatalogTableType.EXTERNAL) {
+ throw new AnalysisException(
+ s"Operation not allowed: $cmd only works on external tables: $tableName")
+ }
+ if (!DDLUtils.isTablePartitioned(table)) {
+ throw new AnalysisException(
+ s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
+ }
+ if (table.storage.locationUri.isEmpty) {
+ throw new AnalysisException(
+ s"Operation not allowed: $cmd only works on table with location provided: $tableName")
+ }
+
+ val root = new Path(table.storage.locationUri.get)
+ val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+ // Dummy JobConf to get to the path filter defined in the configuration.
+ // It's very expensive to create a JobConf (ClassUtil.findContainingJar() is slow).
+ val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
+ val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+ val partitionSpecsAndLocs = scanPartitions(
+ spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
+ val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+ // inherit table storage format (possibly except for location)
+ CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
+ }
+ spark.sessionState.catalog.createPartitions(tableName,
+ parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+ Seq.empty[Row]
+ }
+
+ @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+
+ private def scanPartitions(
+ spark: SparkSession,
+ fs: FileSystem,
+ filter: PathFilter,
+ path: Path,
+ spec: TablePartitionSpec,
+ partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+ if (partitionNames.length == 0) {
+ return Seq(spec -> path)
+ }
+
+ val statuses = fs.listStatus(path)
+ val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
+ val statusPar: GenSeq[FileStatus] =
+ if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
+ val parArray = statuses.par
+ parArray.tasksupport = evalTaskSupport
+ parArray
+ } else {
+ statuses
+ }
+ statusPar.flatMap { st =>
+ val name = st.getPath.getName
+ if (st.isDirectory && name.contains("=")) {
+ val ps = name.split("=", 2)
+ val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+ // TODO: Validate the value
+ val value = PartitioningUtils.unescapePathName(ps(1))
+ // compare case-insensitively, but preserve the original case
+ if (columnName == partitionNames(0)) {
+ scanPartitions(
+ spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1))
+ } else {
+ logWarning(s"expect partition column ${partitionNames(0)}, but got ${ps(0)}, ignore it")
+ Seq()
+ }
+ } else {
+ if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
+ logWarning(s"ignore ${new Path(path, name)}")
+ }
+ Seq()
+ }
+ }
+ }
+}
+
/**
* A command that sets the location of a table or a partition.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index e6fe9a73a1..3b1052619b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{PartitioningUtils}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 044fa5fb9a..be1bccbd99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -540,6 +540,14 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed2, expected2)
}
+ test("alter table: recover partitions") {
+ val sql = "ALTER TABLE table_name RECOVER PARTITIONS"
+ val parsed = parser.parsePlan(sql)
+ val expected = AlterTableRecoverPartitionsCommand(
+ TableIdentifier("table_name", None))
+ comparePlans(parsed, expected)
+ }
+
test("alter view: add partition (not supported)") {
assertUnsupported(
"""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index ca9b210125..53376c56f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -864,6 +864,55 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testAddPartitions(isDatasourceTable = true)
}
+ test("alter table: recover partitions (sequential)") {
+ withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+ testRecoverPartitions()
+ }
+ }
+
+ test("alter table: recover partition (parallel)") {
+ withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
+ testRecoverPartitions()
+ }
+ }
+
+ private def testRecoverPartitions() {
+ val catalog = spark.sessionState.catalog
+ // table to alter does not exist
+ intercept[AnalysisException] {
+ sql("ALTER TABLE does_not_exist RECOVER PARTITIONS")
+ }
+
+ val tableIdent = TableIdentifier("tab1")
+ createTable(catalog, tableIdent)
+ val part1 = Map("a" -> "1", "b" -> "5")
+ createTablePartition(catalog, part1, tableIdent)
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+
+ val part2 = Map("a" -> "2", "b" -> "6")
+ val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+ val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+ // valid
+ fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+ fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+ // invalid
+ fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
+ fs.mkdirs(new Path(new Path(root, "b=1"), "a=1")) // wrong order
+ fs.mkdirs(new Path(root, "a=4")) // not enough columns
+ fs.createNewFile(new Path(new Path(root, "a=1"), "b=4")) // file
+ fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS")) // _SUCCESS
+ fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary")) // _temporary
+ fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4")) // start with .
+
+ try {
+ sql("ALTER TABLE tab1 RECOVER PARTITIONS")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2))
+ } finally {
+ fs.delete(root, true)
+ }
+ }
+
test("alter table: add partition is not supported for views") {
assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2')")
}