path: root/sql/hive
author      Wenchen Fan <wenchen@databricks.com>   2017-04-10 13:36:08 +0800
committer   Wenchen Fan <wenchen@databricks.com>   2017-04-10 13:36:08 +0800
commit      3d7f201f2adc2d33be6f564fa76435c18552f4ba (patch)
tree        2c34606cf5cf36da43cf4d9b7056bf2b0c33cd44 /sql/hive
parent      1a0bc41659eef317dcac18df35c26857216a4314 (diff)
download    spark-3d7f201f2adc2d33be6f564fa76435c18552f4ba.tar.gz
spark-3d7f201f2adc2d33be6f564fa76435c18552f4ba.tar.bz2
spark-3d7f201f2adc2d33be6f564fa76435c18552f4ba.zip
[SPARK-20229][SQL] add semanticHash to QueryPlan
## What changes were proposed in this pull request?

Like `Expression`, `QueryPlan` should also have a `semanticHash` method, so that we can put plans into a hash map and look them up quickly. This PR refactors `QueryPlan` to follow `Expression` and puts all the normalization logic in `QueryPlan.canonicalized`, which makes it natural to implement `semanticHash`.

Follow-up: improve `CacheManager` to leverage this `semanticHash` and speed up plan lookup, instead of iterating over all cached plans.

## How was this patch tested?

Existing tests. Note that we don't need to test the `semanticHash` method directly: once the existing tests prove `sameResult` is correct, we are good.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17541 from cloud-fan/plan-semantic.
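The pattern described above mirrors how `Expression` already works: once a plan exposes a canonical form, semantic equality and hashing fall out of plain `equals`/`hashCode` on that form. Below is a minimal, simplified sketch of that idea (`QueryPlanSketch` and `ScanSketch` are illustrative names, not the actual `QueryPlan` code in Spark):

```scala
// Sketch only: the real QueryPlan does considerably more normalization.
abstract class QueryPlanSketch {
  // Normalized copy of this plan; subclasses rewrite per-instance details
  // (such as expression IDs) into a position-based, canonical form.
  lazy val canonicalized: QueryPlanSketch = this

  // Two plans compute the same result iff their canonical forms are equal.
  def sameResult(other: QueryPlanSketch): Boolean =
    canonicalized == other.canonicalized

  // Hash of the canonical form, so plans can be used as hash-map keys.
  def semanticHash(): Int = canonicalized.hashCode()
}

// Example subclass: a scan whose only "cosmetic" difference is its expression IDs.
final case class ScanSketch(table: String, exprIds: Seq[Long]) extends QueryPlanSketch {
  // Canonical form replaces per-instance expression IDs with positional ones.
  override lazy val canonicalized: QueryPlanSketch =
    ScanSketch(table, exprIds.indices.map(_.toLong))
}

// ScanSketch("t", Seq(7, 9)).sameResult(ScanSketch("t", Seq(3, 4)))  // true
// ScanSketch("t", Seq(7, 9)).semanticHash() == ScanSketch("t", Seq(3, 4)).semanticHash()  // true
```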
Diffstat (limited to 'sql/hive')
-rw-r--r--   sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala   45
1 file changed, 21 insertions, 24 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 28f074849c..fab0d7fa84 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -72,7 +72,7 @@ case class HiveTableScanExec(
// Bind all partition key attribute references in the partition pruning predicate for later
// evaluation.
- private val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred =>
+ private lazy val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred =>
require(
pred.dataType == BooleanType,
s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
@@ -80,20 +80,22 @@ case class HiveTableScanExec(
BindReferences.bindReference(pred, relation.partitionCols)
}
- // Create a local copy of hadoopConf,so that scan specific modifications should not impact
- // other queries
- @transient private val hadoopConf = sparkSession.sessionState.newHadoopConf()
-
- @transient private val hiveQlTable = HiveClientImpl.toHiveTable(relation.tableMeta)
- @transient private val tableDesc = new TableDesc(
+ @transient private lazy val hiveQlTable = HiveClientImpl.toHiveTable(relation.tableMeta)
+ @transient private lazy val tableDesc = new TableDesc(
hiveQlTable.getInputFormatClass,
hiveQlTable.getOutputFormatClass,
hiveQlTable.getMetadata)
- // append columns ids and names before broadcast
- addColumnMetadataToConf(hadoopConf)
+ // Create a local copy of hadoopConf,so that scan specific modifications should not impact
+ // other queries
+ @transient private lazy val hadoopConf = {
+ val c = sparkSession.sessionState.newHadoopConf()
+ // append columns ids and names before broadcast
+ addColumnMetadataToConf(c)
+ c
+ }
- @transient private val hadoopReader = new HadoopTableReader(
+ @transient private lazy val hadoopReader = new HadoopTableReader(
output,
relation.partitionCols,
tableDesc,
@@ -104,7 +106,7 @@ case class HiveTableScanExec(
Cast(Literal(value), dataType).eval(null)
}
- private def addColumnMetadataToConf(hiveConf: Configuration) {
+ private def addColumnMetadataToConf(hiveConf: Configuration): Unit = {
// Specifies needed column IDs for those non-partitioning columns.
val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer)
@@ -198,18 +200,13 @@ case class HiveTableScanExec(
}
}
- override def sameResult(plan: SparkPlan): Boolean = plan match {
- case other: HiveTableScanExec =>
- val thisPredicates = partitionPruningPred.map(cleanExpression)
- val otherPredicates = other.partitionPruningPred.map(cleanExpression)
-
- val result = relation.sameResult(other.relation) &&
- output.length == other.output.length &&
- output.zip(other.output)
- .forall(p => p._1.name == p._2.name && p._1.dataType == p._2.dataType) &&
- thisPredicates.length == otherPredicates.length &&
- thisPredicates.zip(otherPredicates).forall(p => p._1.semanticEquals(p._2))
- result
- case _ => false
+ override lazy val canonicalized: HiveTableScanExec = {
+ val input: AttributeSeq = relation.output
+ HiveTableScanExec(
+ requestedAttributes.map(normalizeExprId(_, input)),
+ relation.canonicalized.asInstanceOf[CatalogRelation],
+ partitionPruningPred.map(normalizeExprId(_, input)))(sparkSession)
}
+
+ override def otherCopyArgs: Seq[AnyRef] = Seq(sparkSession)
}
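The new `canonicalized` override above normalizes expression IDs against the relation's output, so two `HiveTableScanExec` instances that differ only in expression IDs produce identical canonical plans. For the follow-up mentioned in the PR description, here is a hypothetical sketch of how a plan cache could use `semanticHash` to bucket cached plans and fall back to `sameResult` only within a bucket (`PlanLike`, `CachedEntry`, and `PlanCacheSketch` are illustrative names, not Spark's actual `CacheManager` API):

```scala
import scala.collection.mutable

// Placeholder for a plan type exposing the two methods this PR adds.
trait PlanLike {
  def semanticHash(): Int
  def sameResult(other: PlanLike): Boolean
}

final case class CachedEntry(plan: PlanLike, data: AnyRef)

final class PlanCacheSketch {
  // Cached entries bucketed by the semantic hash of their plan.
  private val buckets = mutable.Map.empty[Int, List[CachedEntry]]

  def add(entry: CachedEntry): Unit = {
    val key = entry.plan.semanticHash()
    buckets(key) = entry :: buckets.getOrElse(key, Nil)
  }

  // Constant-time bucket lookup by semanticHash, then sameResult to rule out
  // hash collisions, instead of scanning every cached plan linearly.
  def lookup(plan: PlanLike): Option[CachedEntry] =
    buckets.getOrElse(plan.semanticHash(), Nil).find(_.plan.sameResult(plan))
}
```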