diff options
author | Stephen Haberman <stephen@exigencecorp.com> | 2013-02-24 00:27:14 -0600 |
---|---|---|
committer | Stephen Haberman <stephen@exigencecorp.com> | 2013-02-24 00:27:14 -0600 |
commit | f442e7d83c93c894215427f5ef86c96d61160e0e (patch) | |
tree | 0e75e43914962171419c2041070f5aadad876f8b | |
parent | cec87a0653904eb048b2219d3701d09e5bb48d5a (diff) | |
download | spark-f442e7d83c93c894215427f5ef86c96d61160e0e.tar.gz spark-f442e7d83c93c894215427f5ef86c96d61160e0e.tar.bz2 spark-f442e7d83c93c894215427f5ef86c96d61160e0e.zip |
Update for split->partition rename.
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/SubtractedRDD.scala | 24 | ||||
-rw-r--r-- | core/src/test/scala/spark/ShuffleSuite.scala | 2 |
3 files changed, 17 insertions, 17 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 27a4d2d287..9e8eaee756 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -397,17 +397,17 @@ abstract class RDD[T: ClassManifest]( /** * Return an RDD with the elements from `this` that are not in `other`. * - * Uses `this` partitioner/split size, because even if `other` is huge, the resulting + * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ def subtract(other: RDD[T]): RDD[T] = - subtract(other, partitioner.getOrElse(new HashPartitioner(splits.size))) + subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size))) /** * Return an RDD with the elements from `this` that are not in `other`. */ - def subtract(other: RDD[T], numSplits: Int): RDD[T] = - subtract(other, new HashPartitioner(numSplits)) + def subtract(other: RDD[T], numPartitions: Int): RDD[T] = + subtract(other, new HashPartitioner(numPartitions)) /** * Return an RDD with the elements from `this` that are not in `other`. diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala index 244874e4e0..daf9cc993c 100644 --- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala +++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala @@ -6,7 +6,7 @@ import spark.RDD import spark.Partitioner import spark.Dependency import spark.TaskContext -import spark.Split +import spark.Partition import spark.SparkEnv import spark.ShuffleDependency import spark.OneToOneDependency @@ -24,7 +24,7 @@ import spark.OneToOneDependency * touch each once to decide if the value needs to be removed. * * This is particularly helpful when `rdd1` is much smaller than `rdd2`, as - * you can use `rdd1`'s partitioner/split size and not worry about running + * you can use `rdd1`'s partitioner/partition size and not worry about running * out of memory because of the size of `rdd2`. */ private[spark] class SubtractedRDD[T: ClassManifest]( @@ -63,16 +63,16 @@ private[spark] class SubtractedRDD[T: ClassManifest]( } } - override def getSplits: Array[Split] = { - val array = new Array[Split](part.numPartitions) + override def getPartitions: Array[Partition] = { + val array = new Array[Partition](part.numPartitions) for (i <- 0 until array.size) { - // Each CoGroupSplit will dependend on rdd1 and rdd2 - array(i) = new CoGroupSplit(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) => + // Each CoGroupPartition will depend on rdd1 and rdd2 + array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) => dependencies(j) match { case s: ShuffleDependency[_, _] => new ShuffleCoGroupSplitDep(s.shuffleId) case _ => - new NarrowCoGroupSplitDep(rdd, i, rdd.splits(i)) + new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)) } }.toList) } @@ -81,21 +81,21 @@ private[spark] class SubtractedRDD[T: ClassManifest]( override val partitioner = Some(part) - override def compute(s: Split, context: TaskContext): Iterator[T] = { - val split = s.asInstanceOf[CoGroupSplit] + override def compute(p: Partition, context: TaskContext): Iterator[T] = { + val partition = p.asInstanceOf[CoGroupPartition] val set = new JHashSet[T] def integrate(dep: CoGroupSplitDep, op: T => Unit) = dep match { case NarrowCoGroupSplitDep(rdd, _, itsSplit) => for (k <- rdd.iterator(itsSplit, context)) op(k.asInstanceOf[T]) case ShuffleCoGroupSplitDep(shuffleId) => - for ((k, _) <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, split.index)) + for ((k, _) <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index)) op(k.asInstanceOf[T]) } // the first dep is rdd1; add all keys to the set - integrate(split.deps(0), set.add) + integrate(partition.deps(0), set.add) // the second dep is rdd2; remove all of its keys from the set - integrate(split.deps(1), set.remove) + integrate(partition.deps(1), set.remove) set.iterator } diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 05fb280d0a..77e0eab829 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -241,7 +241,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { val b = sc.parallelize(Array(2, 3, 4), 4) val c = a.subtract(b) assert(c.collect().toSet === Set(1)) - assert(c.splits.size === a.splits.size) + assert(c.partitions.size === a.partitions.size) } test("subtract with narrow dependency") { |