diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-14 10:00:22 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-14 10:00:22 -0700 |
commit | 64dbf8d3726cbc5700ee6fef871fd2bfe0d9aa68 (patch) | |
tree | a37bc3102da7cc00038dde321a438e43d63cb637 /core/src/main | |
parent | 64b52166ee3b46667a5ba1ec276e6836dba898c5 (diff) | |
download | spark-64dbf8d3726cbc5700ee6fef871fd2bfe0d9aa68.tar.gz spark-64dbf8d3726cbc5700ee6fef871fd2bfe0d9aa68.tar.bz2 spark-64dbf8d3726cbc5700ee6fef871fd2bfe0d9aa68.zip |
Made ShuffleDependency automatically find a shuffle ID for itself
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/spark/Dependency.scala | 6 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 3 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/ShuffledRDD.scala | 2 |
3 files changed, 6 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala index 19a51dd5b8..dfc7e292b7 100644 --- a/core/src/main/scala/spark/Dependency.scala +++ b/core/src/main/scala/spark/Dependency.scala @@ -26,11 +26,13 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) { * @param partitioner partitioner used to partition the shuffle output */ class ShuffleDependency[K, V, C]( - val shuffleId: Int, @transient rdd: RDD[(K, V)], val aggregator: Option[Aggregator[K, V, C]], val partitioner: Partitioner) - extends Dependency(rdd) + extends Dependency(rdd) { + + val shuffleId: Int = rdd.context.newShuffleId() +} /** * Represents a one-to-one dependency between partitions of the parent and child RDDs. diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index f1defbe492..ace2500627 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -48,8 +48,7 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner) deps += new OneToOneDependency(rdd) } else { logInfo("Adding shuffle dependency with " + rdd) - deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]]( - context.newShuffleId, rdd, Some(aggr), part) + deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]](rdd, Some(aggr), part) } } deps.toList diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index 7577909b83..be120acc71 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -35,7 +35,7 @@ abstract class ShuffledRDD[K, V, C]( override def preferredLocations(split: Split) = Nil - val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part) + val dep = new ShuffleDependency(parent, aggregator, part) override val dependencies = List(dep) } |