author     Shivaram Venkataraman <shivaram@eecs.berkeley.edu>    2013-04-28 16:03:22 -0700
committer  Shivaram Venkataraman <shivaram@eecs.berkeley.edu>    2013-04-28 16:03:22 -0700
commit     15acd49f07c3cde0a381f4abe139b17791a910b4
tree       aae78129f27402c39ba523890fd4844f95be1cbc
parent     6e84635ab904ee2798f1d6acd3a8ed5e01563e54
Actually rename classes to ZippedPartitions*
(the previous commit only renamed the file)
-rw-r--r--  core/src/main/scala/spark/RDD.scala                       18
-rw-r--r--  core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala   26
-rw-r--r--  core/src/test/scala/spark/ZippedPartitionsSuite.scala      6
3 files changed, 25 insertions, 25 deletions
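The renamed classes back RDD.zipPartitions, which applies a user-supplied function to the co-indexed partitions of several RDDs in one pass. Below is a minimal usage sketch of the two-RDD variant against this 0.7-era API (ClassManifest context bounds); the RDD names, data, and SparkContext setup are illustrative and not part of this commit:

    import spark.SparkContext
    import spark.SparkContext._

    val sc   = new SparkContext("local", "zip-partitions-sketch")
    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)          // 2 partitions
    val strs = sc.makeRDD(Array("a", "b", "c", "d"), 2)  // 2 partitions

    // The function comes first and the remaining RDDs after it; for each
    // partition index the function receives one iterator per parent, and
    // the call is backed by the renamed ZippedPartitionsRDD2.
    val counts = nums.zipPartitions(
      (i: Iterator[Int], s: Iterator[String]) => Iterator(i.size + s.size),
      strs)

    counts.collect()  // one combined element count per partition pair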
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index bded55238f..4310f745f3 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -35,9 +35,9 @@ import spark.rdd.ShuffledRDD
import spark.rdd.SubtractedRDD
import spark.rdd.UnionRDD
import spark.rdd.ZippedRDD
-import spark.rdd.MapZippedPartitionsRDD2
-import spark.rdd.MapZippedPartitionsRDD3
-import spark.rdd.MapZippedPartitionsRDD4
+import spark.rdd.ZippedPartitionsRDD2
+import spark.rdd.ZippedPartitionsRDD3
+import spark.rdd.ZippedPartitionsRDD4
import spark.storage.StorageLevel
import SparkContext._
@@ -441,21 +441,21 @@ abstract class RDD[T: ClassManifest](
def zipPartitions[B: ClassManifest, V: ClassManifest](
f: (Iterator[T], Iterator[B]) => Iterator[V],
- rdd2: RDD[B]) =
- new MapZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
+ rdd2: RDD[B]): RDD[V] =
+ new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest](
f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V],
rdd2: RDD[B],
- rdd3: RDD[C]) =
- new MapZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
+ rdd3: RDD[C]): RDD[V] =
+ new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest](
f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
rdd2: RDD[B],
rdd3: RDD[C],
- rdd4: RDD[D]) =
- new MapZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
+ rdd4: RDD[D]): RDD[V] =
+ new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
// Actions (launch a job to return a value to the user program)
diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
index 3520fd24b0..b3113c1969 100644
--- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
@@ -3,7 +3,7 @@ package spark.rdd
import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
import java.io.{ObjectOutputStream, IOException}
-private[spark] class MapZippedPartition(
+private[spark] class ZippedPartitions(
idx: Int,
@transient rdds: Seq[RDD[_]])
extends Partition {
@@ -20,7 +20,7 @@ private[spark] class MapZippedPartition(
}
}
-abstract class MapZippedPartitionsBaseRDD[V: ClassManifest](
+abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
sc: SparkContext,
var rdds: Seq[RDD[_]])
extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) {
@@ -32,13 +32,13 @@ abstract class MapZippedPartitionsBaseRDD[V: ClassManifest](
}
val array = new Array[Partition](sizes(0))
for (i <- 0 until sizes(0)) {
- array(i) = new MapZippedPartition(i, rdds)
+ array(i) = new ZippedPartitions(i, rdds)
}
array
}
override def getPreferredLocations(s: Partition): Seq[String] = {
- val splits = s.asInstanceOf[MapZippedPartition].partitions
+ val splits = s.asInstanceOf[ZippedPartitions].partitions
val preferredLocations = rdds.zip(splits).map(x => x._1.preferredLocations(x._2))
preferredLocations.reduce((x, y) => x.intersect(y))
}
@@ -49,15 +49,15 @@ abstract class MapZippedPartitionsBaseRDD[V: ClassManifest](
}
}
-class MapZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest](
+class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest](
sc: SparkContext,
f: (Iterator[A], Iterator[B]) => Iterator[V],
var rdd1: RDD[A],
var rdd2: RDD[B])
- extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) {
+ extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) {
override def compute(s: Partition, context: TaskContext): Iterator[V] = {
- val partitions = s.asInstanceOf[MapZippedPartition].partitions
+ val partitions = s.asInstanceOf[ZippedPartitions].partitions
f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context))
}
@@ -68,17 +68,17 @@ class MapZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManife
}
}
-class MapZippedPartitionsRDD3
+class ZippedPartitionsRDD3
[A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest](
sc: SparkContext,
f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
var rdd1: RDD[A],
var rdd2: RDD[B],
var rdd3: RDD[C])
- extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) {
+ extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) {
override def compute(s: Partition, context: TaskContext): Iterator[V] = {
- val partitions = s.asInstanceOf[MapZippedPartition].partitions
+ val partitions = s.asInstanceOf[ZippedPartitions].partitions
f(rdd1.iterator(partitions(0), context),
rdd2.iterator(partitions(1), context),
rdd3.iterator(partitions(2), context))
@@ -92,7 +92,7 @@ class MapZippedPartitionsRDD3
}
}
-class MapZippedPartitionsRDD4
+class ZippedPartitionsRDD4
[A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest](
sc: SparkContext,
f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
@@ -100,10 +100,10 @@ class MapZippedPartitionsRDD4
var rdd2: RDD[B],
var rdd3: RDD[C],
var rdd4: RDD[D])
- extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) {
+ extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) {
override def compute(s: Partition, context: TaskContext): Iterator[V] = {
- val partitions = s.asInstanceOf[MapZippedPartition].partitions
+ val partitions = s.asInstanceOf[ZippedPartitions].partitions
f(rdd1.iterator(partitions(0), context),
rdd2.iterator(partitions(1), context),
rdd3.iterator(partitions(2), context),
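Although the hunks above only rename identifiers, they spell out the contract of ZippedPartitionsBaseRDD: one output partition per index, so every parent must expose the same number of partitions, and each output partition prefers the intersection of its parents' preferred locations. A small sketch of the three-RDD form under those assumptions (hypothetical data, reusing a local SparkContext sc as in the suite below):

    val ints    = sc.makeRDD(Array(1, 2, 3, 4), 2)     // 2 partitions
    val strings = sc.makeRDD(Array("a", "b", "c"), 2)  // 2 partitions
    val doubles = sc.makeRDD(Array(1.0, 2.0), 2)       // 2 partitions

    // Backed by ZippedPartitionsRDD3: partition i of the result is computed
    // from partition i of each parent; unequal partition counts are invalid.
    val sizes = ints.zipPartitions(
      (i: Iterator[Int], s: Iterator[String], d: Iterator[Double]) =>
        Iterator(i.size, s.size, d.size),
      strings, doubles)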
diff --git a/core/src/test/scala/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
index 834b517cbc..5f60aa75d7 100644
--- a/core/src/test/scala/spark/ZippedPartitionsSuite.scala
+++ b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
@@ -11,20 +11,20 @@ import org.scalacheck.Prop._
import SparkContext._
-object MapZippedPartitionsSuite {
+object ZippedPartitionsSuite {
def procZippedData(i: Iterator[Int], s: Iterator[String], d: Iterator[Double]) : Iterator[Int] = {
Iterator(i.toArray.size, s.toArray.size, d.toArray.size)
}
}
-class MapZippedPartitionsSuite extends FunSuite with LocalSparkContext {
+class ZippedPartitionsSuite extends FunSuite with LocalSparkContext {
test("print sizes") {
sc = new SparkContext("local", "test")
val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
- val zippedRDD = data1.zipPartitions(MapZippedPartitionsSuite.procZippedData, data2, data3)
+ val zippedRDD = data1.zipPartitions(ZippedPartitionsSuite.procZippedData, data2, data3)
val obtainedSizes = zippedRDD.collect()
val expectedSizes = Array(2, 3, 1, 2, 3, 1)