diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-07 09:56:53 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-07 09:56:53 -0700 |
commit | 039cc6228e92b3ee7a05ebbbe4b915be2c1db1f3 (patch) | |
tree | 8e4552da7d962c7407d2996bb8c4bd61aa1a10fa | |
parent | 933e4f5123b3d889569f3383352d7ea73c6ac9af (diff) | |
parent | e10308f5a0a17627317306dfaf19aa20b46490fd (diff) | |
download | spark-039cc6228e92b3ee7a05ebbbe4b915be2c1db1f3.tar.gz spark-039cc6228e92b3ee7a05ebbbe4b915be2c1db1f3.tar.bz2 spark-039cc6228e92b3ee7a05ebbbe4b915be2c1db1f3.zip |
Merge pull request #251 from JoshRosen/docs/internals
Document Dependency classes and make minor interface improvements
-rw-r--r-- | core/src/main/scala/spark/Dependency.scala | 37 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/ShuffledRDD.scala | 6 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/UnionRDD.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 4 |
5 files changed, 40 insertions, 11 deletions
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala index c0ff94acc6..19a51dd5b8 100644 --- a/core/src/main/scala/spark/Dependency.scala +++ b/core/src/main/scala/spark/Dependency.scala @@ -1,22 +1,51 @@ package spark -abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) extends Serializable +/** + * Base class for dependencies. + */ +abstract class Dependency[T](val rdd: RDD[T]) extends Serializable -abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd, false) { +/** + * Base class for dependencies where each partition of the parent RDD is used by at most one + * partition of the child RDD. Narrow dependencies allow for pipelined execution. + */ +abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) { + /** + * Get the parent partitions for a child partition. + * @param outputPartition a partition of the child RDD + * @return the partitions of the parent RDD that the child partition depends upon + */ def getParents(outputPartition: Int): Seq[Int] } +/** + * Represents a dependency on the output of a shuffle stage. + * @param shuffleId the shuffle id + * @param rdd the parent RDD + * @param aggregator optional aggregator; this allows for map-side combining + * @param partitioner partitioner used to partition the shuffle output + */ class ShuffleDependency[K, V, C]( val shuffleId: Int, @transient rdd: RDD[(K, V)], - val aggregator: Aggregator[K, V, C], + val aggregator: Option[Aggregator[K, V, C]], val partitioner: Partitioner) - extends Dependency(rdd, true) + extends Dependency(rdd) +/** + * Represents a one-to-one dependency between partitions of the parent and child RDDs. + */ class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int) = List(partitionId) } +/** + * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs. + * @param rdd the parent RDD + * @param inStart the start of the range in the parent RDD + * @param outStart the start of the range in the child RDD + * @param length the length of the range + */ class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) extends NarrowDependency[T](rdd) { diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index 8fa0749184..f1defbe492 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -49,7 +49,7 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner) } else { logInfo("Adding shuffle dependency with " + rdd) deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]]( - context.newShuffleId, rdd, aggr, part) + context.newShuffleId, rdd, Some(aggr), part) } } deps.toList diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index 769ccf8caa..7577909b83 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -22,7 +22,7 @@ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split { */ abstract class ShuffledRDD[K, V, C]( @transient parent: RDD[(K, V)], - aggregator: Aggregator[K, V, C], + aggregator: Option[Aggregator[K, V, C]], part: Partitioner) extends RDD[(K, C)](parent.context) { @@ -48,7 +48,7 @@ class RepartitionShuffledRDD[K, V]( part: Partitioner) extends ShuffledRDD[K, V, V]( parent, - Aggregator[K, V, V](null, null, null, false), + None, part) { override def compute(split: Split): Iterator[(K, V)] = { @@ -95,7 +95,7 @@ class ShuffledAggregatedRDD[K, V, C]( @transient parent: RDD[(K, V)], aggregator: Aggregator[K, V, C], part : Partitioner) - extends ShuffledRDD[K, V, C](parent, aggregator, part) { + extends ShuffledRDD[K, V, C](parent, Some(aggregator), part) { override def compute(split: Split): Iterator[(K, C)] = { val combiners = new JHashMap[K, C] diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala index 4ba2848491..f0b9225f7c 100644 --- a/core/src/main/scala/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/spark/rdd/UnionRDD.scala @@ -43,7 +43,7 @@ class UnionRDD[T: ClassManifest]( override val dependencies = { val deps = new ArrayBuffer[Dependency[_]] var pos = 0 - for ((rdd, index) <- rdds.zipWithIndex) { + for (rdd <- rdds) { deps += new RangeDependency(rdd, 0, pos, rdd.splits.size) pos += rdd.splits.size } diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 3e5ba10fd9..86796d3677 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -111,11 +111,11 @@ private[spark] class ShuffleMapTask( override def run(attemptId: Long): MapStatus = { val numOutputSplits = dep.partitioner.numPartitions - val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]] val partitioner = dep.partitioner val bucketIterators = - if (aggregator.mapSideCombine) { + if (dep.aggregator.isDefined && dep.aggregator.get.mapSideCombine) { + val aggregator = dep.aggregator.get.asInstanceOf[Aggregator[Any, Any, Any]] // Apply combiners (map-side aggregation) to the map output. val buckets = Array.tabulate(numOutputSplits)(_ => new JHashMap[Any, Any]) for (elem <- rdd.iterator(split)) { |