aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src
diff options
context:
space:
mode:
authorMichael Allman <michael@videoamp.com>2016-12-06 11:33:35 +0800
committerWenchen Fan <wenchen@databricks.com>2016-12-06 11:33:35 +0800
commit772ddbeaa6fe5abf189d01246f57d295f9346fa3 (patch)
tree2d4255f4677f58c4e2a94a40257dd56709fa0e27 /sql/catalyst/src
parent4af142f55771affa5fc7f2abbbf5e47766194e6e (diff)
downloadspark-772ddbeaa6fe5abf189d01246f57d295f9346fa3.tar.gz
spark-772ddbeaa6fe5abf189d01246f57d295f9346fa3.tar.bz2
spark-772ddbeaa6fe5abf189d01246f57d295f9346fa3.zip
[SPARK-18572][SQL] Add a method `listPartitionNames` to `ExternalCatalog`
(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-18572) ## What changes were proposed in this pull request? Currently Spark answers the `SHOW PARTITIONS` command by fetching all of the table's partition metadata from the external catalog and constructing partition names therefrom. The Hive client has a `getPartitionNames` method which is many times faster for this purpose, with the performance improvement scaling with the number of partitions in a table. To test the performance impact of this PR, I ran the `SHOW PARTITIONS` command on two Hive tables with large numbers of partitions. One table has ~17,800 partitions, and the other has ~95,000 partitions. For the purposes of this PR, I'll call the former table `table1` and the latter table `table2`. I ran 5 trials for each table with before-and-after versions of this PR. The results are as follows: Spark at bdc8153, `SHOW PARTITIONS table1`, times in seconds: 7.901 3.983 4.018 4.331 4.261 Spark at bdc8153, `SHOW PARTITIONS table2` (Timed out after 10 minutes with a `SocketTimeoutException`.) Spark at this PR, `SHOW PARTITIONS table1`, times in seconds: 3.801 0.449 0.395 0.348 0.336 Spark at this PR, `SHOW PARTITIONS table2`, times in seconds: 5.184 1.63 1.474 1.519 1.41 Taking the best times from each trial, we get a 12x performance improvement for a table with ~17,800 partitions and at least a 426x improvement for a table with ~95,000 partitions. More significantly, the latter command doesn't even complete with the current code in master. This is actually a patch we've been using in-house at VideoAmp since Spark 1.1. It's made all the difference in the practical usability of our largest tables. Even with tables with about 1,000 partitions there's a performance improvement of about 2-3x. ## How was this patch tested? I added a unit test to `VersionsSuite` which tests that the Hive client's `getPartitionNames` method returns the correct number of partitions. Author: Michael Allman <michael@videoamp.com> Closes #15998 from mallman/spark-18572-list_partition_names.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala26
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala14
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala23
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala25
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala39
5 files changed, 125 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 259008f183..4b8cac8f32 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -190,14 +190,36 @@ abstract class ExternalCatalog {
spec: TablePartitionSpec): Option[CatalogTablePartition]
/**
+ * List the names of all partitions that belong to the specified table, assuming it exists.
+ *
+ * For a table with partition columns p1, p2, p3, each partition name is formatted as
+ * `p1=v1/p2=v2/p3=v3`. Each partition column name and value is an escaped path name, and can be
+ * decoded with the `ExternalCatalogUtils.unescapePathName` method.
+ *
+ * The returned sequence is sorted as strings.
+ *
+ * A partial partition spec may optionally be provided to filter the partitions returned, as
+ * described in the `listPartitions` method.
+ *
+ * @param db database name
+ * @param table table name
+ * @param partialSpec partition spec
+ */
+ def listPartitionNames(
+ db: String,
+ table: String,
+ partialSpec: Option[TablePartitionSpec] = None): Seq[String]
+
+ /**
* List the metadata of all partitions that belong to the specified table, assuming it exists.
*
* A partial partition spec may optionally be provided to filter the partitions returned.
* For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
* then a partial spec of (a='1') will return the first two only.
+ *
* @param db database name
* @param table table name
- * @param partialSpec partition spec
+ * @param partialSpec partition spec
*/
def listPartitions(
db: String,
@@ -210,7 +232,7 @@ abstract class ExternalCatalog {
*
* @param db database name
* @param table table name
- * @param predicates partition-pruning predicates
+ * @param predicates partition-pruning predicates
*/
def listPartitionsByFilter(
db: String,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 880a7a0dc4..a6bebe1a39 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.util.StringUtils
@@ -488,6 +489,19 @@ class InMemoryCatalog(
}
}
+ override def listPartitionNames(
+ db: String,
+ table: String,
+ partialSpec: Option[TablePartitionSpec] = None): Seq[String] = synchronized {
+ val partitionColumnNames = getTable(db, table).partitionColumnNames
+
+ listPartitions(db, table, partialSpec).map { partition =>
+ partitionColumnNames.map { name =>
+ escapePathName(name) + "=" + escapePathName(partition.spec(name))
+ }.mkString("/")
+ }.sorted
+ }
+
override def listPartitions(
db: String,
table: String,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index da3a2079f4..7a3d2097a8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -749,6 +749,26 @@ class SessionCatalog(
}
/**
+ * List the names of all partitions that belong to the specified table, assuming it exists.
+ *
+ * A partial partition spec may optionally be provided to filter the partitions returned.
+ * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
+ * then a partial spec of (a='1') will return the first two only.
+ */
+ def listPartitionNames(
+ tableName: TableIdentifier,
+ partialSpec: Option[TablePartitionSpec] = None): Seq[String] = {
+ val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
+ val table = formatTableName(tableName.table)
+ requireDbExists(db)
+ requireTableExists(TableIdentifier(table, Option(db)))
+ partialSpec.foreach { spec =>
+ requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+ }
+ externalCatalog.listPartitionNames(db, table, partialSpec)
+ }
+
+ /**
* List the metadata of all partitions that belong to the specified table, assuming it exists.
*
* A partial partition spec may optionally be provided to filter the partitions returned.
@@ -762,6 +782,9 @@ class SessionCatalog(
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
+ partialSpec.foreach { spec =>
+ requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+ }
externalCatalog.listPartitions(db, table, partialSpec)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 3b39f420af..00e663c324 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -346,6 +346,31 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(new Path(partitionLocation) == defaultPartitionLocation)
}
+ test("list partition names") {
+ val catalog = newBasicCatalog()
+ val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "%="), storageFormat)
+ catalog.createPartitions("db2", "tbl2", Seq(newPart), ignoreIfExists = false)
+
+ val partitionNames = catalog.listPartitionNames("db2", "tbl2")
+ assert(partitionNames == Seq("a=1/b=%25%3D", "a=1/b=2", "a=3/b=4"))
+ }
+
+ test("list partition names with partial partition spec") {
+ val catalog = newBasicCatalog()
+ val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "%="), storageFormat)
+ catalog.createPartitions("db2", "tbl2", Seq(newPart), ignoreIfExists = false)
+
+ val partitionNames1 = catalog.listPartitionNames("db2", "tbl2", Some(Map("a" -> "1")))
+ assert(partitionNames1 == Seq("a=1/b=%25%3D", "a=1/b=2"))
+
+ // Partial partition specs including "weird" partition values should use the unescaped values
+ val partitionNames2 = catalog.listPartitionNames("db2", "tbl2", Some(Map("b" -> "%=")))
+ assert(partitionNames2 == Seq("a=1/b=%25%3D"))
+
+ val partitionNames3 = catalog.listPartitionNames("db2", "tbl2", Some(Map("b" -> "%25%3D")))
+ assert(partitionNames3.isEmpty)
+ }
+
test("list partitions with partial partition spec") {
val catalog = newBasicCatalog()
val parts = catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "1")))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index f9c4b2687b..5cc772d8e9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -878,6 +878,31 @@ class SessionCatalogSuite extends SparkFunSuite {
"the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
}
+ test("list partition names") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ val expectedPartitionNames = Seq("a=1/b=2", "a=3/b=4")
+ assert(catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2"))) ==
+ expectedPartitionNames)
+ // List partition names without explicitly specifying database
+ catalog.setCurrentDatabase("db2")
+ assert(catalog.listPartitionNames(TableIdentifier("tbl2")) == expectedPartitionNames)
+ }
+
+ test("list partition names with partial partition spec") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ assert(
+ catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))) ==
+ Seq("a=1/b=2"))
+ }
+
+ test("list partition names with invalid partial partition spec") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+ Some(Map("unknown" -> "unknown")))
+ }
+ }
+
test("list partitions") {
val catalog = new SessionCatalog(newBasicCatalog())
assert(catalogPartitionsEqual(
@@ -887,6 +912,20 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(catalogPartitionsEqual(catalog.listPartitions(TableIdentifier("tbl2")), part1, part2))
}
+ test("list partitions with partial partition spec") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ assert(catalogPartitionsEqual(
+ catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))), part1))
+ }
+
+ test("list partitions with invalid partial partition spec") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ intercept[AnalysisException] {
+ catalog.listPartitions(
+ TableIdentifier("tbl2", Some("db2")), Some(Map("unknown" -> "unknown")))
+ }
+ }
+
test("list partitions when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[NoSuchDatabaseException] {