diff options
author | Davies Liu <davies@databricks.com> | 2016-08-29 11:23:53 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-08-29 11:23:53 -0700 |
commit | 48caec2516ef35bfa1a3de2dc0a80d0dc819e6bd (patch) | |
tree | 2da91b214f23f1f559316e32e971dd6c03f70c47 /sql/hive | |
parent | 6a0fda2c0590b455e8713da79cd5f2413e5d0f28 (diff) | |
download | spark-48caec2516ef35bfa1a3de2dc0a80d0dc819e6bd.tar.gz spark-48caec2516ef35bfa1a3de2dc0a80d0dc819e6bd.tar.bz2 spark-48caec2516ef35bfa1a3de2dc0a80d0dc819e6bd.zip |
[SPARK-17063] [SQL] Improve performance of MSCK REPAIR TABLE with Hive metastore
## What changes were proposed in this pull request?
This PR split the the single `createPartitions()` call into smaller batches, which could prevent Hive metastore from OOM (caused by millions of partitions).
It will also try to gather all the fast stats (number of files and total size of all files) in parallel to avoid the bottle neck of listing the files in metastore sequential, which is controlled by spark.sql.gatherFastStats (enabled by default).
## How was this patch tested?
Tested locally with 10000 partitions and 100 files with embedded metastore, without gathering fast stats in parallel, adding partitions took 153 seconds, after enable that, gathering the fast stats took about 34 seconds, adding these partitions took 25 seconds (most of the time spent in object store), 59 seconds in total, 2.5X faster (with larger cluster, gathering will much faster).
Author: Davies Liu <davies@databricks.com>
Closes #14607 from davies/repair_batch.
Diffstat (limited to 'sql/hive')
3 files changed, 47 insertions, 3 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 81d5a124e9..b45ad30dca 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -829,6 +829,8 @@ private[hive] class HiveClientImpl( serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib), compressed = apiPartition.getSd.isCompressed, properties = Option(apiPartition.getSd.getSerdeInfo.getParameters) - .map(_.asScala.toMap).orNull)) + .map(_.asScala.toMap).orNull), + parameters = + if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 41527fcd05..3238770761 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -267,6 +267,7 @@ private[client] class Shim_v0_12 extends Shim with Logging { val table = hive.getTable(database, tableName) parts.foreach { s => val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull + val params = if (s.parameters.nonEmpty) s.parameters.asJava else null val spec = s.spec.asJava if (hive.getPartition(table, spec, false) != null && ignoreIfExists) { // Ignore this partition since it already exists and ignoreIfExists == true @@ -280,7 +281,7 @@ private[client] class Shim_v0_12 extends Shim with Logging { table, spec, location, - null, // partParams + params, // partParams null, // inputFormat null, // outputFormat -1: JInteger, // numBuckets @@ -459,8 +460,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit = { val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists) - parts.foreach { s => + parts.zipWithIndex.foreach { case (s, i) => addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull) + if (s.parameters.nonEmpty) { + addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava) + } } hive.createPartitions(addPartitionDesc) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index f00a99b6d0..9019333d76 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -378,6 +378,44 @@ class HiveDDLSuite expectedSerdeProps) } + test("MSCK REPAIR RABLE") { + val catalog = spark.sessionState.catalog + val tableIdent = TableIdentifier("tab1") + sql("CREATE TABLE tab1 (height INT, length INT) PARTITIONED BY (a INT, b INT)") + val part1 = Map("a" -> "1", "b" -> "5") + val part2 = Map("a" -> "2", "b" -> "6") + val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get) + val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration) + // valid + fs.mkdirs(new Path(new Path(root, "a=1"), "b=5")) + fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file + fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS")) // file + fs.mkdirs(new Path(new Path(root, "A=2"), "B=6")) + fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv")) // file + fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv")) // file + fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile")) // file + fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary")) + + // invalid + fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name + fs.mkdirs(new Path(new Path(root, "b=1"), "a=1")) // wrong order + fs.mkdirs(new Path(root, "a=4")) // not enough columns + fs.createNewFile(new Path(new Path(root, "a=1"), "b=4")) // file + fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS")) // _SUCCESS + fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary")) // _temporary + fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4")) // start with . + + try { + sql("MSCK REPAIR TABLE tab1") + assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == + Set(part1, part2)) + assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1") + assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2") + } finally { + fs.delete(root, true) + } + } + test("drop table using drop view") { withTable("tab1") { sql("CREATE TABLE tab1(c1 int)") |