author     Stephen Haberman <stephen@exigencecorp.com>  2013-02-24 00:27:14 -0600
committer  Stephen Haberman <stephen@exigencecorp.com>  2013-02-24 00:27:14 -0600
commit     f442e7d83c93c894215427f5ef86c96d61160e0e (patch)
tree       0e75e43914962171419c2041070f5aadad876f8b /core
parent     cec87a0653904eb048b2219d3701d09e5bb48d5a (diff)
Update for split->partition rename.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/RDD.scala                |  8 ++++----
-rw-r--r--  core/src/main/scala/spark/rdd/SubtractedRDD.scala  | 24 ++++++++++++------------
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala       |  2 +-
3 files changed, 17 insertions(+), 17 deletions(-)
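
For orientation: this commit is a follow-up to Spark's core Split -> Partition rename, updating `subtract`/`SubtractedRDD` to match. The `spark.Split` trait became `spark.Partition`, `RDD.splits`/`getSplits` became `RDD.partitions`/`getPartitions`, and `numSplits` parameters became `numPartitions`. As a rough sketch of the renamed abstraction (the exact trait body is an assumption from the era's API; this diff itself only shows that partitions carry a stable integer index):

trait Partition extends Serializable {
  // A partition's position within its parent RDD; compute() and the
  // shuffle fetcher address per-partition work by this index.
  def index: Int
}
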
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 27a4d2d287..9e8eaee756 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -397,17 +397,17 @@ abstract class RDD[T: ClassManifest](
/**
* Return an RDD with the elements from `this` that are not in `other`.
*
- * Uses `this` partitioner/split size, because even if `other` is huge, the resulting
+ * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
def subtract(other: RDD[T]): RDD[T] =
- subtract(other, partitioner.getOrElse(new HashPartitioner(splits.size)))
+ subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
- def subtract(other: RDD[T], numSplits: Int): RDD[T] =
- subtract(other, new HashPartitioner(numSplits))
+ def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
+ subtract(other, new HashPartitioner(numPartitions))
/**
* Return an RDD with the elements from `this` that are not in `other`.
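
The hunk above only renames the overload's parameter; behavior is unchanged. A minimal usage sketch of the two `subtract` overloads (hypothetical values, assuming a live `SparkContext` named `sc` from the era's API):

val nums  = sc.parallelize(Array(1, 2, 3), 2)
val other = sc.parallelize(Array(2, 3, 4), 4)

// Overload 1: reuses nums's partitioner (or hash-partitions into
// nums.partitions.size partitions), so the small side dictates layout.
val d1 = nums.subtract(other)

// Overload 2: explicitly hash-partitions the result into 3 partitions.
val d2 = nums.subtract(other, 3)
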
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
index 244874e4e0..daf9cc993c 100644
--- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -6,7 +6,7 @@ import spark.RDD
import spark.Partitioner
import spark.Dependency
import spark.TaskContext
-import spark.Split
+import spark.Partition
import spark.SparkEnv
import spark.ShuffleDependency
import spark.OneToOneDependency
@@ -24,7 +24,7 @@ import spark.OneToOneDependency
* touch each once to decide if the value needs to be removed.
*
* This is particularly helpful when `rdd1` is much smaller than `rdd2`, as
- * you can use `rdd1`'s partitioner/split size and not worry about running
+ * you can use `rdd1`'s partitioner/partition size and not worry about running
* out of memory because of the size of `rdd2`.
*/
private[spark] class SubtractedRDD[T: ClassManifest](
@@ -63,16 +63,16 @@ private[spark] class SubtractedRDD[T: ClassManifest](
}
}
- override def getSplits: Array[Split] = {
- val array = new Array[Split](part.numPartitions)
+ override def getPartitions: Array[Partition] = {
+ val array = new Array[Partition](part.numPartitions)
for (i <- 0 until array.size) {
- // Each CoGroupSplit will dependend on rdd1 and rdd2
- array(i) = new CoGroupSplit(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
+ // Each CoGroupPartition will depend on rdd1 and rdd2
+ array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
dependencies(j) match {
case s: ShuffleDependency[_, _] =>
new ShuffleCoGroupSplitDep(s.shuffleId)
case _ =>
- new NarrowCoGroupSplitDep(rdd, i, rdd.splits(i))
+ new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
}
}.toList)
}
@@ -81,21 +81,21 @@ private[spark] class SubtractedRDD[T: ClassManifest](
override val partitioner = Some(part)
- override def compute(s: Split, context: TaskContext): Iterator[T] = {
- val split = s.asInstanceOf[CoGroupSplit]
+ override def compute(p: Partition, context: TaskContext): Iterator[T] = {
+ val partition = p.asInstanceOf[CoGroupPartition]
val set = new JHashSet[T]
def integrate(dep: CoGroupSplitDep, op: T => Unit) = dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
for (k <- rdd.iterator(itsSplit, context))
op(k.asInstanceOf[T])
case ShuffleCoGroupSplitDep(shuffleId) =>
- for ((k, _) <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, split.index))
+ for ((k, _) <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index))
op(k.asInstanceOf[T])
}
// the first dep is rdd1; add all keys to the set
- integrate(split.deps(0), set.add)
+ integrate(partition.deps(0), set.add)
// the second dep is rdd2; remove all of its keys from the set
- integrate(split.deps(1), set.remove)
+ integrate(partition.deps(1), set.remove)
set.iterator
}
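
Stripped of the dependency plumbing, `compute` above is a per-partition set difference: load the first parent's partition into a hash set, then drop everything seen in the second parent's co-located partition. A self-contained sketch of the same idea in plain Scala (illustrative only, no Spark types):

import scala.collection.mutable

// Mirrors SubtractedRDD.compute: the first iterator plays rdd1's
// partition (elements to keep), the second plays rdd2's (elements to remove).
def subtractPartition[T](left: Iterator[T], right: Iterator[T]): Iterator[T] = {
  val set = mutable.HashSet.empty[T]
  left.foreach(set += _)   // first dep is rdd1: add all of its elements
  right.foreach(set -= _)  // second dep is rdd2: remove its elements
  set.iterator
}

// e.g. subtractPartition(Iterator(1, 2, 3), Iterator(2, 3, 4)).toSet == Set(1)
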
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 05fb280d0a..77e0eab829 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -241,7 +241,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
val b = sc.parallelize(Array(2, 3, 4), 4)
val c = a.subtract(b)
assert(c.collect().toSet === Set(1))
- assert(c.splits.size === a.splits.size)
+ assert(c.partitions.size === a.partitions.size)
}
test("subtract with narrow dependency") {