aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorAdrian Ionescu <adrian@databricks.com>2017-04-03 08:48:49 -0700
committerXiao Li <gatorsmile@gmail.com>2017-04-03 08:48:49 -0700
commit703c42c398fefd3f7f60e1c503c4df50251f8dcf (patch)
tree54616a4a243da8ab077d871a3ec0d5c0f057394b /sql/hive
parent4fa1a43af6b5a6abaef7e04cacb2617a2e92d816 (diff)
downloadspark-703c42c398fefd3f7f60e1c503c4df50251f8dcf.tar.gz
spark-703c42c398fefd3f7f60e1c503c4df50251f8dcf.tar.bz2
spark-703c42c398fefd3f7f60e1c503c4df50251f8dcf.zip
[SPARK-20194] Add support for partition pruning to in-memory catalog
## What changes were proposed in this pull request? This patch implements `listPartitionsByFilter()` for `InMemoryCatalog` and thus resolves an outstanding TODO causing the `PruneFileSourcePartitions` optimizer rule not to apply when "spark.sql.catalogImplementation" is set to "in-memory" (which is the default). The change is straightforward: it extracts the code for further filtering of the list of partitions returned by the metastore's `getPartitionsByFilter()` out from `HiveExternalCatalog` into `ExternalCatalogUtils` and calls this new function from `InMemoryCatalog` on the whole list of partitions. Now that this method is implemented we can always pass the `CatalogTable` to the `DataSource` in `FindDataSourceTable`, so that the latter is resolved to a relation with a `CatalogFileIndex`, which is what the `PruneFileSourcePartitions` rule matches for. ## How was this patch tested? Ran existing tests and added new test for `listPartitionsByFilter` in `ExternalCatalogSuite`, which is subclassed by both `InMemoryCatalogSuite` and `HiveExternalCatalogSuite`. Author: Adrian Ionescu <adrian@databricks.com> Closes #17510 from adrian-ionescu/InMemoryCatalog.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala33
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala8
3 files changed, 6 insertions, 37 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 33b21be372..f0e35dff57 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -1039,37 +1039,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
defaultTimeZoneId: String): Seq[CatalogTablePartition] = withClient {
val rawTable = getRawTable(db, table)
val catalogTable = restoreTableMetadata(rawTable)
- val partitionColumnNames = catalogTable.partitionColumnNames.toSet
- val nonPartitionPruningPredicates = predicates.filterNot {
- _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
- }
- if (nonPartitionPruningPredicates.nonEmpty) {
- sys.error("Expected only partition pruning predicates: " +
- predicates.reduceLeft(And))
- }
+ val partColNameMap = buildLowerCasePartColNameMap(catalogTable)
- val partitionSchema = catalogTable.partitionSchema
- val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table))
-
- if (predicates.nonEmpty) {
- val clientPrunedPartitions = client.getPartitionsByFilter(rawTable, predicates).map { part =>
+ val clientPrunedPartitions =
+ client.getPartitionsByFilter(rawTable, predicates).map { part =>
part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
}
- val boundPredicate =
- InterpretedPredicate.create(predicates.reduce(And).transform {
- case att: AttributeReference =>
- val index = partitionSchema.indexWhere(_.name == att.name)
- BoundReference(index, partitionSchema(index).dataType, nullable = true)
- })
- clientPrunedPartitions.filter { p =>
- boundPredicate(p.toRow(partitionSchema, defaultTimeZoneId))
- }
- } else {
- client.getPartitions(catalogTable).map { part =>
- part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
- }
- }
+ prunePartitionsByFilter(catalogTable, clientPrunedPartitions, predicates, defaultTimeZoneId)
}
// --------------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index d55c41e5c9..2e35f39839 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -584,7 +584,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
*/
def convertFilters(table: Table, filters: Seq[Expression]): String = {
// hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
- val varcharKeys = table.getPartitionKeys.asScala
+ lazy val varcharKeys = table.getPartitionKeys.asScala
.filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
.map(col => col.getName).toSet
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 4349f1aa23..bd54c043c6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -22,7 +22,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.types.StructType
@@ -50,13 +49,6 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
import utils._
- test("list partitions by filter") {
- val catalog = newBasicCatalog()
- val selectedPartitions = catalog.listPartitionsByFilter("db2", "tbl2", Seq('a.int === 1), "GMT")
- assert(selectedPartitions.length == 1)
- assert(selectedPartitions.head.spec == part1.spec)
- }
-
test("SPARK-18647: do not put provider in table properties for Hive serde table") {
val catalog = newBasicCatalog()
val hiveTable = CatalogTable(