aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2014-06-23 20:25:46 -0700
committerMatei Zaharia <matei@databricks.com>2014-06-23 20:25:46 -0700
commit56eb8af187b19f09810baafb314b21e07cf0a79c (patch)
tree30bcca83114d3c28659876c6da593eda2241f30d /graphx
parent51c8168377a89d20d0b2d7b9a28af58593a0fe0c (diff)
downloadspark-56eb8af187b19f09810baafb314b21e07cf0a79c.tar.gz
spark-56eb8af187b19f09810baafb314b21e07cf0a79c.tar.bz2
spark-56eb8af187b19f09810baafb314b21e07cf0a79c.zip
[SPARK-2124] Move aggregation into shuffle implementations
This PR is a sub-task of SPARK-2044 to move the execution of aggregation into shuffle implementations. I leave `CoGoupedRDD` and `SubtractedRDD` unchanged because they have their implementations of aggregation. I'm not sure is it suitable to change these two RDDs. Also I do not move sort related code of `OrderedRDDFunctions` into shuffle, this will be solved in another sub-task. Author: jerryshao <saisai.shao@intel.com> Closes #1064 from jerryshao/SPARK-2124 and squashes the following commits: 4a05a40 [jerryshao] Modify according to comments 1f7dcc8 [jerryshao] Style changes 50a2fd6 [jerryshao] Fix test suite issue after moving aggregator to Shuffle reader and writer 1a96190 [jerryshao] Code modification related to the ShuffledRDD 308f635 [jerryshao] initial works of move combiner to ShuffleManager's reader and writer
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala7
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala4
2 files changed, 6 insertions, 5 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
index 1c6d7e59e9..d85afa45b1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
@@ -62,7 +62,8 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/* , AnyRe
private[graphx]
class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) {
def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = {
- val rdd = new ShuffledRDD[PartitionID, (VertexId, T), VertexBroadcastMsg[T]](self, partitioner)
+ val rdd = new ShuffledRDD[PartitionID, (VertexId, T), (VertexId, T), VertexBroadcastMsg[T]](
+ self, partitioner)
// Set a custom serializer if the data is of int or double type.
if (classTag[T] == ClassTag.Int) {
@@ -84,7 +85,7 @@ class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) {
* Return a copy of the RDD partitioned using the specified partitioner.
*/
def partitionBy(partitioner: Partitioner): RDD[MessageToPartition[T]] = {
- new ShuffledRDD[PartitionID, T, MessageToPartition[T]](self, partitioner)
+ new ShuffledRDD[PartitionID, T, T, MessageToPartition[T]](self, partitioner)
}
}
@@ -103,7 +104,7 @@ object MsgRDDFunctions {
private[graphx]
class VertexRDDFunctions[VD: ClassTag](self: RDD[(VertexId, VD)]) {
def copartitionWithVertices(partitioner: Partitioner): RDD[(VertexId, VD)] = {
- val rdd = new ShuffledRDD[VertexId, VD, (VertexId, VD)](self, partitioner)
+ val rdd = new ShuffledRDD[VertexId, VD, VD, (VertexId, VD)](self, partitioner)
// Set a custom serializer if the data is of int or double type.
if (classTag[VD] == ClassTag.Int) {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
index d02e9238ad..3827ac8d0f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
@@ -46,8 +46,8 @@ private[graphx]
class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
/** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
- new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
- .setSerializer(new RoutingTableMessageSerializer)
+ new ShuffledRDD[VertexId, (PartitionID, Byte), (PartitionID, Byte), RoutingTableMessage](
+ self, partitioner).setSerializer(new RoutingTableMessageSerializer)
}
}