-rw-r--r--  project/MimaExcludes.scala                                                        4
-rw-r--r--  python/pyspark/sql/catalog.py                                                     5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala               7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala         14
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala  6
5 files changed, 32 insertions, 4 deletions
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index b215d8867d..20f5c2789a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -37,7 +37,9 @@ object MimaExcludes {
// Exclude rules for 2.2.x
lazy val v22excludes = v21excludes ++ Seq(
// [SPARK-18663][SQL] Simplify CountMinSketch aggregate implementation
- ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.util.sketch.CountMinSketch.toByteArray")
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.util.sketch.CountMinSketch.toByteArray"),
+ // [SPARK-18949] [SQL] Add repairTable API to Catalog
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions")
)
// Exclude rules for 2.1.x
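Adding an abstract method to a published abstract class is a binary-incompatible change for anyone subclassing it, which is exactly what MiMa's ReversedMissingMethodProblem flags; the new rule waives it for this intentional API addition. A minimal sketch of the situation the filter covers (the subclass below is hypothetical):

import org.apache.spark.sql.catalog.Catalog

// Hypothetical third-party subclass compiled against Spark 2.1.0. It predates
// recoverPartitions, so under 2.1.1 binaries any concrete subclass of it would
// be missing an abstract member: the ReversedMissingMethodProblem excluded above.
abstract class LegacyCatalog extends Catalog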
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index a36d02e0db..30c7a3fe4f 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -258,6 +258,11 @@ class Catalog(object):
"""Invalidate and refresh all the cached metadata of the given table."""
self._jcatalog.refreshTable(tableName)
+ @since('2.1.1')
+ def recoverPartitions(self, tableName):
+ """Recover all the partitions of the given table and update the catalog."""
+ self._jcatalog.recoverPartitions(tableName)
+
def _reset(self):
"""(Internal use only) Drop all existing databases (except "default"), tables,
partitions and functions, and set the current database to "default".
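The Python method is a thin Py4J wrapper: it forwards the table name to the same JVM-side Catalog.recoverPartitions declared below. A minimal usage sketch in Scala, assuming a live SparkSession and a hypothetical partitioned table named "logs":

import org.apache.spark.sql.SparkSession

object RecoverPartitionsUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("recover-usage").getOrCreate()
    // One call registers any partition directories found on disk but
    // missing from the catalog; "logs" is a hypothetical table name.
    spark.catalog.recoverPartitions("logs")
    spark.stop()
  }
}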
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index aecdda1c36..6b061f8ab2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -301,6 +301,13 @@ abstract class Catalog {
def dropGlobalTempView(viewName: String): Boolean
/**
+ * Recover all the partitions in the directory of a table and update the catalog.
+ *
+ * @since 2.1.1
+ */
+ def recoverPartitions(tableName: String): Unit
+
+ /**
* Returns true if the table is currently cached in-memory.
*
* @since 2.0.0
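This declaration gives the repair operation a typed entry point; the tests below swap raw SQL for it. On the SQL surface the same command can be reached three ways, sketched here with a hypothetical table t:

import org.apache.spark.sql.SparkSession

object EquivalentSpellings {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    // All three resolve to the same recover-partitions command;
    // MSCK REPAIR TABLE is the Hive spelling the tests used before this change.
    spark.sql("MSCK REPAIR TABLE t")
    spark.sql("ALTER TABLE t RECOVER PARTITIONS")
    spark.catalog.recoverPartitions("t")
    spark.stop()
  }
}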
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 6d984621cc..41ed9d7180 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
import org.apache.spark.sql.types.StructType
@@ -394,6 +395,19 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}
/**
+ * Recover all the partitions in the directory of a table and update the catalog.
+ *
+ * @param tableName the name of the table to be repaired.
+ * @group ddl_ops
+ * @since 2.1.1
+ */
+ override def recoverPartitions(tableName: String): Unit = {
+ val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+ sparkSession.sessionState.executePlan(
+ AlterTableRecoverPartitionsCommand(tableIdent)).toRdd
+ }
+
+ /**
* Returns true if the table is currently cached in-memory.
*
* @group cachemgmt
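The implementation builds the same logical command the SQL parser would produce and forces it to run via .toRdd: commands are lazy logical plans until executed. A sketch of that internal pattern (note that sessionState is private[sql], so this only compiles inside Spark's own org.apache.spark.sql package; the table name is hypothetical):

package org.apache.spark.sql

import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand

object EagerCommandSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    // Parse "db.t" into a TableIdentifier, wrap it in the command node, and
    // execute eagerly: toRdd materializes the (empty) command output, which
    // runs the command's side effects immediately.
    val ident = spark.sessionState.sqlParser.parseTableIdentifier("db.t")
    spark.sessionState.executePlan(AlterTableRecoverPartitionsCommand(ident)).toRdd
    spark.stop()
  }
}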
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index c2ac032760..3f84cbdb1b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -70,7 +70,7 @@ class PartitionProviderCompatibilitySuite
}
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
verifyIsLegacyTable("test")
- spark.sql("msck repair table test")
+ spark.catalog.recoverPartitions("test")
spark.sql("show partitions test").count() // check we are a new table
// sanity check table performance
@@ -90,7 +90,7 @@ class PartitionProviderCompatibilitySuite
setupPartitionedDatasourceTable("test", dir)
spark.sql("show partitions test").count() // check we are a new table
assert(spark.sql("select * from test").count() == 0) // needs repair
- spark.sql("msck repair table test")
+ spark.catalog.recoverPartitions("test")
assert(spark.sql("select * from test").count() == 5)
}
}
@@ -160,7 +160,7 @@ class PartitionProviderCompatibilitySuite
withTable("test") {
withTempDir { dir =>
setupPartitionedDatasourceTable("test", dir)
- sql("msck repair table test")
+ spark.catalog.recoverPartitions("test")
spark.sql(
"""insert overwrite table test
|partition (partCol=1)
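The test changes above exercise the new API against a partitioned datasource table whose files exist on disk before the catalog knows about them. A self-contained sketch of that scenario, loosely mirroring setupPartitionedDatasourceTable (the path and table name are hypothetical; enableHiveSupport just mirrors the Hive-backed suite):

import org.apache.spark.sql.SparkSession

object RecoverPartitionsScenario {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("recover-partitions-scenario")
      .enableHiveSupport() // assumption: a Hive-enabled build, as in the suite above
      .getOrCreate()

    val dir = "/tmp/recover_demo" // hypothetical location; any writable path works

    // Lay down partitioned parquet files first, then declare a table over them.
    spark.range(5).selectExpr("id as fieldOne", "id as partCol")
      .write.partitionBy("partCol").mode("overwrite").parquet(dir)
    spark.sql(
      s"""create table demo (fieldOne bigint, partCol bigint)
         |using parquet options (path "$dir") partitioned by (partCol)""".stripMargin)

    // The catalog has no partition entries yet, so the table reads as empty...
    assert(spark.table("demo").count() == 0)
    // ...until the on-disk partitions are recovered, as in the test above.
    spark.catalog.recoverPartitions("demo")
    assert(spark.table("demo").count() == 5)

    spark.stop()
  }
}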