diff options
author | Mark Hamstra <markhamstra@gmail.com> | 2013-03-04 15:48:47 -0800 |
---|---|---|
committer | Mark Hamstra <markhamstra@gmail.com> | 2013-03-04 15:48:47 -0800 |
commit | 9148b968cf34b898c36a7f9672382533ee54db2d (patch) | |
tree | 78ff12550a43ae3cc88eaad1335b175502f3b049 /core/src/main | |
parent | 9f0dc829cbaa9aa5011fb917010d13ea5e0a19d7 (diff) | |
download | spark-9148b968cf34b898c36a7f9672382533ee54db2d.tar.gz spark-9148b968cf34b898c36a7f9672382533ee54db2d.tar.bz2 spark-9148b968cf34b898c36a7f9672382533ee54db2d.zip |
mapWith, flatMapWith and filterWith
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 59 |
1 files changed, 58 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 584efa8adf..de791598a4 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -365,6 +365,63 @@ abstract class RDD[T: ClassManifest]( new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning) /** + * Maps f over this RDD where f takes an additional parameter of type A. This + * additional parameter is produced by a factory method T => A which is called + * on each invocation of f. This factory method is produced by the factoryBuilder, + * an instance of which is constructed in each partition from the partition index + * and a seed value of type B. + */ + def mapWith[A: ClassManifest, B: ClassManifest, U: ClassManifest]( + f:(A, T) => U, + factoryBuilder: (Int, B) => (T => A), + factorySeed: B, + preservesPartitioning: Boolean = false): RDD[U] = { + def iterF(index: Int, iter: Iterator[T]): Iterator[U] = { + val factory = factoryBuilder(index, factorySeed) + iter.map(t => f(factory(t), t)) + } + new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) + } + + /** + * FlatMaps f over this RDD where f takes an additional parameter of type A. This + * additional parameter is produced by a factory method T => A which is called + * on each invocation of f. This factory method is produced by the factoryBuilder, + * an instance of which is constructed in each partition from the partition index + * and a seed value of type B. + */ + def flatMapWith[A: ClassManifest, B: ClassManifest, U: ClassManifest]( + f:(A, T) => Seq[U], + factoryBuilder: (Int, B) => (T => A), + factorySeed: B, + preservesPartitioning: Boolean = false): RDD[U] = { + def iterF(index: Int, iter: Iterator[T]): Iterator[U] = { + val factory = factoryBuilder(index, factorySeed) + iter.flatMap(t => f(factory(t), t)) + } + new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) + } + + /** + * Filters this RDD with p, where p takes an additional parameter of type A. This + * additional parameter is produced by a factory method T => A which is called + * on each invocation of p. This factory method is produced by the factoryBuilder, + * an instance of which is constructed in each partition from the partition index + * and a seed value of type B. + */ + def filterWith[A: ClassManifest, B: ClassManifest]( + p:(A, T) => Boolean, + factoryBuilder: (Int, B) => (T => A), + factorySeed: B, + preservesPartitioning: Boolean = false): RDD[T] = { + def iterF(index: Int, iter: Iterator[T]): Iterator[T] = { + val factory = factoryBuilder(index, factorySeed) + iter.filter(t => p(factory(t), t)) + } + new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) + } + + /** * Zips this RDD with another one, returning key-value pairs with the first element in each RDD, * second element in each RDD, etc. Assumes that the two RDDs have the *same number of * partitions* and the *same number of elements in each partition* (e.g. one was made through @@ -404,7 +461,7 @@ abstract class RDD[T: ClassManifest]( /** * Return an RDD with the elements from `this` that are not in `other`. - * + * * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ |