aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-08-29 11:23:53 -0700
committerDavies Liu <davies.liu@gmail.com>2016-08-29 11:23:53 -0700
commit48caec2516ef35bfa1a3de2dc0a80d0dc819e6bd (patch)
tree2da91b214f23f1f559316e32e971dd6c03f70c47 /sql/core/src/test/scala
parent6a0fda2c0590b455e8713da79cd5f2413e5d0f28 (diff)
downloadspark-48caec2516ef35bfa1a3de2dc0a80d0dc819e6bd.tar.gz
spark-48caec2516ef35bfa1a3de2dc0a80d0dc819e6bd.tar.bz2
spark-48caec2516ef35bfa1a3de2dc0a80d0dc819e6bd.zip
[SPARK-17063] [SQL] Improve performance of MSCK REPAIR TABLE with Hive metastore
## What changes were proposed in this pull request? This PR splits the single `createPartitions()` call into smaller batches, which could prevent Hive metastore from OOM (caused by millions of partitions). It will also try to gather all the fast stats (number of files and total size of all files) in parallel to avoid the bottleneck of listing the files in the metastore sequentially, which is controlled by spark.sql.gatherFastStats (enabled by default). ## How was this patch tested? Tested locally with 10000 partitions and 100 files with embedded metastore: without gathering fast stats in parallel, adding partitions took 153 seconds; after enabling that, gathering the fast stats took about 34 seconds and adding these partitions took 25 seconds (most of the time spent in the object store), 59 seconds in total — 2.5X faster (with a larger cluster, gathering will be much faster). Author: Davies Liu <davies@databricks.com> Closes #14607 from davies/repair_batch.
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala13
1 file changed, 11 insertions, 2 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b343454b12..0073659a31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -824,13 +824,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
test("alter table: recover partitions (sequential)") {
- withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+ withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
testRecoverPartitions()
}
}
test("alter table: recover partition (parallel)") {
- withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
+ withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
testRecoverPartitions()
}
}
@@ -853,7 +853,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
// valid
fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+ fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file
+ fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS")) // file
fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+ fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv")) // file
+ fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv")) // file
+ fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile")) // file
+ fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
+
// invalid
fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
fs.mkdirs(new Path(new Path(root, "b=1"), "a=1")) // wrong order
@@ -867,6 +874,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("ALTER TABLE tab1 RECOVER PARTITIONS")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2))
+ assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
+ assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
} finally {
fs.delete(root, true)
}