aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMark Hamstra <markhamstra@gmail.com>2013-03-04 15:48:47 -0800
committerMark Hamstra <markhamstra@gmail.com>2013-03-04 15:48:47 -0800
commit9148b968cf34b898c36a7f9672382533ee54db2d (patch)
tree78ff12550a43ae3cc88eaad1335b175502f3b049 /core/src/main
parent9f0dc829cbaa9aa5011fb917010d13ea5e0a19d7 (diff)
downloadspark-9148b968cf34b898c36a7f9672382533ee54db2d.tar.gz
spark-9148b968cf34b898c36a7f9672382533ee54db2d.tar.bz2
spark-9148b968cf34b898c36a7f9672382533ee54db2d.zip
mapWith, flatMapWith and filterWith
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/RDD.scala59
1 files changed, 58 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 584efa8adf..de791598a4 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -365,6 +365,63 @@ abstract class RDD[T: ClassManifest](
new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
/**
+ * Maps f over this RDD where f takes an additional parameter of type A. This
+ * additional parameter is produced by a factory method T => A which is called
+ * on each invocation of f. This factory method is produced by the factoryBuilder,
+ * an instance of which is constructed in each partition from the partition index
+ * and a seed value of type B.
+ */
+ def mapWith[A: ClassManifest, B: ClassManifest, U: ClassManifest](
+ f:(A, T) => U,
+ factoryBuilder: (Int, B) => (T => A),
+ factorySeed: B,
+ preservesPartitioning: Boolean = false): RDD[U] = {
+ def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+ val factory = factoryBuilder(index, factorySeed)
+ iter.map(t => f(factory(t), t))
+ }
+ new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+ }
+
+ /**
+ * FlatMaps f over this RDD where f takes an additional parameter of type A. This
+ * additional parameter is produced by a factory method T => A which is called
+ * on each invocation of f. This factory method is produced by the factoryBuilder,
+ * an instance of which is constructed in each partition from the partition index
+ * and a seed value of type B.
+ */
+ def flatMapWith[A: ClassManifest, B: ClassManifest, U: ClassManifest](
+ f:(A, T) => Seq[U],
+ factoryBuilder: (Int, B) => (T => A),
+ factorySeed: B,
+ preservesPartitioning: Boolean = false): RDD[U] = {
+ def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+ val factory = factoryBuilder(index, factorySeed)
+ iter.flatMap(t => f(factory(t), t))
+ }
+ new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+ }
+
+ /**
+ * Filters this RDD with p, where p takes an additional parameter of type A. This
+ * additional parameter is produced by a factory method T => A which is called
+ * on each invocation of p. This factory method is produced by the factoryBuilder,
+ * an instance of which is constructed in each partition from the partition index
+ * and a seed value of type B.
+ */
+ def filterWith[A: ClassManifest, B: ClassManifest](
+ p:(A, T) => Boolean,
+ factoryBuilder: (Int, B) => (T => A),
+ factorySeed: B,
+ preservesPartitioning: Boolean = false): RDD[T] = {
+ def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
+ val factory = factoryBuilder(index, factorySeed)
+ iter.filter(t => p(factory(t), t))
+ }
+ new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+ }
+
+ /**
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
* second element in each RDD, etc. Assumes that the two RDDs have the *same number of
* partitions* and the *same number of elements in each partition* (e.g. one was made through
@@ -404,7 +461,7 @@ abstract class RDD[T: ClassManifest](
/**
* Return an RDD with the elements from `this` that are not in `other`.
- *
+ *
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/