author    | Andy Konwinski <andyk@berkeley.edu> | 2012-10-05 19:53:54 -0700
committer | Andy Konwinski <andyk@berkeley.edu> | 2012-10-05 19:53:54 -0700
commit    | a242cdd0a65f5a9abf5998db1ced626bcbdb87bc (patch)
tree      | 8e9d4c392e33d09239f847d159da8323e87d7ffa /core
parent    | d7363a6b8a1ce8620eadc7d417aee7d61d6680b8 (diff)
Factor subclasses of RDD out of RDD.scala into their own classes in the rdd package.
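The moved classes are all private[spark] and are only ever constructed through the public RDD transformations, so the refactor changes no user-facing behavior. As a minimal sketch (assuming the pre-1.0 spark.SparkContext API of this branch; the object and value names are illustrative), each transformation below is backed by one of the relocated classes:

import spark.SparkContext

object RddSubclassSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "sketch")
    val nums     = sc.parallelize(1 to 100)
    val doubled  = nums.map(_ * 2)                 // backed by MappedRDD
    val evens    = doubled.filter(_ % 4 == 0)      // backed by FilteredRDD
    val mirrored = evens.flatMap(n => Seq(n, -n))  // backed by FlatMappedRDD
    val chunks   = mirrored.glom()                 // backed by GlommedRDD
    // One element count per partition, computed from the glommed arrays.
    println(chunks.collect().map(_.length).mkString(", "))
    sc.stop()
  }
}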
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/RDD.scala                           | 71
-rw-r--r-- | core/src/main/scala/spark/rdd/FilteredRDD.scala               | 12
-rw-r--r-- | core/src/main/scala/spark/rdd/FlatMappedRDD.scala             | 16
-rw-r--r-- | core/src/main/scala/spark/rdd/GlommedRDD.scala                | 12
-rw-r--r-- | core/src/main/scala/spark/rdd/MapPartitionsRDD.scala          | 16
-rw-r--r-- | core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala | 21
-rw-r--r-- | core/src/main/scala/spark/rdd/MappedRDD.scala                 | 16
-rw-r--r-- | core/src/test/scala/spark/AccumulatorSuite.scala              | 8
8 files changed, 106 insertions, 66 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index f0e4fb643f..f0d2b2d783 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -33,6 +33,12 @@ import spark.partial.GroupedCountEvaluator
 import spark.partial.PartialResult
 import spark.rdd.BlockRDD
 import spark.rdd.CartesianRDD
+import spark.rdd.FilteredRDD
+import spark.rdd.FlatMappedRDD
+import spark.rdd.GlommedRDD
+import spark.rdd.MappedRDD
+import spark.rdd.MapPartitionsRDD
+import spark.rdd.MapPartitionsWithSplitRDD
 import spark.rdd.PipedRDD
 import spark.rdd.SampledRDD
 import spark.rdd.UnionRDD
@@ -418,67 +424,4 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
   private[spark] def collectPartitions(): Array[Array[T]] = {
     sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
   }
-}
-
-private[spark]
-class MappedRDD[U: ClassManifest, T: ClassManifest](
-    prev: RDD[T],
-    f: T => U)
-  extends RDD[U](prev.context) {
-
-  override def splits = prev.splits
-  override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = prev.iterator(split).map(f)
-}
-
-private[spark]
-class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
-    prev: RDD[T],
-    f: T => TraversableOnce[U])
-  extends RDD[U](prev.context) {
-
-  override def splits = prev.splits
-  override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = prev.iterator(split).flatMap(f)
-}
-
-private[spark]
-class FilteredRDD[T: ClassManifest](prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) {
-  override def splits = prev.splits
-  override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = prev.iterator(split).filter(f)
-}
-
-private[spark]
-class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) {
-  override def splits = prev.splits
-  override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator
-}
-
-private[spark]
-class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
-    prev: RDD[T],
-    f: Iterator[T] => Iterator[U])
-  extends RDD[U](prev.context) {
-
-  override def splits = prev.splits
-  override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = f(prev.iterator(split))
-}
-
-/**
- * A variant of the MapPartitionsRDD that passes the split index into the
- * closure. This can be used to generate or collect partition specific
- * information such as the number of tuples in a partition.
- */
-private[spark]
-class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
-    prev: RDD[T],
-    f: (Int, Iterator[T]) => Iterator[U])
-  extends RDD[U](prev.context) {
-
-  override def splits = prev.splits
-  override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = f(split.index, prev.iterator(split))
-}
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala
new file mode 100644
index 0000000000..dfe9dc73f3
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/FilteredRDD.scala
@@ -0,0 +1,12 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class FilteredRDD[T: ClassManifest](prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) {
+  override def splits = prev.splits
+  override val dependencies = List(new OneToOneDependency(prev))
+  override def compute(split: Split) = prev.iterator(split).filter(f)
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
new file mode 100644
index 0000000000..3534dc8057
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
@@ -0,0 +1,16 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
+    prev: RDD[T],
+    f: T => TraversableOnce[U])
+  extends RDD[U](prev.context) {
+
+  override def splits = prev.splits
+  override val dependencies = List(new OneToOneDependency(prev))
+  override def compute(split: Split) = prev.iterator(split).flatMap(f)
+}
diff --git a/core/src/main/scala/spark/rdd/GlommedRDD.scala b/core/src/main/scala/spark/rdd/GlommedRDD.scala
new file mode 100644
index 0000000000..e30564f2da
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/GlommedRDD.scala
@@ -0,0 +1,12 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) {
+  override def splits = prev.splits
+  override val dependencies = List(new OneToOneDependency(prev))
+  override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
new file mode 100644
index 0000000000..b2c7a1cb9e
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
@@ -0,0 +1,16 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
+    prev: RDD[T],
+    f: Iterator[T] => Iterator[U])
+  extends RDD[U](prev.context) {
+
+  override def splits = prev.splits
+  override val dependencies = List(new OneToOneDependency(prev))
+  override def compute(split: Split) = f(prev.iterator(split))
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
new file mode 100644
index 0000000000..adc541694e
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
@@ -0,0 +1,21 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+/**
+ * A variant of the MapPartitionsRDD that passes the split index into the
+ * closure. This can be used to generate or collect partition specific
+ * information such as the number of tuples in a partition.
+ */
+private[spark]
+class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
+    prev: RDD[T],
+    f: (Int, Iterator[T]) => Iterator[U])
+  extends RDD[U](prev.context) {
+
+  override def splits = prev.splits
+  override val dependencies = List(new OneToOneDependency(prev))
+  override def compute(split: Split) = f(split.index, prev.iterator(split))
+}
\ No newline at end of file
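The comment on MapPartitionsWithSplitRDD points at its typical use: collecting per-partition statistics such as tuple counts. A minimal sketch of that, assuming mapPartitionsWithSplit is the public RDD method that constructs this class (the surrounding setup is illustrative):

import spark.SparkContext

val sc = new SparkContext("local[2]", "partition-counts")
val data = sc.parallelize(1 to 10, 2)
// Each partition emits a single (split index, tuple count) pair.
val counts = data.mapPartitionsWithSplit { (index, iter) =>
  Iterator((index, iter.size))
}
counts.collect().foreach(println)  // e.g. (0,5) and (1,5)
sc.stop()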
diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala
new file mode 100644
index 0000000000..59bedad8ef
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MappedRDD.scala
@@ -0,0 +1,16 @@
+package spark.rdd
+
+import spark.OneToOneDependency
+import spark.RDD
+import spark.Split
+
+private[spark]
+class MappedRDD[U: ClassManifest, T: ClassManifest](
+    prev: RDD[T],
+    f: T => U)
+  extends RDD[U](prev.context) {
+
+  override def splits = prev.splits
+  override val dependencies = List(new OneToOneDependency(prev))
+  override def compute(split: Split) = prev.iterator(split).map(f)
+}
\ No newline at end of file
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index b920e53534..403e675f37 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -18,6 +18,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       sc.stop()
       sc = null
     }
+    System.clearProperty("spark.master.port")
   }
 
   test ("basic accumulation"){
@@ -91,7 +92,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     val maxI = 1000
     for (nThreads <- List(1, 10)) { // test single & multi-threaded
-      val sc = new SparkContext("local[" + nThreads + "]", "test")
+      sc = new SparkContext("local[" + nThreads + "]", "test")
       val setAcc = sc.accumulableCollection(mutable.HashSet[Int]())
       val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]())
       val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]())
@@ -110,6 +111,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
         mapAcc.value should contain (i -> i.toString)
       }
       sc.stop()
+      sc = null
     }
   }
 
@@ -117,7 +119,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     import SetAccum._
     val maxI = 1000
    for (nThreads <- List(1, 10)) { //test single & multi-threaded
-      val sc = new SparkContext("local[" + nThreads + "]", "test")
+      sc = new SparkContext("local[" + nThreads + "]", "test")
       val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
       val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
       val d = sc.parallelize(groupedInts)
@@ -125,6 +127,8 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
         x => acc.localValue ++= x
       }
       acc.value should be ( (0 to maxI).toSet)
+      sc.stop()
+      sc = null
     }
   }
 