From accd0999f9cb6a449434d3fc5274dd469eeecab2 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 7 Apr 2014 00:14:00 -0700 Subject: [SQL] SPARK-1371 Hash Aggregation Improvements Given: ```scala case class Data(a: Int, b: Int) val rdd = sparkContext .parallelize(1 to 200) .flatMap(_ => (1 to 50000).map(i => Data(i % 100, i))) rdd.registerAsTable("data") cacheTable("data") ``` Before: ``` SELECT COUNT(*) FROM data:[10000000] 16795.567ms SELECT a, SUM(b) FROM data GROUP BY a 7536.436ms SELECT SUM(b) FROM data 10954.1ms ``` After: ``` SELECT COUNT(*) FROM data:[10000000] 1372.175ms SELECT a, SUM(b) FROM data GROUP BY a 2070.446ms SELECT SUM(b) FROM data 958.969ms ``` Author: Michael Armbrust Closes #295 from marmbrus/hashAgg and squashes the following commits: ec63575 [Michael Armbrust] Add comment. d0495a9 [Michael Armbrust] Use scaladoc instead. b4a6887 [Michael Armbrust] Address review comments. a2d90ba [Michael Armbrust] Capture child output statically to avoid issues with generators and serialization. 7c13112 [Michael Armbrust] Rewrite Aggregate operator to stream input and use projections. Remove unused local RDD functions implicits. 5096f99 [Michael Armbrust] Make HiveUDAF fields transient since object inspectors are not serializable. 6a4b671 [Michael Armbrust] Add option to avoid binding operators expressions automatically. 92cca08 [Michael Armbrust] Always include serialization debug info when running tests. 1279df2 [Michael Armbrust] Increase default number of partitions. --- project/SparkBuild.scala | 1 + .../sql/catalyst/expressions/BoundAttribute.scala | 6 + .../sql/catalyst/expressions/Projection.scala | 6 +- .../sql/catalyst/expressions/aggregates.scala | 16 +- .../spark/rdd/PartitionLocalRDDFunctions.scala | 100 ----------- .../org/apache/spark/sql/execution/Exchange.scala | 2 +- .../apache/spark/sql/execution/aggregates.scala | 183 +++++++++++++++------ .../scala/org/apache/spark/sql/hive/hiveUdfs.scala | 3 + 8 files changed, 157 insertions(+), 160 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/rdd/PartitionLocalRDDFunctions.scala diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index d1e4b8b964..6b8740d9f2 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -178,6 +178,7 @@ object SparkBuild extends Build { fork := true, javaOptions in Test += "-Dspark.home=" + sparkHome, javaOptions in Test += "-Dspark.testing=1", + javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true", javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark").map { case (k,v) => s"-D$k=$v" }.toSeq, javaOptions += "-Xmx3g", // Show full stack trace and duration in test cases. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index f70e80b7f2..37b9035df9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -48,11 +48,17 @@ case class BoundReference(ordinal: Int, baseReference: Attribute) override def apply(input: Row): Any = input(ordinal) } +/** + * Used to denote operators that do their own binding of attributes internally. + */ +trait NoBind { self: trees.TreeNode[_] => } + class BindReferences[TreeNode <: QueryPlan[TreeNode]] extends Rule[TreeNode] { import BindReferences._ def apply(plan: TreeNode): TreeNode = { plan.transform { + case n: NoBind => n.asInstanceOf[TreeNode] case leafNode if leafNode.children.isEmpty => leafNode case unaryNode if unaryNode.children.size == 1 => unaryNode.transformExpressions { case e => bindReference(e, unaryNode.children.head.output) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index 38542d3fc7..5576ecbb65 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -28,9 +28,9 @@ class Projection(expressions: Seq[Expression]) extends (Row => Row) { protected val exprArray = expressions.toArray def apply(input: Row): Row = { - val outputArray = new Array[Any](exprArray.size) + val outputArray = new Array[Any](exprArray.length) var i = 0 - while (i < exprArray.size) { + while (i < exprArray.length) { outputArray(i) = exprArray(i).apply(input) i += 1 } @@ -57,7 +57,7 @@ case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) def apply(input: Row): Row = { var i = 0 - while (i < exprArray.size) { + while (i < exprArray.length) { mutableRow(i) = exprArray(i).apply(input) i += 1 } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 7303b155ca..53b884a41e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -27,7 +27,7 @@ abstract class AggregateExpression extends Expression { * Creates a new instance that can be used to compute this aggregate expression for a group * of input rows/ */ - def newInstance: AggregateFunction + def newInstance(): AggregateFunction } /** @@ -75,7 +75,7 @@ abstract class AggregateFunction override def apply(input: Row): Any // Do we really need this? - def newInstance = makeCopy(productIterator.map { case a: AnyRef => a }.toArray) + def newInstance() = makeCopy(productIterator.map { case a: AnyRef => a }.toArray) } case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { @@ -89,7 +89,7 @@ case class Count(child: Expression) extends PartialAggregate with trees.UnaryNod SplitEvaluation(Sum(partialCount.toAttribute), partialCount :: Nil) } - override def newInstance = new CountFunction(child, this) + override def newInstance()= new CountFunction(child, this) } case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpression { @@ -98,7 +98,7 @@ case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpressi def nullable = false def dataType = IntegerType override def toString = s"COUNT(DISTINCT ${expressions.mkString(",")}})" - override def newInstance = new CountDistinctFunction(expressions, this) + override def newInstance()= new CountDistinctFunction(expressions, this) } case class Average(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { @@ -118,7 +118,7 @@ case class Average(child: Expression) extends PartialAggregate with trees.UnaryN partialCount :: partialSum :: Nil) } - override def newInstance = new AverageFunction(child, this) + override def newInstance()= new AverageFunction(child, this) } case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { @@ -134,7 +134,7 @@ case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[ partialSum :: Nil) } - override def newInstance = new SumFunction(child, this) + override def newInstance()= new SumFunction(child, this) } case class SumDistinct(child: Expression) @@ -145,7 +145,7 @@ case class SumDistinct(child: Expression) def dataType = child.dataType override def toString = s"SUM(DISTINCT $child)" - override def newInstance = new SumDistinctFunction(child, this) + override def newInstance()= new SumDistinctFunction(child, this) } case class First(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { @@ -160,7 +160,7 @@ case class First(child: Expression) extends PartialAggregate with trees.UnaryNod First(partialFirst.toAttribute), partialFirst :: Nil) } - override def newInstance = new FirstFunction(child, this) + override def newInstance()= new FirstFunction(child, this) } case class AverageFunction(expr: Expression, base: AggregateExpression) diff --git a/sql/core/src/main/scala/org/apache/spark/rdd/PartitionLocalRDDFunctions.scala b/sql/core/src/main/scala/org/apache/spark/rdd/PartitionLocalRDDFunctions.scala deleted file mode 100644 index f1230e7526..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/rdd/PartitionLocalRDDFunctions.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rdd - -import scala.language.implicitConversions - -import scala.reflect._ -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.{Aggregator, InterruptibleIterator, Logging} -import org.apache.spark.util.collection.AppendOnlyMap - -/* Implicit conversions */ -import org.apache.spark.SparkContext._ - -/** - * Extra functions on RDDs that perform only local operations. These can be used when data has - * already been partitioned correctly. - */ -private[spark] class PartitionLocalRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) - extends Logging - with Serializable { - - /** - * Cogroup corresponding partitions of `this` and `other`. These two RDDs should have - * the same number of partitions. Partitions of these two RDDs are cogrouped - * according to the indexes of partitions. If we have two RDDs and - * each of them has n partitions, we will cogroup the partition i from `this` - * with the partition i from `other`. - * This function will not introduce a shuffling operation. - */ - def cogroupLocally[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { - val cg = self.zipPartitions(other)((iter1:Iterator[(K, V)], iter2:Iterator[(K, W)]) => { - val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]] - - val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => { - if (hadVal) oldVal else Array.fill(2)(new ArrayBuffer[Any]) - } - - val getSeq = (k: K) => { - map.changeValue(k, update) - } - - iter1.foreach { kv => getSeq(kv._1)(0) += kv._2 } - iter2.foreach { kv => getSeq(kv._1)(1) += kv._2 } - - map.iterator - }).mapValues { case Seq(vs, ws) => (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])} - - cg - } - - /** - * Group the values for each key within a partition of the RDD into a single sequence. - * This function will not introduce a shuffling operation. - */ - def groupByKeyLocally(): RDD[(K, Seq[V])] = { - def createCombiner(v: V) = ArrayBuffer(v) - def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v - val aggregator = new Aggregator[K, V, ArrayBuffer[V]](createCombiner, mergeValue, _ ++ _) - val bufs = self.mapPartitionsWithContext((context, iter) => { - new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) - }, preservesPartitioning = true) - bufs.asInstanceOf[RDD[(K, Seq[V])]] - } - - /** - * Join corresponding partitions of `this` and `other`. - * If we have two RDDs and each of them has n partitions, - * we will join the partition i from `this` with the partition i from `other`. - * This function will not introduce a shuffling operation. - */ - def joinLocally[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { - cogroupLocally(other).flatMapValues { - case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (v, w) - } - } -} - -private[spark] object PartitionLocalRDDFunctions { - implicit def rddToPartitionLocalRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) = - new PartitionLocalRDDFunctions(rdd) -} - - diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 869673b1fe..450c142c0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -76,7 +76,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una */ object AddExchange extends Rule[SparkPlan] { // TODO: Determine the number of partitions. - val numPartitions = 8 + val numPartitions = 150 def apply(plan: SparkPlan): SparkPlan = plan.transformUp { case operator: SparkPlan => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala index 8515a18f18..2a4f7b5670 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala @@ -17,14 +17,13 @@ package org.apache.spark.sql.execution +import java.util.HashMap + import org.apache.spark.SparkContext import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ -/* Implicit conversions */ -import org.apache.spark.rdd.PartitionLocalRDDFunctions._ - /** * Groups input data by `groupingExpressions` and computes the `aggregateExpressions` for each * group. @@ -40,7 +39,7 @@ case class Aggregate( groupingExpressions: Seq[Expression], aggregateExpressions: Seq[NamedExpression], child: SparkPlan)(@transient sc: SparkContext) - extends UnaryNode { + extends UnaryNode with NoBind { override def requiredChildDistribution = if (partial) { @@ -55,61 +54,149 @@ case class Aggregate( override def otherCopyArgs = sc :: Nil + // HACK: Generators don't correctly preserve their output through serializations so we grab + // out child's output attributes statically here. + val childOutput = child.output + def output = aggregateExpressions.map(_.toAttribute) - /* Replace all aggregate expressions with spark functions that will compute the result. */ - def createAggregateImplementations() = aggregateExpressions.map { agg => - val impl = agg transform { - case a: AggregateExpression => a.newInstance + /** + * An aggregate that needs to be computed for each row in a group. + * + * @param unbound Unbound version of this aggregate, used for result substitution. + * @param aggregate A bound copy of this aggregate used to create a new aggregation buffer. + * @param resultAttribute An attribute used to refer to the result of this aggregate in the final + * output. + */ + case class ComputedAggregate( + unbound: AggregateExpression, + aggregate: AggregateExpression, + resultAttribute: AttributeReference) + + /** A list of aggregates that need to be computed for each group. */ + @transient + lazy val computedAggregates = aggregateExpressions.flatMap { agg => + agg.collect { + case a: AggregateExpression => + ComputedAggregate( + a, + BindReferences.bindReference(a, childOutput).asInstanceOf[AggregateExpression], + AttributeReference(s"aggResult:$a", a.dataType, nullable = true)()) } + }.toArray + + /** The schema of the result of all aggregate evaluations */ + @transient + lazy val computedSchema = computedAggregates.map(_.resultAttribute) + + /** Creates a new aggregate buffer for a group. */ + def newAggregateBuffer(): Array[AggregateFunction] = { + val buffer = new Array[AggregateFunction](computedAggregates.length) + var i = 0 + while (i < computedAggregates.length) { + buffer(i) = computedAggregates(i).aggregate.newInstance() + i += 1 + } + buffer + } - val remainingAttributes = impl.collect { case a: Attribute => a } - // If any references exist that are not inside agg functions then the must be grouping exprs - // in this case we must rebind them to the grouping tuple. - if (remainingAttributes.nonEmpty) { - val unaliasedAggregateExpr = agg transform { case Alias(c, _) => c } - - // An exact match with a grouping expression - val exactGroupingExpr = groupingExpressions.indexOf(unaliasedAggregateExpr) match { - case -1 => None - case ordinal => Some(BoundReference(ordinal, Alias(impl, "AGGEXPR")().toAttribute)) - } + /** Named attributes used to substitute grouping attributes into the final result. */ + @transient + lazy val namedGroups = groupingExpressions.map { + case ne: NamedExpression => ne -> ne.toAttribute + case e => e -> Alias(e, s"groupingExpr:$e")().toAttribute + } - exactGroupingExpr.getOrElse( - sys.error(s"$agg is not in grouping expressions: $groupingExpressions")) - } else { - impl + /** + * A map of substitutions that are used to insert the aggregate expressions and grouping + * expression into the final result expression. + */ + @transient + lazy val resultMap = + (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute} ++ namedGroups).toMap + + /** + * Substituted version of aggregateExpressions expressions which are used to compute final + * output rows given a group and the result of all aggregate computations. + */ + @transient + lazy val resultExpressions = aggregateExpressions.map { agg => + agg.transform { + case e: Expression if resultMap.contains(e) => resultMap(e) } } def execute() = attachTree(this, "execute") { - // TODO: If the child of it is an [[catalyst.execution.Exchange]], - // do not evaluate the groupingExpressions again since we have evaluated it - // in the [[catalyst.execution.Exchange]]. - val grouped = child.execute().mapPartitions { iter => - val buildGrouping = new Projection(groupingExpressions) - iter.map(row => (buildGrouping(row), row.copy())) - }.groupByKeyLocally() - - val result = grouped.map { case (group, rows) => - val aggImplementations = createAggregateImplementations() - - // Pull out all the functions so we can feed each row into them. - val aggFunctions = aggImplementations.flatMap(_ collect { case f: AggregateFunction => f }) - - rows.foreach { row => - aggFunctions.foreach(_.update(row)) + if (groupingExpressions.isEmpty) { + child.execute().mapPartitions { iter => + val buffer = newAggregateBuffer() + var currentRow: Row = null + while (iter.hasNext) { + currentRow = iter.next() + var i = 0 + while (i < buffer.length) { + buffer(i).update(currentRow) + i += 1 + } + } + val resultProjection = new Projection(resultExpressions, computedSchema) + val aggregateResults = new GenericMutableRow(computedAggregates.length) + + var i = 0 + while (i < buffer.length) { + aggregateResults(i) = buffer(i).apply(EmptyRow) + i += 1 + } + + Iterator(resultProjection(aggregateResults)) } - buildRow(aggImplementations.map(_.apply(group))) - } - - // TODO: THIS BREAKS PIPELINING, DOUBLE COMPUTES THE ANSWER, AND USES TOO MUCH MEMORY... - if (groupingExpressions.isEmpty && result.count == 0) { - // When there there is no output to the Aggregate operator, we still output an empty row. - val aggImplementations = createAggregateImplementations() - sc.makeRDD(buildRow(aggImplementations.map(_.apply(null))) :: Nil) } else { - result + child.execute().mapPartitions { iter => + val hashTable = new HashMap[Row, Array[AggregateFunction]] + val groupingProjection = new MutableProjection(groupingExpressions, childOutput) + + var currentRow: Row = null + while (iter.hasNext) { + currentRow = iter.next() + val currentGroup = groupingProjection(currentRow) + var currentBuffer = hashTable.get(currentGroup) + if (currentBuffer == null) { + currentBuffer = newAggregateBuffer() + hashTable.put(currentGroup.copy(), currentBuffer) + } + + var i = 0 + while (i < currentBuffer.length) { + currentBuffer(i).update(currentRow) + i += 1 + } + } + + new Iterator[Row] { + private[this] val hashTableIter = hashTable.entrySet().iterator() + private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length) + private[this] val resultProjection = + new MutableProjection(resultExpressions, computedSchema ++ namedGroups.map(_._2)) + private[this] val joinedRow = new JoinedRow + + override final def hasNext: Boolean = hashTableIter.hasNext + + override final def next(): Row = { + val currentEntry = hashTableIter.next() + val currentGroup = currentEntry.getKey + val currentBuffer = currentEntry.getValue + + var i = 0 + while (i < currentBuffer.length) { + // Evaluating an aggregate buffer returns the result. No row is required since we + // already added all rows in the group using update. + aggregateResults(i) = currentBuffer(i).apply(EmptyRow) + i += 1 + } + resultProjection(joinedRow(aggregateResults, currentGroup)) + } + } + } } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index 44901db3f9..2c607455c8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -337,13 +337,16 @@ case class HiveGenericUdaf( type UDFType = AbstractGenericUDAFResolver + @transient protected lazy val resolver: AbstractGenericUDAFResolver = createFunction(name) + @transient protected lazy val objectInspector = { resolver.getEvaluator(children.map(_.dataType.toTypeInfo).toArray) .init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors.toArray) } + @transient protected lazy val inspectors = children.map(_.dataType).map(toInspector) def dataType: DataType = inspectorToDataType(objectInspector) -- cgit v1.2.3