author     Michael Armbrust <michael@databricks.com>   2015-07-27 17:32:34 -0700
committer  Yin Huai <yhuai@databricks.com>             2015-07-27 17:32:34 -0700
commit     ce89ff477aea6def68265ed218f6105680755c9a (patch)
tree       b5d17eeba9fa9ba87c15c730f9442fc661b14723 /sql
parent     8ddfa52c208bf329c2b2c8909c6be04301e36083 (diff)
[SPARK-9386] [SQL] Feature flag for metastore partition pruning
Since we have been seeing a lot of failures related to this new feature, let's put it
behind a flag and turn it off by default.

Author: Michael Armbrust <michael@databricks.com>

Closes #7703 from marmbrus/optionalMetastorePruning and squashes the following commits:

6ad128c [Michael Armbrust] style
8447835 [Michael Armbrust] [SPARK-9386][SQL] Feature flag for metastore partition pruning
fd37b87 [Michael Armbrust] add config flag
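For context, a minimal sketch of how the new flag can be toggled, as it might look in a
Spark 1.5-era spark-shell where sqlContext is a HiveContext (the logs table and its ds
predicate are hypothetical):

    // Opt in to metastore-side partition pruning (off by default per this patch).
    sqlContext.setConf("spark.sql.hive.metastorePartitionPruning", "true")

    // Equivalent SQL form:
    sqlContext.sql("SET spark.sql.hive.metastorePartitionPruning=true")

    // With the flag on, partition predicates like this one can be evaluated by the
    // metastore instead of by listing every partition first (logs is hypothetical).
    sqlContext.sql("SELECT * FROM logs WHERE ds = '2015-07-27'").show()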
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                     |  7 +++++++
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala   | 12 +++++++++++-
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala | 10 ++++------
3 files changed, 22 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 9b2dbd7442..40eba33f59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -301,6 +301,11 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "<TODO>")
+ val HIVE_METASTORE_PARTITION_PRUNING = booleanConf("spark.sql.hive.metastorePartitionPruning",
+ defaultValue = Some(false),
+ doc = "When true, some predicates will be pushed down into the Hive metastore so that " +
+ "unmatching partitions can be eliminated earlier.")
+
val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
defaultValue = Some("_corrupt_record"),
doc = "<TODO>")
@@ -456,6 +461,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
+ private[spark] def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
+
private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)
private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
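The hunk above follows SQLConf's usual two-part pattern: register a typed entry with a
key, a default, and a doc string, then expose a typed accessor on the conf class. A
self-contained sketch of that pattern, with hypothetical names standing in for Spark's
internal helpers (this is not Spark's actual API):

    // Hypothetical, simplified stand-in for SQLConf's typed boolean entries.
    final case class BooleanConfEntry(key: String, defaultValue: Boolean, doc: String)

    class MiniConf {
      import scala.collection.mutable

      private val settings = mutable.Map.empty[String, String]

      val METASTORE_PARTITION_PRUNING = BooleanConfEntry(
        "spark.sql.hive.metastorePartitionPruning",
        defaultValue = false, // conservative default while the feature stabilizes
        doc = "When true, push some predicates down into the Hive metastore.")

      def setConfString(key: String, value: String): Unit = settings(key) = value

      // Fall back to the entry's default when the user has not set the key.
      def getConf(entry: BooleanConfEntry): Boolean =
        settings.get(entry.key).map(_.toBoolean).getOrElse(entry.defaultValue)

      // Typed accessor, mirroring SQLConf.metastorePartitionPruning in the hunk above.
      def metastorePartitionPruning: Boolean = getConf(METASTORE_PARTITION_PRUNING)
    }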
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 9c707a7a2e..3180c05445 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -678,8 +678,18 @@ private[hive] case class MetastoreRelation
}
)
+ // When metastore partition pruning is turned off, we cache the list of all partitions to
+ // mimic the behavior of Spark < 1.5
+ lazy val allPartitions = table.getAllPartitions
+
def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
- table.getPartitions(predicates).map { p =>
+ val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
+ table.getPartitions(predicates)
+ } else {
+ allPartitions
+ }
+
+ rawPartitions.map { p =>
val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
tPartition.setDbName(databaseName)
tPartition.setTableName(tableName)
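The heart of the change is the fetch strategy: with the flag off, all partitions are
fetched once and cached in a lazy val (the pre-1.5 behavior); with it on, predicates are
pushed to the metastore on every call. A simplified, self-contained sketch of that
decision, with hypothetical stand-ins for the Hive client calls:

    // Hypothetical stand-ins for the Hive client calls used by MetastoreRelation.
    case class HivePartitionStub(spec: Map[String, String])

    class PartitionSource(
        pruningEnabled: Boolean,
        fetchAll: () => Seq[HivePartitionStub],
        fetchByFilter: Seq[String] => Seq[HivePartitionStub]) {

      // With pruning off, fetch the full list once and reuse it across queries,
      // mimicking the pre-1.5 behavior (and its memory/latency profile).
      private lazy val allPartitions: Seq[HivePartitionStub] = fetchAll()

      def partitions(predicates: Seq[String]): Seq[HivePartitionStub] =
        if (pruningEnabled) fetchByFilter(predicates) // push predicates to the metastore
        else allPartitions                            // serve the cached, unfiltered list
    }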
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
index 1656587d14..d834b4e83e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
@@ -72,12 +72,10 @@ private[hive] case class HiveTable(
def isPartitioned: Boolean = partitionColumns.nonEmpty
- def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] = {
- predicates match {
- case Nil => client.getAllPartitions(this)
- case _ => client.getPartitionsByFilter(this, predicates)
- }
- }
+ def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
+
+ def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
+ client.getPartitionsByFilter(this, predicates)
// Hive does not support backticks when passing names to the client.
def qualifiedName: String = s"$database.$name"
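One behavioral consequence of this refactor: getPartitions no longer falls back to a
full listing when the predicate list is empty; it always routes through
getPartitionsByFilter, and callers that want everything must now say so explicitly.
Roughly, assuming a table: HiveTable and a predicates: Seq[Expression] in scope:

    // Before this patch, one method served both cases, with an implicit fallback:
    //   table.getPartitions(Nil)          // silently fetched ALL partitions
    // After it, the intent is explicit at each call site:
    val everything = table.getAllPartitions          // full, unfiltered listing
    val pruned     = table.getPartitions(predicates) // always metastore-side filtering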