path: root/sql/hive/src
author    Davies Liu <davies@databricks.com>    2016-08-29 11:23:53 -0700
committer Davies Liu <davies.liu@gmail.com>    2016-08-29 11:23:53 -0700
commit    48caec2516ef35bfa1a3de2dc0a80d0dc819e6bd
tree      2da91b214f23f1f559316e32e971dd6c03f70c47 /sql/hive/src
parent    6a0fda2c0590b455e8713da79cd5f2413e5d0f28
[SPARK-17063] [SQL] Improve performance of MSCK REPAIR TABLE with Hive metastore
## What changes were proposed in this pull request?

This PR splits the single `createPartitions()` call into smaller batches, which prevents the Hive metastore from running out of memory when a table has millions of partitions. It also gathers the fast stats (number of files and total size of all files) for each partition in parallel, avoiding the bottleneck of the metastore listing files sequentially; this is controlled by `spark.sql.gatherFastStats` (enabled by default).

## How was this patch tested?

Tested locally with 10000 partitions and 100 files with an embedded metastore. Without gathering fast stats in parallel, adding the partitions took 153 seconds. After enabling it, gathering the fast stats took about 34 seconds and adding the partitions took 25 seconds (most of the time spent in the object store), 59 seconds in total, about 2.5X faster (with a larger cluster, gathering will be much faster).

Author: Davies Liu <davies@databricks.com>

Closes #14607 from davies/repair_batch.
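For context, here is a minimal usage sketch (not part of this diff) of the configuration named in the commit message. It assumes an existing partitioned Hive table called `tab1` whose partition directories were created outside Spark; the application name and object name are illustrative only, and `spark.sql.gatherFastStats` is shown with its stated default value.

```scala
import org.apache.spark.sql.SparkSession

object RepairTableExample {
  def main(args: Array[String]): Unit = {
    // Hive support is required so MSCK REPAIR TABLE can register partitions in the metastore.
    val spark = SparkSession.builder()
      .appName("msck-repair-example")
      .enableHiveSupport()
      .getOrCreate()

    // Gather the fast stats (number of files, total size) in parallel while discovering
    // partitions; this is the default per the commit message, set here only for clarity.
    spark.conf.set("spark.sql.gatherFastStats", "true")

    // Assumes `tab1` is an existing partitioned table whose partition directories
    // were added directly to the table location (e.g. copied in by an external job).
    spark.sql("MSCK REPAIR TABLE tab1")

    spark.stop()
  }
}
```

With the batching introduced by this patch, the discovered partitions reach the metastore in smaller groups instead of one request containing all of them, which is what keeps the metastore from running out of memory on very large tables.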
Diffstat (limited to 'sql/hive/src')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala   |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala         |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala  | 38
3 files changed, 47 insertions, 3 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 81d5a124e9..b45ad30dca 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -829,6 +829,8 @@ private[hive] class HiveClientImpl(
serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
compressed = apiPartition.getSd.isCompressed,
properties = Option(apiPartition.getSd.getSerdeInfo.getParameters)
- .map(_.asScala.toMap).orNull))
+ .map(_.asScala.toMap).orNull),
+ parameters =
+ if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 41527fcd05..3238770761 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -267,6 +267,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
val table = hive.getTable(database, tableName)
parts.foreach { s =>
val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
+ val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
val spec = s.spec.asJava
if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
// Ignore this partition since it already exists and ignoreIfExists == true
@@ -280,7 +281,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
table,
spec,
location,
- null, // partParams
+ params, // partParams
null, // inputFormat
null, // outputFormat
-1: JInteger, // numBuckets
@@ -459,8 +460,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
- parts.foreach { s =>
+ parts.zipWithIndex.foreach { case (s, i) =>
addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
+ if (s.parameters.nonEmpty) {
+ addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
+ }
}
hive.createPartitions(addPartitionDesc)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index f00a99b6d0..9019333d76 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -378,6 +378,44 @@ class HiveDDLSuite
expectedSerdeProps)
}
+ test("MSCK REPAIR RABLE") {
+ val catalog = spark.sessionState.catalog
+ val tableIdent = TableIdentifier("tab1")
+ sql("CREATE TABLE tab1 (height INT, length INT) PARTITIONED BY (a INT, b INT)")
+ val part1 = Map("a" -> "1", "b" -> "5")
+ val part2 = Map("a" -> "2", "b" -> "6")
+ val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+ val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+ // valid
+ fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+ fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file
+ fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS")) // file
+ fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+ fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv")) // file
+ fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv")) // file
+ fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile")) // file
+ fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
+
+ // invalid
+ fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
+ fs.mkdirs(new Path(new Path(root, "b=1"), "a=1")) // wrong order
+ fs.mkdirs(new Path(root, "a=4")) // not enough columns
+ fs.createNewFile(new Path(new Path(root, "a=1"), "b=4")) // file
+ fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS")) // _SUCCESS
+ fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary")) // _temporary
+ fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4")) // start with .
+
+ try {
+ sql("MSCK REPAIR TABLE tab1")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2))
+ assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
+ assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
+ } finally {
+ fs.delete(root, true)
+ }
+ }
+
test("drop table using drop view") {
withTable("tab1") {
sql("CREATE TABLE tab1(c1 int)")