diff options
author | gatorsmile <gatorsmile@gmail.com> | 2016-12-20 23:40:02 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-12-20 23:40:02 -0800 |
commit | 24c0c94128770be9034fb69518713d7f6aa1e041 (patch) | |
tree | f367c32b5005da96c0634ba5e1f8337e5d0aa86e /sql/core | |
parent | b2dd8ec6b2c05c996e2d7c0bf8db0073c1ee0b94 (diff) | |
download | spark-24c0c94128770be9034fb69518713d7f6aa1e041.tar.gz spark-24c0c94128770be9034fb69518713d7f6aa1e041.tar.bz2 spark-24c0c94128770be9034fb69518713d7f6aa1e041.zip |
[SPARK-18949][SQL] Add recoverPartitions API to Catalog
### What changes were proposed in this pull request?
Currently, we only have a SQL interface for recovering all the partitions in the directory of a table and update the catalog. `MSCK REPAIR TABLE` or `ALTER TABLE table RECOVER PARTITIONS`. (Actually, very hard for me to remember `MSCK` and have no clue what it means)
After the new "Scalable Partition Handling", the table repair becomes much more important for making visible the data in the created data source partitioned table.
Thus, this PR is to add it into the Catalog interface. After this PR, users can repair the table by
```Scala
spark.catalog.recoverPartitions("testTable")
```
### How was this patch tested?
Modified the existing test cases.
Author: gatorsmile <gatorsmile@gmail.com>
Closes #16356 from gatorsmile/repairTable.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala | 7 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala | 14 |
2 files changed, 21 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index aecdda1c36..6b061f8ab2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -301,6 +301,13 @@ abstract class Catalog { def dropGlobalTempView(viewName: String): Boolean /** + * Recover all the partitions in the directory of a table and update the catalog. + * + * @since 2.1.1 + */ + def recoverPartitions(tableName: String): Unit + + /** * Returns true if the table is currently cached in-memory. * * @since 2.0.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 6d984621cc..41ed9d7180 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource} import org.apache.spark.sql.types.StructType @@ -394,6 +395,19 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** + * Recover all the partitions in the directory of a table and update the catalog. + * + * @param tableName the name of the table to be repaired. + * @group ddl_ops + * @since 2.1.1 + */ + override def recoverPartitions(tableName: String): Unit = { + val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + sparkSession.sessionState.executePlan( + AlterTableRecoverPartitionsCommand(tableIdent)).toRdd + } + + /** * Returns true if the table is currently cached in-memory. * * @group cachemgmt |