diff options
Diffstat (limited to 'sql/catalyst/src')
5 files changed, 125 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 259008f183..4b8cac8f32 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -190,14 +190,36 @@ abstract class ExternalCatalog { spec: TablePartitionSpec): Option[CatalogTablePartition] /** + * List the names of all partitions that belong to the specified table, assuming it exists. + * + * For a table with partition columns p1, p2, p3, each partition name is formatted as + * `p1=v1/p2=v2/p3=v3`. Each partition column name and value is an escaped path name, and can be + * decoded with the `ExternalCatalogUtils.unescapePathName` method. + * + * The returned sequence is sorted as strings. + * + * A partial partition spec may optionally be provided to filter the partitions returned, as + * described in the `listPartitions` method. + * + * @param db database name + * @param table table name + * @param partialSpec partition spec + */ + def listPartitionNames( + db: String, + table: String, + partialSpec: Option[TablePartitionSpec] = None): Seq[String] + + /** * List the metadata of all partitions that belong to the specified table, assuming it exists. * * A partial partition spec may optionally be provided to filter the partitions returned. * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'), * then a partial spec of (a='1') will return the first two only. + * * @param db database name * @param table table name - * @param partialSpec partition spec + * @param partialSpec partition spec */ def listPartitions( db: String, @@ -210,7 +232,7 @@ abstract class ExternalCatalog { * * @param db database name * @param table table name - * @param predicates partition-pruning predicates + * @param predicates partition-pruning predicates */ def listPartitionsByFilter( db: String, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 880a7a0dc4..a6bebe1a39 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.util.StringUtils @@ -488,6 +489,19 @@ class InMemoryCatalog( } } + override def listPartitionNames( + db: String, + table: String, + partialSpec: Option[TablePartitionSpec] = None): Seq[String] = synchronized { + val partitionColumnNames = getTable(db, table).partitionColumnNames + + listPartitions(db, table, partialSpec).map { partition => + partitionColumnNames.map { name => + escapePathName(name) + "=" + escapePathName(partition.spec(name)) + }.mkString("/") + }.sorted + } + override def listPartitions( db: String, table: String, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index da3a2079f4..7a3d2097a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -749,6 +749,26 @@ class SessionCatalog( } /** + * List the names of all partitions that belong to the specified table, assuming it exists. + * + * A partial partition spec may optionally be provided to filter the partitions returned. + * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'), + * then a partial spec of (a='1') will return the first two only. + */ + def listPartitionNames( + tableName: TableIdentifier, + partialSpec: Option[TablePartitionSpec] = None): Seq[String] = { + val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) + val table = formatTableName(tableName.table) + requireDbExists(db) + requireTableExists(TableIdentifier(table, Option(db))) + partialSpec.foreach { spec => + requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName)) + } + externalCatalog.listPartitionNames(db, table, partialSpec) + } + + /** * List the metadata of all partitions that belong to the specified table, assuming it exists. * * A partial partition spec may optionally be provided to filter the partitions returned. @@ -762,6 +782,9 @@ class SessionCatalog( val table = formatTableName(tableName.table) requireDbExists(db) requireTableExists(TableIdentifier(table, Option(db))) + partialSpec.foreach { spec => + requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName)) + } externalCatalog.listPartitions(db, table, partialSpec) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 3b39f420af..00e663c324 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -346,6 +346,31 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac assert(new Path(partitionLocation) == defaultPartitionLocation) } + test("list partition names") { + val catalog = newBasicCatalog() + val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "%="), storageFormat) + catalog.createPartitions("db2", "tbl2", Seq(newPart), ignoreIfExists = false) + + val partitionNames = catalog.listPartitionNames("db2", "tbl2") + assert(partitionNames == Seq("a=1/b=%25%3D", "a=1/b=2", "a=3/b=4")) + } + + test("list partition names with partial partition spec") { + val catalog = newBasicCatalog() + val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "%="), storageFormat) + catalog.createPartitions("db2", "tbl2", Seq(newPart), ignoreIfExists = false) + + val partitionNames1 = catalog.listPartitionNames("db2", "tbl2", Some(Map("a" -> "1"))) + assert(partitionNames1 == Seq("a=1/b=%25%3D", "a=1/b=2")) + + // Partial partition specs including "weird" partition values should use the unescaped values + val partitionNames2 = catalog.listPartitionNames("db2", "tbl2", Some(Map("b" -> "%="))) + assert(partitionNames2 == Seq("a=1/b=%25%3D")) + + val partitionNames3 = catalog.listPartitionNames("db2", "tbl2", Some(Map("b" -> "%25%3D"))) + assert(partitionNames3.isEmpty) + } + test("list partitions with partial partition spec") { val catalog = newBasicCatalog() val parts = catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "1"))) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index f9c4b2687b..5cc772d8e9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -878,6 +878,31 @@ class SessionCatalogSuite extends SparkFunSuite { "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) } + test("list partition names") { + val catalog = new SessionCatalog(newBasicCatalog()) + val expectedPartitionNames = Seq("a=1/b=2", "a=3/b=4") + assert(catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2"))) == + expectedPartitionNames) + // List partition names without explicitly specifying database + catalog.setCurrentDatabase("db2") + assert(catalog.listPartitionNames(TableIdentifier("tbl2")) == expectedPartitionNames) + } + + test("list partition names with partial partition spec") { + val catalog = new SessionCatalog(newBasicCatalog()) + assert( + catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))) == + Seq("a=1/b=2")) + } + + test("list partition names with invalid partial partition spec") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")), + Some(Map("unknown" -> "unknown"))) + } + } + test("list partitions") { val catalog = new SessionCatalog(newBasicCatalog()) assert(catalogPartitionsEqual( @@ -887,6 +912,20 @@ class SessionCatalogSuite extends SparkFunSuite { assert(catalogPartitionsEqual(catalog.listPartitions(TableIdentifier("tbl2")), part1, part2)) } + test("list partitions with partial partition spec") { + val catalog = new SessionCatalog(newBasicCatalog()) + assert(catalogPartitionsEqual( + catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))), part1)) + } + + test("list partitions with invalid partial partition spec") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.listPartitions( + TableIdentifier("tbl2", Some("db2")), Some(Map("unknown" -> "unknown"))) + } + } + test("list partitions when database/table does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) intercept[NoSuchDatabaseException] { |