diff options
author | Michael Armbrust <michael@databricks.com> | 2014-04-07 00:14:00 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-04-07 00:14:00 -0700 |
commit | accd0999f9cb6a449434d3fc5274dd469eeecab2 (patch) | |
tree | 6c1a73644c3af4aee9a60872bc58d0a1a2bc531a /sql/catalyst | |
parent | 87d0928a3301835705652c24a26096546597e156 (diff) | |
download | spark-accd0999f9cb6a449434d3fc5274dd469eeecab2.tar.gz spark-accd0999f9cb6a449434d3fc5274dd469eeecab2.tar.bz2 spark-accd0999f9cb6a449434d3fc5274dd469eeecab2.zip |
[SQL] SPARK-1371 Hash Aggregation Improvements
Given:
```scala
case class Data(a: Int, b: Int)
val rdd =
sparkContext
.parallelize(1 to 200)
.flatMap(_ => (1 to 50000).map(i => Data(i % 100, i)))
rdd.registerAsTable("data")
cacheTable("data")
```
Before:
```
SELECT COUNT(*) FROM data:[10000000]
16795.567ms
SELECT a, SUM(b) FROM data GROUP BY a
7536.436ms
SELECT SUM(b) FROM data
10954.1ms
```
After:
```
SELECT COUNT(*) FROM data:[10000000]
1372.175ms
SELECT a, SUM(b) FROM data GROUP BY a
2070.446ms
SELECT SUM(b) FROM data
958.969ms
```
Author: Michael Armbrust <michael@databricks.com>
Closes #295 from marmbrus/hashAgg and squashes the following commits:
ec63575 [Michael Armbrust] Add comment.
d0495a9 [Michael Armbrust] Use scaladoc instead.
b4a6887 [Michael Armbrust] Address review comments.
a2d90ba [Michael Armbrust] Capture child output statically to avoid issues with generators and serialization.
7c13112 [Michael Armbrust] Rewrite Aggregate operator to stream input and use projections. Remove unused local RDD functions implicits.
5096f99 [Michael Armbrust] Make HiveUDAF fields transient since object inspectors are not serializable.
6a4b671 [Michael Armbrust] Add option to avoid binding operators expressions automatically.
92cca08 [Michael Armbrust] Always include serialization debug info when running tests.
1279df2 [Michael Armbrust] Increase default number of partitions.
Diffstat (limited to 'sql/catalyst')
3 files changed, 17 insertions, 11 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index f70e80b7f2..37b9035df9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -48,11 +48,17 @@ case class BoundReference(ordinal: Int, baseReference: Attribute) override def apply(input: Row): Any = input(ordinal) } +/** + * Used to denote operators that do their own binding of attributes internally. + */ +trait NoBind { self: trees.TreeNode[_] => } + class BindReferences[TreeNode <: QueryPlan[TreeNode]] extends Rule[TreeNode] { import BindReferences._ def apply(plan: TreeNode): TreeNode = { plan.transform { + case n: NoBind => n.asInstanceOf[TreeNode] case leafNode if leafNode.children.isEmpty => leafNode case unaryNode if unaryNode.children.size == 1 => unaryNode.transformExpressions { case e => bindReference(e, unaryNode.children.head.output) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index 38542d3fc7..5576ecbb65 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -28,9 +28,9 @@ class Projection(expressions: Seq[Expression]) extends (Row => Row) { protected val exprArray = expressions.toArray def apply(input: Row): Row = { - val outputArray = new Array[Any](exprArray.size) + val outputArray = new Array[Any](exprArray.length) var i = 0 - while (i < exprArray.size) { + while (i < exprArray.length) { outputArray(i) = exprArray(i).apply(input) i += 1 } @@ -57,7 +57,7 @@ case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) def apply(input: Row): Row = { var i = 0 - while (i < exprArray.size) { + while (i < exprArray.length) { mutableRow(i) = exprArray(i).apply(input) i += 1 } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 7303b155ca..53b884a41e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -27,7 +27,7 @@ abstract class AggregateExpression extends Expression { * Creates a new instance that can be used to compute this aggregate expression for a group * of input rows/ */ - def newInstance: AggregateFunction + def newInstance(): AggregateFunction } /** @@ -75,7 +75,7 @@ abstract class AggregateFunction override def apply(input: Row): Any // Do we really need this? - def newInstance = makeCopy(productIterator.map { case a: AnyRef => a }.toArray) + def newInstance() = makeCopy(productIterator.map { case a: AnyRef => a }.toArray) } case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { @@ -89,7 +89,7 @@ case class Count(child: Expression) extends PartialAggregate with trees.UnaryNod SplitEvaluation(Sum(partialCount.toAttribute), partialCount :: Nil) } - override def newInstance = new CountFunction(child, this) + override def newInstance()= new CountFunction(child, this) } case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpression { @@ -98,7 +98,7 @@ case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpressi def nullable = false def dataType = IntegerType override def toString = s"COUNT(DISTINCT ${expressions.mkString(",")}})" - override def newInstance = new CountDistinctFunction(expressions, this) + override def newInstance()= new CountDistinctFunction(expressions, this) } case class Average(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { @@ -118,7 +118,7 @@ case class Average(child: Expression) extends PartialAggregate with trees.UnaryN partialCount :: partialSum :: Nil) } - override def newInstance = new AverageFunction(child, this) + override def newInstance()= new AverageFunction(child, this) } case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { @@ -134,7 +134,7 @@ case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[ partialSum :: Nil) } - override def newInstance = new SumFunction(child, this) + override def newInstance()= new SumFunction(child, this) } case class SumDistinct(child: Expression) @@ -145,7 +145,7 @@ case class SumDistinct(child: Expression) def dataType = child.dataType override def toString = s"SUM(DISTINCT $child)" - override def newInstance = new SumDistinctFunction(child, this) + override def newInstance()= new SumDistinctFunction(child, this) } case class First(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { @@ -160,7 +160,7 @@ case class First(child: Expression) extends PartialAggregate with trees.UnaryNod First(partialFirst.toAttribute), partialFirst :: Nil) } - override def newInstance = new FirstFunction(child, this) + override def newInstance()= new FirstFunction(child, this) } case class AverageFunction(expr: Expression, base: AggregateExpression) |