author    Cheng Hao <hao.cheng@intel.com>  2014-08-29 15:32:26 -0700
committer Michael Armbrust <michael@databricks.com>  2014-08-29 15:32:26 -0700
commit    dc4d577c6549df58f42c0e22cac354554d169896 (patch)
tree      ca5a282f18607538ea6dd880e472b2dcdc33c93b /sql/core
parent    287c0ac7722dd4bc51b921ccc6f0e3c1625b5ff4 (diff)
[SPARK-3198] [SQL] Remove the TreeNode.id
Although the id property of the TreeNode API does save time by giving a faster way to compare two TreeNodes, it is a performance bottleneck during expression object creation in a multi-threaded environment (because of the memory barrier). Fortunately, the tree node comparison happens only once on the master, so even if we remove it, overall performance is not affected.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #2155 from chenghao-intel/treenode and squashes the following commits:

7cf2cd2 [Cheng Hao] Remove the implicit keyword for TreeNodeRef and some other small issues
5873415 [Cheng Hao] Remove the TreeNode.id
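As context for the diff below, a minimal sketch of what a reference-identity wrapper like TreeNodeRef looks like (a simplified stand-in, not the exact Catalyst source; the obj field name is an assumption):

    // Sketch: keys hash-based collections by reference identity instead of
    // an id counter or structural equality. Two structurally equal but
    // distinct node instances stay distinct keys.
    class TreeNodeRef(val obj: AnyRef) {
      override def equals(o: Any): Boolean = o match {
        case that: TreeNodeRef => that.obj eq obj  // identity, not ==
        case _                 => false
      }
      override def hashCode: Int = System.identityHashCode(obj)
    }

This is what lets the resultMap in GeneratedAggregate.scala below map each aggregate instance to its result expression without a per-node id.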
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala  | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala       |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala          |  2
3 files changed, 11 insertions, 8 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 31ad5e8aab..b3edd5020f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.trees._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.types._
@@ -141,9 +142,10 @@ case class GeneratedAggregate(
val computationSchema = computeFunctions.flatMap(_.schema)
- val resultMap: Map[Long, Expression] = aggregatesToCompute.zip(computeFunctions).map {
- case (agg, func) => agg.id -> func.result
- }.toMap
+ val resultMap: Map[TreeNodeRef, Expression] =
+ aggregatesToCompute.zip(computeFunctions).map {
+ case (agg, func) => new TreeNodeRef(agg) -> func.result
+ }.toMap
val namedGroups = groupingExpressions.zipWithIndex.map {
case (ne: NamedExpression, _) => (ne, ne)
@@ -156,7 +158,7 @@ case class GeneratedAggregate(
// The set of expressions that produce the final output given the aggregation buffer and the
// grouping expressions.
val resultExpressions = aggregateExpressions.map(_.transform {
- case e: Expression if resultMap.contains(e.id) => resultMap(e.id)
+ case e: Expression if resultMap.contains(new TreeNodeRef(e)) => resultMap(new TreeNodeRef(e))
case e: Expression if groupMap.contains(e) => groupMap(e)
})
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 5b896c55b7..8ff757bbe3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -23,6 +23,7 @@ import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext._
import org.apache.spark.sql.{SchemaRDD, Row}
+import org.apache.spark.sql.catalyst.trees.TreeNodeRef
/**
* :: DeveloperApi ::
@@ -43,10 +44,10 @@ package object debug {
implicit class DebugQuery(query: SchemaRDD) {
def debug(): Unit = {
val plan = query.queryExecution.executedPlan
- val visited = new collection.mutable.HashSet[Long]()
+ val visited = new collection.mutable.HashSet[TreeNodeRef]()
val debugPlan = plan transform {
- case s: SparkPlan if !visited.contains(s.id) =>
- visited += s.id
+ case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) =>
+ visited += new TreeNodeRef(s)
DebugNode(s)
}
println(s"Results returned: ${debugPlan.execute().count()}")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index aef6ebf86b..3dc8be2456 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -98,7 +98,7 @@ private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] {
logical.Project(
l.output,
l.transformExpressions {
- case p: PythonUDF if p.id == udf.id => evaluation.resultAttribute
+ case p: PythonUDF if p.fastEquals(udf) => evaluation.resultAttribute
}.withNewChildren(newChildren))
}
}
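The last hunk replaces the id comparison with fastEquals. The underlying idea, sketched as a free function (the real method is defined on Catalyst's TreeNode):

    // Sketch: cheap reference-identity check first, falling back to full
    // structural equality only when the references differ.
    def fastEquals(a: AnyRef, b: AnyRef): Boolean = (a eq b) || a == b

Since the UDF being replaced is typically the very same object instance found during extraction, the eq short-circuit usually fires and the structural comparison is skipped.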