Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala | 156
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 13
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala | 8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 38
7 files changed, 200 insertions, 33 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 83e01f95c0..8408d765d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -81,10 +81,12 @@ object CatalogStorageFormat {
*
* @param spec partition spec values indexed by column name
* @param storage storage format of the partition
+ * @param parameters parameters for the partition, for example, fast stats (numFiles, totalSize).
*/
case class CatalogTablePartition(
spec: CatalogTypes.TablePartitionSpec,
- storage: CatalogStorageFormat)
+ storage: CatalogStorageFormat,
+ parameters: Map[String, String] = Map.empty)
/**
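
The new parameters field defaults to Map.empty, so existing call sites keep compiling unchanged. Below is a minimal sketch of building a partition that carries fast stats, assuming CatalogStorageFormat.empty is available in this version of catalyst; the spec, location, and stat values are hypothetical and only the types come from this file:

    import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition}

    // Hypothetical partition of a table partitioned by (a, b).
    val spec = Map("a" -> "1", "b" -> "5")
    // Inherit an empty storage format and set only the location, as the recovery command does.
    val storage = CatalogStorageFormat.empty.copy(
      locationUri = Some("hdfs://nn/warehouse/tab1/a=1/b=5"))
    // Fast stats travel as plain string parameters, keyed the same way Hive stores them.
    val partition = CatalogTablePartition(
      spec,
      storage,
      parameters = Map("numFiles" -> "1", "totalSize" -> "2048"))
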
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 3817f919f3..53fb684eb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -17,12 +17,13 @@
package org.apache.spark.sql.execution.command
-import scala.collection.GenSeq
+import scala.collection.{GenMap, GenSeq}
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.control.NonFatal
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
// Note: The definition of these commands are based on the ones described in
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -422,6 +424,9 @@ case class AlterTableDropPartitionCommand(
}
+
+case class PartitionStatistics(numFiles: Int, totalSize: Long)
+
/**
* Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
* update the catalog.
@@ -435,6 +440,31 @@ case class AlterTableDropPartitionCommand(
case class AlterTableRecoverPartitionsCommand(
tableName: TableIdentifier,
cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
+
+ // This is the list of statistics that can be collected quickly without requiring a data scan;
+ // see https://github.com/apache/hive/blob/master/
+ // common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
+ val NUM_FILES = "numFiles"
+ val TOTAL_SIZE = "totalSize"
+ val DDL_TIME = "transient_lastDdlTime"
+
+ private def getPathFilter(hadoopConf: Configuration): PathFilter = {
+ // Dummy JobConf to get to the pathFilter defined in the configuration.
+ // It's very expensive to create a JobConf (ClassUtil.findContainingJar() is slow).
+ val jobConf = new JobConf(hadoopConf, this.getClass)
+ val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+ new PathFilter {
+ override def accept(path: Path): Boolean = {
+ val name = path.getName
+ if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
+ pathFilter == null || pathFilter.accept(path)
+ } else {
+ false
+ }
+ }
+ }
+ }
+
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
if (!catalog.tableExists(tableName)) {
@@ -449,10 +479,6 @@ case class AlterTableRecoverPartitionsCommand(
throw new AnalysisException(
s"Operation not allowed: $cmd on datasource tables: $tableName")
}
- if (table.tableType != CatalogTableType.EXTERNAL) {
- throw new AnalysisException(
- s"Operation not allowed: $cmd only works on external tables: $tableName")
- }
if (table.partitionColumnNames.isEmpty) {
throw new AnalysisException(
s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
@@ -463,19 +489,26 @@ case class AlterTableRecoverPartitionsCommand(
}
val root = new Path(table.storage.locationUri.get)
+ logInfo(s"Recover all the partitions in $root")
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
- // Dummy jobconf to get to the pathFilter defined in configuration
- // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
- val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
- val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+
+ val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
+ val hadoopConf = spark.sparkContext.hadoopConfiguration
+ val pathFilter = getPathFilter(hadoopConf)
val partitionSpecsAndLocs = scanPartitions(
- spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
- val parts = partitionSpecsAndLocs.map { case (spec, location) =>
- // inherit table storage format (possibly except for location)
- CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
+ spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase), threshold)
+ val total = partitionSpecsAndLocs.length
+ logInfo(s"Found $total partitions in $root")
+
+ val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
+ gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
+ } else {
+ GenMap.empty[String, PartitionStatistics]
}
- spark.sessionState.catalog.createPartitions(tableName,
- parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+ logInfo(s"Finished to gather the fast stats for all $total partitions.")
+
+ addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
+ logInfo(s"Recovered all partitions ($total).")
Seq.empty[Row]
}
@@ -487,15 +520,16 @@ case class AlterTableRecoverPartitionsCommand(
filter: PathFilter,
path: Path,
spec: TablePartitionSpec,
- partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
- if (partitionNames.length == 0) {
+ partitionNames: Seq[String],
+ threshold: Int): GenSeq[(TablePartitionSpec, Path)] = {
+ if (partitionNames.isEmpty) {
return Seq(spec -> path)
}
- val statuses = fs.listStatus(path)
- val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
+ val statuses = fs.listStatus(path, filter)
val statusPar: GenSeq[FileStatus] =
if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
+ // Parallelize the listing of partitions here so that we get better parallelism later.
val parArray = statuses.par
parArray.tasksupport = evalTaskSupport
parArray
@@ -510,21 +544,89 @@ case class AlterTableRecoverPartitionsCommand(
// TODO: Validate the value
val value = PartitioningUtils.unescapePathName(ps(1))
// comparing with case-insensitive, but preserve the case
- if (columnName == partitionNames(0)) {
- scanPartitions(
- spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1))
+ if (columnName == partitionNames.head) {
+ scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(columnName -> value),
+ partitionNames.drop(1), threshold)
} else {
- logWarning(s"expect partition column ${partitionNames(0)}, but got ${ps(0)}, ignore it")
+ logWarning(s"expect partition column ${partitionNames.head}, but got ${ps(0)}, ignore it")
Seq()
}
} else {
- if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
- logWarning(s"ignore ${new Path(path, name)}")
- }
+ logWarning(s"ignore ${new Path(path, name)}")
Seq()
}
}
}
+
+ private def gatherPartitionStats(
+ spark: SparkSession,
+ partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
+ fs: FileSystem,
+ pathFilter: PathFilter,
+ threshold: Int): GenMap[String, PartitionStatistics] = {
+ if (partitionSpecsAndLocs.length > threshold) {
+ val hadoopConf = spark.sparkContext.hadoopConfiguration
+ val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+ val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray
+
+ // Set the degree of parallelism to prevent the following file listing from generating
+ // too many tasks when defaultParallelism is large.
+ val numParallelism = Math.min(serializedPaths.length,
+ Math.min(spark.sparkContext.defaultParallelism, 10000))
+ // Gather the fast stats for all the partitions; otherwise the Hive metastore will list
+ // all the files for all the new partitions sequentially, which is very slow.
+ logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.")
+ spark.sparkContext.parallelize(serializedPaths, numParallelism)
+ .mapPartitions { paths =>
+ val pathFilter = getPathFilter(serializableConfiguration.value)
+ paths.map(new Path(_)).map{ path =>
+ val fs = path.getFileSystem(serializableConfiguration.value)
+ val statuses = fs.listStatus(path, pathFilter)
+ (path.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
+ }
+ }.collectAsMap()
+ } else {
+ partitionSpecsAndLocs.map { case (_, location) =>
+ val statuses = fs.listStatus(location, pathFilter)
+ (location.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
+ }.toMap
+ }
+ }
+
+ private def addPartitions(
+ spark: SparkSession,
+ table: CatalogTable,
+ partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
+ partitionStats: GenMap[String, PartitionStatistics]): Unit = {
+ val total = partitionSpecsAndLocs.length
+ var done = 0L
+ // The Hive metastore may not have enough memory to handle millions of partitions in a single
+ // RPC, so we should split them into smaller batches. Since the Hive client is not thread
+ // safe, we cannot do this in parallel.
+ val batchSize = 100
+ partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch =>
+ val now = System.currentTimeMillis() / 1000
+ val parts = batch.map { case (spec, location) =>
+ val params = partitionStats.get(location.toString).map {
+ case PartitionStatistics(numFiles, totalSize) =>
+ // These two fast stats prevent the Hive metastore from listing the files again.
+ Map(NUM_FILES -> numFiles.toString,
+ TOTAL_SIZE -> totalSize.toString,
+ // Work around a bug in HiveMetastore that tries to mutate read-only parameters.
+ // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+ DDL_TIME -> now.toString)
+ }.getOrElse(Map.empty)
+ // inherit table storage format (possibly except for location)
+ CatalogTablePartition(
+ spec,
+ table.storage.copy(locationUri = Some(location.toUri.toString)),
+ params)
+ }
+ spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
+ done += parts.length
+ logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
+ }
+ }
}
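
The core of gatherPartitionStats is one listStatus call per partition directory; only file metadata is touched. A standalone sketch of that per-path computation, using nothing but Hadoop's FileSystem API (the method name and parameters are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{Path, PathFilter}

    // Compute (numFiles, totalSize) for a single partition directory, mirroring the logic above.
    def fastStats(location: Path, conf: Configuration, filter: PathFilter): (Int, Long) = {
      val fs = location.getFileSystem(conf)
      // A single metadata-only listing; file contents are never read.
      val statuses = fs.listStatus(location, filter)
      (statuses.length, statuses.map(_.getLen).sum)
    }

Pre-populating numFiles and totalSize this way is what lets the Hive metastore skip its own per-partition file listing when the partitions are later added in batches of 100.
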
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f2b1afd71a..91988270ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -310,6 +310,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val GATHER_FASTSTAT = SQLConfigBuilder("spark.sql.hive.gatherFastStats")
+ .internal()
+ .doc("When true, fast stats (number of files and total size of all files) will be gathered" +
+ " in parallel while repairing table partitions to avoid the sequential listing in Hive" +
+ " metastore.")
+ .booleanConf
+ .createWithDefault(true)
+
// This is used to control the when we will split a schema's JSON string to multiple pieces
// in order to fit the JSON string in metastore's table property (by default, the value has
// a length restriction of 4000 characters). We will split the JSON string of a schema
@@ -608,6 +616,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
+ def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
+
def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
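
Although the flag is marked internal, it can be toggled like any other SQL conf. A usage sketch, assuming a Hive-enabled SparkSession named spark and a partitioned table tab1 (both hypothetical):

    // Fast stats are gathered by default; disabling the flag falls back to the old behaviour,
    // where the Hive metastore lists each new partition's files itself.
    spark.conf.set("spark.sql.hive.gatherFastStats", "false")
    spark.sql("ALTER TABLE tab1 RECOVER PARTITIONS")

    // Re-enable it for large repairs so the listing runs in parallel Spark tasks.
    spark.conf.set("spark.sql.hive.gatherFastStats", "true")
    spark.sql("MSCK REPAIR TABLE tab1")
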
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b343454b12..0073659a31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -824,13 +824,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
test("alter table: recover partitions (sequential)") {
- withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+ withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
testRecoverPartitions()
}
}
test("alter table: recover partition (parallel)") {
- withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
+ withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
testRecoverPartitions()
}
}
@@ -853,7 +853,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
// valid
fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+ fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file
+ fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS")) // file
fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+ fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv")) // file
+ fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv")) // file
+ fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile")) // file
+ fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
+
// invalid
fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
fs.mkdirs(new Path(new Path(root, "b=1"), "a=1")) // wrong order
@@ -867,6 +874,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("ALTER TABLE tab1 RECOVER PARTITIONS")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2))
+ assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
+ assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
} finally {
fs.delete(root, true)
}
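
The new assertions read the gathered stats back through the session catalog; the same check works outside the test. A sketch assuming the tab1 table and the partition spec used above:

    import org.apache.spark.sql.catalyst.TableIdentifier

    spark.sql("ALTER TABLE tab1 RECOVER PARTITIONS")
    val part = spark.sessionState.catalog.getPartition(
      TableIdentifier("tab1"), Map("a" -> "1", "b" -> "5"))
    // numFiles, totalSize and transient_lastDdlTime are filled in during recovery
    // when spark.sql.hive.gatherFastStats is enabled.
    println(part.parameters.get("numFiles"))
    println(part.parameters.get("totalSize"))
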
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 81d5a124e9..b45ad30dca 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -829,6 +829,8 @@ private[hive] class HiveClientImpl(
serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
compressed = apiPartition.getSd.isCompressed,
properties = Option(apiPartition.getSd.getSerdeInfo.getParameters)
- .map(_.asScala.toMap).orNull))
+ .map(_.asScala.toMap).orNull),
+ parameters =
+ if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty)
}
}
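
Hive may return a null parameter map for a partition, so the conversion above guards against that before calling asScala. An equivalent Option-based form of the same null check, as a self-contained sketch (the helper name is hypothetical):

    import scala.collection.JavaConverters._

    // An absent (null) Hive parameter map becomes an empty Scala map.
    def toScalaParams(javaParams: java.util.Map[String, String]): Map[String, String] =
      Option(javaParams).map(_.asScala.toMap).getOrElse(Map.empty)
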
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 41527fcd05..3238770761 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -267,6 +267,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
val table = hive.getTable(database, tableName)
parts.foreach { s =>
val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
+ val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
val spec = s.spec.asJava
if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
// Ignore this partition since it already exists and ignoreIfExists == true
@@ -280,7 +281,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
table,
spec,
location,
- null, // partParams
+ params, // partParams
null, // inputFormat
null, // outputFormat
-1: JInteger, // numBuckets
@@ -459,8 +460,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
- parts.foreach { s =>
+ parts.zipWithIndex.foreach { case (s, i) =>
addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
+ if (s.parameters.nonEmpty) {
+ addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
+ }
}
hive.createPartitions(addPartitionDesc)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index f00a99b6d0..9019333d76 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -378,6 +378,44 @@ class HiveDDLSuite
expectedSerdeProps)
}
+ test("MSCK REPAIR RABLE") {
+ val catalog = spark.sessionState.catalog
+ val tableIdent = TableIdentifier("tab1")
+ sql("CREATE TABLE tab1 (height INT, length INT) PARTITIONED BY (a INT, b INT)")
+ val part1 = Map("a" -> "1", "b" -> "5")
+ val part2 = Map("a" -> "2", "b" -> "6")
+ val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+ val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+ // valid
+ fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+ fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file
+ fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS")) // file
+ fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+ fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv")) // file
+ fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv")) // file
+ fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile")) // file
+ fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
+
+ // invalid
+ fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
+ fs.mkdirs(new Path(new Path(root, "b=1"), "a=1")) // wrong order
+ fs.mkdirs(new Path(root, "a=4")) // not enough columns
+ fs.createNewFile(new Path(new Path(root, "a=1"), "b=4")) // file
+ fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS")) // _SUCCESS
+ fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary")) // _temporary
+ fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4")) // start with .
+
+ try {
+ sql("MSCK REPAIR TABLE tab1")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2))
+ assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
+ assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
+ } finally {
+ fs.delete(root, true)
+ }
+ }
+
test("drop table using drop view") {
withTable("tab1") {
sql("CREATE TABLE tab1(c1 int)")