author    Michael Armbrust <michael@databricks.com>  2014-04-07 00:14:00 -0700
committer Reynold Xin <rxin@apache.org>  2014-04-07 00:14:00 -0700
commit    accd0999f9cb6a449434d3fc5274dd469eeecab2 (patch)
tree      6c1a73644c3af4aee9a60872bc58d0a1a2bc531a /sql/catalyst
parent    87d0928a3301835705652c24a26096546597e156 (diff)
[SQL] SPARK-1371 Hash Aggregation Improvements
Given:

```scala
case class Data(a: Int, b: Int)
val rdd = sparkContext
  .parallelize(1 to 200)
  .flatMap(_ => (1 to 50000).map(i => Data(i % 100, i)))
rdd.registerAsTable("data")
cacheTable("data")
```

Before:

```
SELECT COUNT(*) FROM data:[10000000]     16795.567ms
SELECT a, SUM(b) FROM data GROUP BY a     7536.436ms
SELECT SUM(b) FROM data                  10954.1ms
```

After:

```
SELECT COUNT(*) FROM data:[10000000]      1372.175ms
SELECT a, SUM(b) FROM data GROUP BY a     2070.446ms
SELECT SUM(b) FROM data                    958.969ms
```

Author: Michael Armbrust <michael@databricks.com>

Closes #295 from marmbrus/hashAgg and squashes the following commits:

ec63575 [Michael Armbrust] Add comment.
d0495a9 [Michael Armbrust] Use scaladoc instead.
b4a6887 [Michael Armbrust] Address review comments.
a2d90ba [Michael Armbrust] Capture child output statically to avoid issues with generators and serialization.
7c13112 [Michael Armbrust] Rewrite Aggregate operator to stream input and use projections. Remove unused local RDD functions implicits.
5096f99 [Michael Armbrust] Make HiveUDAF fields transient since object inspectors are not serializable.
6a4b671 [Michael Armbrust] Add option to avoid binding operators expressions automatically.
92cca08 [Michael Armbrust] Always include serialization debug info when running tests.
1279df2 [Michael Armbrust] Increase default number of partitions.
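The headline change is a rewritten Aggregate operator that streams over input rows and updates per-group state in place, rather than materializing each group's rows before aggregating. A minimal sketch of that streaming hash-aggregation pattern, using plain Scala collections as hypothetical stand-ins for Catalyst's Row and aggregate buffers (none of this is the operator's actual code):

```scala
import scala.collection.mutable

// Hypothetical per-group buffer: mutable running state, updated in place
// as rows stream past instead of collecting each group's rows first.
final class SumCountBuffer { var sum: Long = 0L; var count: Long = 0L }

def streamingHashAggregate(rows: Iterator[(Int, Int)]): Map[Int, (Long, Long)] = {
  val buffers = mutable.HashMap.empty[Int, SumCountBuffer]
  // Single pass over the input: memory stays proportional to the number
  // of groups (100 in the benchmark), not the number of rows.
  for ((a, b) <- rows) {
    val buf = buffers.getOrElseUpdate(a, new SumCountBuffer)
    buf.sum += b
    buf.count += 1
  }
  buffers.iterator.map { case (k, v) => k -> (v.sum, v.count) }.toMap
}

// Same data shape as the benchmark: a = i % 100, b = i.
val grouped = streamingHashAggregate((1 to 50000).iterator.map(i => (i % 100, i)))
```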
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala   6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala       6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala      16
3 files changed, 17 insertions, 11 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index f70e80b7f2..37b9035df9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -48,11 +48,17 @@ case class BoundReference(ordinal: Int, baseReference: Attribute)
override def apply(input: Row): Any = input(ordinal)
}
+/**
+ * Used to denote operators that do their own binding of attributes internally.
+ */
+trait NoBind { self: trees.TreeNode[_] => }
+
class BindReferences[TreeNode <: QueryPlan[TreeNode]] extends Rule[TreeNode] {
import BindReferences._
def apply(plan: TreeNode): TreeNode = {
plan.transform {
+ case n: NoBind => n.asInstanceOf[TreeNode]
case leafNode if leafNode.children.isEmpty => leafNode
case unaryNode if unaryNode.children.size == 1 => unaryNode.transformExpressions { case e =>
bindReference(e, unaryNode.children.head.output)
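The new `NoBind` guard lets an operator take responsibility for its own binding. A sketch of a plan node opting out (the node's name and shape are made up for illustration; only `NoBind` and `bindReference` come from this tree):

```scala
// Illustrative only: mixing in NoBind makes the BindReferences rule above
// skip this node, so it binds its own expressions, eagerly, against
// statically captured child output.
case class SelfBoundNode(expressions: Seq[Expression], child: LogicalPlan)
  extends UnaryNode with NoBind {

  private val childOutput = child.output // captured once, up front

  val boundExpressions: Seq[Expression] =
    expressions.map(BindReferences.bindReference(_, childOutput))

  override def output: Seq[Attribute] = childOutput
  override def references: Set[Attribute] = expressions.flatMap(_.references).toSet
}
```

Capturing `child.output` once is the same idea as the "capture child output statically" commit above: it avoids re-resolving attributes after serialization or under generators.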
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index 38542d3fc7..5576ecbb65 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -28,9 +28,9 @@ class Projection(expressions: Seq[Expression]) extends (Row => Row) {
protected val exprArray = expressions.toArray
def apply(input: Row): Row = {
- val outputArray = new Array[Any](exprArray.size)
+ val outputArray = new Array[Any](exprArray.length)
var i = 0
- while (i < exprArray.size) {
+ while (i < exprArray.length) {
outputArray(i) = exprArray(i).apply(input)
i += 1
}
@@ -57,7 +57,7 @@ case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row)
def apply(input: Row): Row = {
var i = 0
- while (i < exprArray.size) {
+ while (i < exprArray.length) {
mutableRow(i) = exprArray(i).apply(input)
i += 1
}
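Both `Projection` hunks swap `exprArray.size` for `exprArray.length` in per-row loops. On an `Array`, `size` goes through Predef's implicit conversion to `ArrayOps` (which may allocate a wrapper per call), while `length` reads the JVM array's length field directly. A minimal illustration of the preferred loop shape:

```scala
// Hot-loop idiom from the hunks above: while + .length avoids the
// implicit ArrayOps conversion that .size on an Array can incur per call.
val exprArray: Array[Any] = new Array[Any](16)
var i = 0
while (i < exprArray.length) { // direct field read, no conversion
  exprArray(i) = i
  i += 1
}
```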
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index 7303b155ca..53b884a41e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -27,7 +27,7 @@ abstract class AggregateExpression extends Expression {
* Creates a new instance that can be used to compute this aggregate expression for a group
* of input rows.
*/
- def newInstance: AggregateFunction
+ def newInstance(): AggregateFunction
}
/**
@@ -75,7 +75,7 @@ abstract class AggregateFunction
override def apply(input: Row): Any
// Do we really need this?
- def newInstance = makeCopy(productIterator.map { case a: AnyRef => a }.toArray)
+ def newInstance() = makeCopy(productIterator.map { case a: AnyRef => a }.toArray)
}
case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
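Adding empty parentheses to `newInstance()` marks it as a side-effecting factory: every call must return fresh mutable per-group state. A sketch of how a hash-aggregate operator would consume it (the grouping map, `groupingProjection`, and the `update` call are assumptions about the surrounding operator, not code from this file):

```scala
// Illustrative consumption of newInstance(): one fresh AggregateFunction
// per distinct grouping key, updated once per streamed row.
val functions = mutable.HashMap.empty[Row, AggregateFunction]
while (inputRows.hasNext) {
  val row = inputRows.next()
  val key = groupingProjection(row) // assumed key-extraction Projection
  val fn = functions.getOrElseUpdate(key, aggregateExpression.newInstance())
  fn.update(row)                    // assumed per-row update hook
}
```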
@@ -89,7 +89,7 @@ case class Count(child: Expression) extends PartialAggregate with trees.UnaryNod
SplitEvaluation(Sum(partialCount.toAttribute), partialCount :: Nil)
}
- override def newInstance = new CountFunction(child, this)
+ override def newInstance() = new CountFunction(child, this)
}
case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpression {
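The `SplitEvaluation` context in the `Count` hunk above encodes its two-phase plan: map-side tasks each produce a partial count, and the final aggregation is a `Sum` over those partials. A toy, Catalyst-free illustration of that shape:

```scala
// Two-phase COUNT: per-partition partial counts, merged by a final sum.
// Mirrors SplitEvaluation(Sum(partialCount.toAttribute), partialCount :: Nil).
val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5), Seq(6))
val partialCounts: Seq[Long] = partitions.map(_.size.toLong) // map side
val total: Long = partialCounts.sum                          // final aggregation
assert(total == 6L)
```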
@@ -98,7 +98,7 @@ case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpressi
def nullable = false
def dataType = IntegerType
override def toString = s"COUNT(DISTINCT ${expressions.mkString(",")}})"
- override def newInstance = new CountDistinctFunction(expressions, this)
+ override def newInstance() = new CountDistinctFunction(expressions, this)
}
case class Average(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -118,7 +118,7 @@ case class Average(child: Expression) extends PartialAggregate with trees.UnaryN
partialCount :: partialSum :: Nil)
}
- override def newInstance = new AverageFunction(child, this)
+ override def newInstance() = new AverageFunction(child, this)
}
case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -134,7 +134,7 @@ case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[
partialSum :: Nil)
}
- override def newInstance = new SumFunction(child, this)
+ override def newInstance() = new SumFunction(child, this)
}
case class SumDistinct(child: Expression)
@@ -145,7 +145,7 @@ case class SumDistinct(child: Expression)
def dataType = child.dataType
override def toString = s"SUM(DISTINCT $child)"
- override def newInstance = new SumDistinctFunction(child, this)
+ override def newInstance() = new SumDistinctFunction(child, this)
}
case class First(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -160,7 +160,7 @@ case class First(child: Expression) extends PartialAggregate with trees.UnaryNod
First(partialFirst.toAttribute),
partialFirst :: Nil)
}
- override def newInstance = new FirstFunction(child, this)
+ override def newInstance() = new FirstFunction(child, this)
}
case class AverageFunction(expr: Expression, base: AggregateExpression)