diff options
5 files changed, 32 insertions, 4 deletions
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index b215d8867d..20f5c2789a 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -37,7 +37,9 @@ object MimaExcludes { // Exclude rules for 2.2.x lazy val v22excludes = v21excludes ++ Seq( // [SPARK-18663][SQL] Simplify CountMinSketch aggregate implementation - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.util.sketch.CountMinSketch.toByteArray") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.util.sketch.CountMinSketch.toByteArray"), + // [SPARK-18949] [SQL] Add repairTable API to Catalog + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions") ) // Exclude rules for 2.1.x diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index a36d02e0db..30c7a3fe4f 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -258,6 +258,11 @@ class Catalog(object): """Invalidate and refresh all the cached metadata of the given table.""" self._jcatalog.refreshTable(tableName) + @since('2.1.1') + def recoverPartitions(self, tableName): + """Recover all the partitions of the given table and update the catalog.""" + self._jcatalog.recoverPartitions(tableName) + def _reset(self): """(Internal use only) Drop all existing databases (except "default"), tables, partitions and functions, and set the current database to "default". diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index aecdda1c36..6b061f8ab2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -301,6 +301,13 @@ abstract class Catalog { def dropGlobalTempView(viewName: String): Boolean /** + * Recover all the partitions in the directory of a table and update the catalog. + * + * @since 2.1.1 + */ + def recoverPartitions(tableName: String): Unit + + /** * Returns true if the table is currently cached in-memory. * * @since 2.0.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 6d984621cc..41ed9d7180 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource} import org.apache.spark.sql.types.StructType @@ -394,6 +395,19 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** + * Recover all the partitions in the directory of a table and update the catalog. + * + * @param tableName the name of the table to be repaired. + * @group ddl_ops + * @since 2.1.1 + */ + override def recoverPartitions(tableName: String): Unit = { + val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + sparkSession.sessionState.executePlan( + AlterTableRecoverPartitionsCommand(tableIdent)).toRdd + } + + /** * Returns true if the table is currently cached in-memory. * * @group cachemgmt diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala index c2ac032760..3f84cbdb1b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala @@ -70,7 +70,7 @@ class PartitionProviderCompatibilitySuite } withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") { verifyIsLegacyTable("test") - spark.sql("msck repair table test") + spark.catalog.recoverPartitions("test") spark.sql("show partitions test").count() // check we are a new table // sanity check table performance @@ -90,7 +90,7 @@ class PartitionProviderCompatibilitySuite setupPartitionedDatasourceTable("test", dir) spark.sql("show partitions test").count() // check we are a new table assert(spark.sql("select * from test").count() == 0) // needs repair - spark.sql("msck repair table test") + spark.catalog.recoverPartitions("test") assert(spark.sql("select * from test").count() == 5) } } @@ -160,7 +160,7 @@ class PartitionProviderCompatibilitySuite withTable("test") { withTempDir { dir => setupPartitionedDatasourceTable("test", dir) - sql("msck repair table test") + spark.catalog.recoverPartitions("test") spark.sql( """insert overwrite table test |partition (partCol=1) |