aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndy Konwinski <andyk@berkeley.edu>2012-10-05 19:53:54 -0700
committerAndy Konwinski <andyk@berkeley.edu>2012-10-05 19:53:54 -0700
commita242cdd0a65f5a9abf5998db1ced626bcbdb87bc (patch)
tree8e9d4c392e33d09239f847d159da8323e87d7ffa /core
parentd7363a6b8a1ce8620eadc7d417aee7d61d6680b8 (diff)
downloadspark-a242cdd0a65f5a9abf5998db1ced626bcbdb87bc.tar.gz
spark-a242cdd0a65f5a9abf5998db1ced626bcbdb87bc.tar.bz2
spark-a242cdd0a65f5a9abf5998db1ced626bcbdb87bc.zip
Factor subclasses of RDD out of RDD.scala into their own classes
in the rdd package.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/RDD.scala71
-rw-r--r--core/src/main/scala/spark/rdd/FilteredRDD.scala12
-rw-r--r--core/src/main/scala/spark/rdd/FlatMappedRDD.scala16
-rw-r--r--core/src/main/scala/spark/rdd/GlommedRDD.scala12
-rw-r--r--core/src/main/scala/spark/rdd/MapPartitionsRDD.scala16
-rw-r--r--core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala21
-rw-r--r--core/src/main/scala/spark/rdd/MappedRDD.scala16
-rw-r--r--core/src/test/scala/spark/AccumulatorSuite.scala8
8 files changed, 106 insertions, 66 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index f0e4fb643f..f0d2b2d783 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -33,6 +33,12 @@ import spark.partial.GroupedCountEvaluator
import spark.partial.PartialResult
import spark.rdd.BlockRDD
import spark.rdd.CartesianRDD
+import spark.rdd.FilteredRDD
+import spark.rdd.FlatMappedRDD
+import spark.rdd.GlommedRDD
+import spark.rdd.MappedRDD
+import spark.rdd.MapPartitionsRDD
+import spark.rdd.MapPartitionsWithSplitRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
import spark.rdd.UnionRDD
@@ -418,67 +424,4 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
private[spark] def collectPartitions(): Array[Array[T]] = {
sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
}
-}
-
-private[spark]
-class MappedRDD[U: ClassManifest, T: ClassManifest](
- prev: RDD[T],
- f: T => U)
- extends RDD[U](prev.context) {
-
- override def splits = prev.splits
- override val dependencies = List(new OneToOneDependency(prev))
- override def compute(split: Split) = prev.iterator(split).map(f)
-}
-
-private[spark]
-class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
- prev: RDD[T],
- f: T => TraversableOnce[U])
- extends RDD[U](prev.context) {
-
- override def splits = prev.splits
- override val dependencies = List(new OneToOneDependency(prev))
- override def compute(split: Split) = prev.iterator(split).flatMap(f)
-}
-
-private[spark]
-class FilteredRDD[T: ClassManifest](prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) {
- override def splits = prev.splits
- override val dependencies = List(new OneToOneDependency(prev))
- override def compute(split: Split) = prev.iterator(split).filter(f)
-}
-
-private[spark]
-class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) {
- override def splits = prev.splits
- override val dependencies = List(new OneToOneDependency(prev))
- override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator
-}
-
-private[spark]
-class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
- prev: RDD[T],
- f: Iterator[T] => Iterator[U])
- extends RDD[U](prev.context) {
-
- override def splits = prev.splits
- override val dependencies = List(new OneToOneDependency(prev))
- override def compute(split: Split) = f(prev.iterator(split))
-}
-
-/**
- * A variant of the MapPartitionsRDD that passes the split index into the
- * closure. This can be used to generate or collect partition specific
- * information such as the number of tuples in a partition.
- */
-private[spark]
-class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
- prev: RDD[T],
- f: (Int, Iterator[T]) => Iterator[U])
- extends RDD[U](prev.context) {
-
- override def splits = prev.splits
- override val dependencies = List(new OneToOneDependency(prev))
- override def compute(split: Split) = f(split.index, prev.iterator(split))
-}
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala
new file mode 100644
index 0000000000..dfe9dc73f3
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/FilteredRDD.scala
@@ -0,0 +1,12 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class FilteredRDD[T: ClassManifest](prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) {
+ override def splits = prev.splits
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = prev.iterator(split).filter(f)
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
new file mode 100644
index 0000000000..3534dc8057
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
@@ -0,0 +1,16 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
+ prev: RDD[T],
+ f: T => TraversableOnce[U])
+ extends RDD[U](prev.context) {
+
+ override def splits = prev.splits
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = prev.iterator(split).flatMap(f)
+}
diff --git a/core/src/main/scala/spark/rdd/GlommedRDD.scala b/core/src/main/scala/spark/rdd/GlommedRDD.scala
new file mode 100644
index 0000000000..e30564f2da
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/GlommedRDD.scala
@@ -0,0 +1,12 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) {
+ override def splits = prev.splits
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
new file mode 100644
index 0000000000..b2c7a1cb9e
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
@@ -0,0 +1,16 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
+ prev: RDD[T],
+ f: Iterator[T] => Iterator[U])
+ extends RDD[U](prev.context) {
+
+ override def splits = prev.splits
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = f(prev.iterator(split))
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
new file mode 100644
index 0000000000..adc541694e
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
@@ -0,0 +1,21 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+/**
+ * A variant of the MapPartitionsRDD that passes the split index into the
+ * closure. This can be used to generate or collect partition specific
+ * information such as the number of tuples in a partition.
+ */
+private[spark]
+class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
+ prev: RDD[T],
+ f: (Int, Iterator[T]) => Iterator[U])
+ extends RDD[U](prev.context) {
+
+ override def splits = prev.splits
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = f(split.index, prev.iterator(split))
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala
new file mode 100644
index 0000000000..59bedad8ef
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MappedRDD.scala
@@ -0,0 +1,16 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class MappedRDD[U: ClassManifest, T: ClassManifest](
+ prev: RDD[T],
+ f: T => U)
+ extends RDD[U](prev.context) {
+
+ override def splits = prev.splits
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = prev.iterator(split).map(f)
+} \ No newline at end of file
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index b920e53534..403e675f37 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -18,6 +18,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
sc.stop()
sc = null
}
+ System.clearProperty("spark.master.port")
}
test ("basic accumulation"){
@@ -91,7 +92,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
val maxI = 1000
for (nThreads <- List(1, 10)) {
// test single & multi-threaded
- val sc = new SparkContext("local[" + nThreads + "]", "test")
+ sc = new SparkContext("local[" + nThreads + "]", "test")
val setAcc = sc.accumulableCollection(mutable.HashSet[Int]())
val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]())
val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]())
@@ -110,6 +111,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
mapAcc.value should contain (i -> i.toString)
}
sc.stop()
+ sc = null
}
}
@@ -117,7 +119,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
- val sc = new SparkContext("local[" + nThreads + "]", "test")
+ sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
val d = sc.parallelize(groupedInts)
@@ -125,6 +127,8 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
x => acc.localValue ++= x
}
acc.value should be ( (0 to maxI).toSet)
+ sc.stop()
+ sc = null
}
}