aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-10-14 10:00:22 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-10-14 10:00:22 -0700
commit64dbf8d3726cbc5700ee6fef871fd2bfe0d9aa68 (patch)
treea37bc3102da7cc00038dde321a438e43d63cb637 /core/src/main
parent64b52166ee3b46667a5ba1ec276e6836dba898c5 (diff)
downloadspark-64dbf8d3726cbc5700ee6fef871fd2bfe0d9aa68.tar.gz
spark-64dbf8d3726cbc5700ee6fef871fd2bfe0d9aa68.tar.bz2
spark-64dbf8d3726cbc5700ee6fef871fd2bfe0d9aa68.zip
Made ShuffleDependency automatically find a shuffle ID for itself
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/Dependency.scala6
-rw-r--r--core/src/main/scala/spark/rdd/CoGroupedRDD.scala3
-rw-r--r--core/src/main/scala/spark/rdd/ShuffledRDD.scala2
3 files changed, 6 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index 19a51dd5b8..dfc7e292b7 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -26,11 +26,13 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
* @param partitioner partitioner used to partition the shuffle output
*/
class ShuffleDependency[K, V, C](
- val shuffleId: Int,
@transient rdd: RDD[(K, V)],
val aggregator: Option[Aggregator[K, V, C]],
val partitioner: Partitioner)
- extends Dependency(rdd)
+ extends Dependency(rdd) {
+
+ val shuffleId: Int = rdd.context.newShuffleId()
+}
/**
* Represents a one-to-one dependency between partitions of the parent and child RDDs.
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index f1defbe492..ace2500627 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -48,8 +48,7 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
deps += new OneToOneDependency(rdd)
} else {
logInfo("Adding shuffle dependency with " + rdd)
- deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]](
- context.newShuffleId, rdd, Some(aggr), part)
+ deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]](rdd, Some(aggr), part)
}
}
deps.toList
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 7577909b83..be120acc71 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -35,7 +35,7 @@ abstract class ShuffledRDD[K, V, C](
override def preferredLocations(split: Split) = Nil
- val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part)
+ val dep = new ShuffleDependency(parent, aggregator, part)
override val dependencies = List(dep)
}