diff options
author | jerryshao <saisai.shao@intel.com> | 2014-06-23 20:25:46 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-06-23 20:25:46 -0700 |
commit | 56eb8af187b19f09810baafb314b21e07cf0a79c (patch) | |
tree | 30bcca83114d3c28659876c6da593eda2241f30d /sql | |
parent | 51c8168377a89d20d0b2d7b9a28af58593a0fe0c (diff) | |
download | spark-56eb8af187b19f09810baafb314b21e07cf0a79c.tar.gz spark-56eb8af187b19f09810baafb314b21e07cf0a79c.tar.bz2 spark-56eb8af187b19f09810baafb314b21e07cf0a79c.zip |
[SPARK-2124] Move aggregation into shuffle implementations
This PR is a sub-task of SPARK-2044 to move the execution of aggregation into shuffle implementations.
I leave `CoGoupedRDD` and `SubtractedRDD` unchanged because they have their implementations of aggregation. I'm not sure is it suitable to change these two RDDs.
Also I do not move sort related code of `OrderedRDDFunctions` into shuffle, this will be solved in another sub-task.
Author: jerryshao <saisai.shao@intel.com>
Closes #1064 from jerryshao/SPARK-2124 and squashes the following commits:
4a05a40 [jerryshao] Modify according to comments
1f7dcc8 [jerryshao] Style changes
50a2fd6 [jerryshao] Fix test suite issue after moving aggregator to Shuffle reader and writer
1a96190 [jerryshao] Code modification related to the ShuffledRDD
308f635 [jerryshao] initial works of move combiner to ShuffleManager's reader and writer
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 6 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala | 2 |
2 files changed, 4 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index f46fa05165..00010ef6e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -47,7 +47,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una iter.map(r => mutablePair.update(hashExpressions(r), r)) } val part = new HashPartitioner(numPartitions) - val shuffled = new ShuffledRDD[Row, Row, MutablePair[Row, Row]](rdd, part) + val shuffled = new ShuffledRDD[Row, Row, Row, MutablePair[Row, Row]](rdd, part) shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false))) shuffled.map(_._2) @@ -60,7 +60,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una iter.map(row => mutablePair.update(row, null)) } val part = new RangePartitioner(numPartitions, rdd, ascending = true) - val shuffled = new ShuffledRDD[Row, Null, MutablePair[Row, Null]](rdd, part) + val shuffled = new ShuffledRDD[Row, Null, Null, MutablePair[Row, Null]](rdd, part) shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false))) shuffled.map(_._1) @@ -71,7 +71,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una iter.map(r => mutablePair.update(null, r)) } val partitioner = new HashPartitioner(1) - val shuffled = new ShuffledRDD[Null, Row, MutablePair[Null, Row]](rdd, partitioner) + val shuffled = new ShuffledRDD[Null, Row, Row, MutablePair[Null, Row]](rdd, partitioner) shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false))) shuffled.map(_._2) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 18f4a5877b..b40d4e3a3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -105,7 +105,7 @@ case class Limit(limit: Int, child: SparkPlan)(@transient sqlContext: SQLContext iter.take(limit).map(row => mutablePair.update(false, row)) } val part = new HashPartitioner(1) - val shuffled = new ShuffledRDD[Boolean, Row, MutablePair[Boolean, Row]](rdd, part) + val shuffled = new ShuffledRDD[Boolean, Row, Row, MutablePair[Boolean, Row]](rdd, part) shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false))) shuffled.mapPartitions(_.take(limit).map(_._2)) } |