From 9148b968cf34b898c36a7f9672382533ee54db2d Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Mon, 4 Mar 2013 15:48:47 -0800 Subject: mapWith, flatMapWith and filterWith --- core/src/main/scala/spark/RDD.scala | 59 +++++++++++++++++++++++++++- core/src/test/scala/spark/RDDSuite.scala | 66 ++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 584efa8adf..de791598a4 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -364,6 +364,63 @@ abstract class RDD[T: ClassManifest]( preservesPartitioning: Boolean = false): RDD[U] = new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning) + /** + * Maps f over this RDD where f takes an additional parameter of type A. This + * additional parameter is produced by a factory method T => A which is called + * on each invocation of f. This factory method is produced by the factoryBuilder, + * an instance of which is constructed in each partition from the partition index + * and a seed value of type B. + */ + def mapWith[A: ClassManifest, B: ClassManifest, U: ClassManifest]( + f:(A, T) => U, + factoryBuilder: (Int, B) => (T => A), + factorySeed: B, + preservesPartitioning: Boolean = false): RDD[U] = { + def iterF(index: Int, iter: Iterator[T]): Iterator[U] = { + val factory = factoryBuilder(index, factorySeed) + iter.map(t => f(factory(t), t)) + } + new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) + } + + /** + * FlatMaps f over this RDD where f takes an additional parameter of type A. This + * additional parameter is produced by a factory method T => A which is called + * on each invocation of f. This factory method is produced by the factoryBuilder, + * an instance of which is constructed in each partition from the partition index + * and a seed value of type B. + */ + def flatMapWith[A: ClassManifest, B: ClassManifest, U: ClassManifest]( + f:(A, T) => Seq[U], + factoryBuilder: (Int, B) => (T => A), + factorySeed: B, + preservesPartitioning: Boolean = false): RDD[U] = { + def iterF(index: Int, iter: Iterator[T]): Iterator[U] = { + val factory = factoryBuilder(index, factorySeed) + iter.flatMap(t => f(factory(t), t)) + } + new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) + } + + /** + * Filters this RDD with p, where p takes an additional parameter of type A. This + * additional parameter is produced by a factory method T => A which is called + * on each invocation of p. This factory method is produced by the factoryBuilder, + * an instance of which is constructed in each partition from the partition index + * and a seed value of type B. + */ + def filterWith[A: ClassManifest, B: ClassManifest]( + p:(A, T) => Boolean, + factoryBuilder: (Int, B) => (T => A), + factorySeed: B, + preservesPartitioning: Boolean = false): RDD[T] = { + def iterF(index: Int, iter: Iterator[T]): Iterator[T] = { + val factory = factoryBuilder(index, factorySeed) + iter.filter(t => p(factory(t), t)) + } + new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) + } + /** * Zips this RDD with another one, returning key-value pairs with the first element in each RDD, * second element in each RDD, etc. Assumes that the two RDDs have the *same number of @@ -404,7 +461,7 @@ abstract class RDD[T: ClassManifest]( /** * Return an RDD with the elements from `this` that are not in `other`. 
- * + * * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 9739ba869b..ced8170300 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -178,4 +178,70 @@ class RDDSuite extends FunSuite with LocalSparkContext { assert(prunedData.size === 1) assert(prunedData(0) === 10) } + + test("mapWith") { + sc = new SparkContext("local", "test") + val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) + val randoms = ones.mapWith( + (random: Double, t: Int) => random * t, + (index: Int, seed: Int) => { + val prng = new java.util.Random(index + seed) + (_ => prng.nextDouble)}, + 42). + collect() + val prn42_3 = { + val prng42 = new java.util.Random(42) + prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble() + } + val prn43_3 = { + val prng43 = new java.util.Random(43) + prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble() + } + assert(randoms(2) === prn42_3) + assert(randoms(5) === prn43_3) + } + + test("flatMapWith") { + sc = new SparkContext("local", "test") + val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) + val randoms = ones.flatMapWith( + (random: Double, t: Int) => Seq(random * t, random * t * 10), + (index: Int, seed: Int) => { + val prng = new java.util.Random(index + seed) + (_ => prng.nextDouble)}, + 42). + collect() + val prn42_3 = { + val prng42 = new java.util.Random(42) + prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble() + } + val prn43_3 = { + val prng43 = new java.util.Random(43) + prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble() + } + assert(randoms(5) === prn42_3 * 10) + assert(randoms(11) === prn43_3 * 10) + } + + test("filterWith") { + import java.util.Random + sc = new SparkContext("local", "test") + val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2) + val sample = ints.filterWith( + (random: Int, t: Int) => random == 0, + (index: Int, seed: Int) => { + val prng = new Random(index + seed) + (_ => prng.nextInt(3))}, + 42). + collect() + val checkSample = { + val prng42 = new Random(42) + val prng43 = new Random(43) + Array(1, 2, 3, 4, 5, 6).filter{i => + if (i < 4) 0 == prng42.nextInt(3) + else 0 == prng43.nextInt(3)} + } + assert(sample.size === checkSample.size) + for (i <- 0 until sample.size) assert(sample(i) === checkSample(i)) + } } -- cgit v1.2.3 From d046d8ad329b7d5812ecc5f9a4661fab5625b1b7 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Tue, 5 Mar 2013 00:48:13 -0800 Subject: whitespace formatting --- core/src/main/scala/spark/RDD.scala | 20 ++++++++++---------- core/src/test/scala/spark/RDDSuite.scala | 12 ++++++------ 2 files changed, 16 insertions(+), 16 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index de791598a4..cc206782d0 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -380,7 +380,7 @@ abstract class RDD[T: ClassManifest]( val factory = factoryBuilder(index, factorySeed) iter.map(t => f(factory(t), t)) } - new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) + new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) } /** @@ -391,14 +391,14 @@ abstract class RDD[T: ClassManifest]( * and a seed value of type B. 
*/ def flatMapWith[A: ClassManifest, B: ClassManifest, U: ClassManifest]( - f:(A, T) => Seq[U], - factoryBuilder: (Int, B) => (T => A), - factorySeed: B, - preservesPartitioning: Boolean = false): RDD[U] = { - def iterF(index: Int, iter: Iterator[T]): Iterator[U] = { - val factory = factoryBuilder(index, factorySeed) - iter.flatMap(t => f(factory(t), t)) - } + f:(A, T) => Seq[U], + factoryBuilder: (Int, B) => (T => A), + factorySeed: B, + preservesPartitioning: Boolean = false): RDD[U] = { + def iterF(index: Int, iter: Iterator[T]): Iterator[U] = { + val factory = factoryBuilder(index, factorySeed) + iter.flatMap(t => f(factory(t), t)) + } new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) } @@ -418,7 +418,7 @@ abstract class RDD[T: ClassManifest]( val factory = factoryBuilder(index, factorySeed) iter.filter(t => p(factory(t), t)) } - new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) + new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) } /** diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index ced8170300..b549677469 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -185,8 +185,8 @@ class RDDSuite extends FunSuite with LocalSparkContext { val randoms = ones.mapWith( (random: Double, t: Int) => random * t, (index: Int, seed: Int) => { - val prng = new java.util.Random(index + seed) - (_ => prng.nextDouble)}, + val prng = new java.util.Random(index + seed) + (_ => prng.nextDouble)}, 42). collect() val prn42_3 = { @@ -230,16 +230,16 @@ class RDDSuite extends FunSuite with LocalSparkContext { val sample = ints.filterWith( (random: Int, t: Int) => random == 0, (index: Int, seed: Int) => { - val prng = new Random(index + seed) - (_ => prng.nextInt(3))}, + val prng = new Random(index + seed) + (_ => prng.nextInt(3))}, 42). collect() val checkSample = { val prng42 = new Random(42) val prng43 = new Random(43) Array(1, 2, 3, 4, 5, 6).filter{i => - if (i < 4) 0 == prng42.nextInt(3) - else 0 == prng43.nextInt(3)} + if (i < 4) 0 == prng42.nextInt(3) + else 0 == prng43.nextInt(3)} } assert(sample.size === checkSample.size) for (i <- 0 until sample.size) assert(sample(i) === checkSample(i)) -- cgit v1.2.3 From 5ff0810b11e95e3b48d88ae744fdeaf7c117186d Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Tue, 5 Mar 2013 12:25:44 -0800 Subject: refactor mapWith, flatMapWith and filterWith to each use two parameter lists --- core/src/main/scala/spark/RDD.scala | 12 ++++++------ core/src/test/scala/spark/RDDSuite.scala | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index cc206782d0..0a901a251d 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -372,10 +372,10 @@ abstract class RDD[T: ClassManifest]( * and a seed value of type B. */ def mapWith[A: ClassManifest, B: ClassManifest, U: ClassManifest]( - f:(A, T) => U, factoryBuilder: (Int, B) => (T => A), factorySeed: B, - preservesPartitioning: Boolean = false): RDD[U] = { + preservesPartitioning: Boolean = false) + (f:(A, T) => U): RDD[U] = { def iterF(index: Int, iter: Iterator[T]): Iterator[U] = { val factory = factoryBuilder(index, factorySeed) iter.map(t => f(factory(t), t)) @@ -391,10 +391,10 @@ abstract class RDD[T: ClassManifest]( * and a seed value of type B. 
*/ def flatMapWith[A: ClassManifest, B: ClassManifest, U: ClassManifest]( - f:(A, T) => Seq[U], factoryBuilder: (Int, B) => (T => A), factorySeed: B, - preservesPartitioning: Boolean = false): RDD[U] = { + preservesPartitioning: Boolean = false) + (f:(A, T) => Seq[U]): RDD[U] = { def iterF(index: Int, iter: Iterator[T]): Iterator[U] = { val factory = factoryBuilder(index, factorySeed) iter.flatMap(t => f(factory(t), t)) @@ -410,10 +410,10 @@ abstract class RDD[T: ClassManifest]( * and a seed value of type B. */ def filterWith[A: ClassManifest, B: ClassManifest]( - p:(A, T) => Boolean, factoryBuilder: (Int, B) => (T => A), factorySeed: B, - preservesPartitioning: Boolean = false): RDD[T] = { + preservesPartitioning: Boolean = false) + (p:(A, T) => Boolean): RDD[T] = { def iterF(index: Int, iter: Iterator[T]): Iterator[T] = { val factory = factoryBuilder(index, factorySeed) iter.filter(t => p(factory(t), t)) diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index b549677469..2a182e0d6c 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -183,11 +183,11 @@ class RDDSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test") val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) val randoms = ones.mapWith( - (random: Double, t: Int) => random * t, (index: Int, seed: Int) => { val prng = new java.util.Random(index + seed) (_ => prng.nextDouble)}, - 42). + 42) + {(random: Double, t: Int) => random * t}. collect() val prn42_3 = { val prng42 = new java.util.Random(42) @@ -205,11 +205,11 @@ class RDDSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test") val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) val randoms = ones.flatMapWith( - (random: Double, t: Int) => Seq(random * t, random * t * 10), (index: Int, seed: Int) => { val prng = new java.util.Random(index + seed) (_ => prng.nextDouble)}, - 42). + 42) + {(random: Double, t: Int) => Seq(random * t, random * t * 10)}. collect() val prn42_3 = { val prng42 = new java.util.Random(42) @@ -228,11 +228,11 @@ class RDDSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test") val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2) val sample = ints.filterWith( - (random: Int, t: Int) => random == 0, (index: Int, seed: Int) => { val prng = new Random(index + seed) (_ => prng.nextInt(3))}, - 42). + 42) + {(random: Int, t: Int) => random == 0}. collect() val checkSample = { val prng42 = new Random(42) -- cgit v1.2.3 From 1289e7176bc1ad4eb3a7089acb59bcb8220eddab Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Sun, 10 Mar 2013 22:27:13 -0700 Subject: refactored _With API and added foreachPartition --- core/src/main/scala/spark/RDD.scala | 79 +++++++++++++++++--------------- core/src/test/scala/spark/RDDSuite.scala | 34 ++++++-------- 2 files changed, 57 insertions(+), 56 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 0a901a251d..2ad11bc604 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -365,60 +365,59 @@ abstract class RDD[T: ClassManifest]( new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning) /** - * Maps f over this RDD where f takes an additional parameter of type A. This - * additional parameter is produced by a factory method T => A which is called - * on each invocation of f. 
This factory method is produced by the factoryBuilder, - * an instance of which is constructed in each partition from the partition index - * and a seed value of type B. - */ - def mapWith[A: ClassManifest, B: ClassManifest, U: ClassManifest]( - factoryBuilder: (Int, B) => (T => A), - factorySeed: B, - preservesPartitioning: Boolean = false) + * Maps f over this RDD where, f takes an additional parameter of type A. This + * additional parameter is produced by constructorOfA, which is called in each + * partition with the index of that partition. + */ + def mapWith[A: ClassManifest, U: ClassManifest](constructorOfA: Int => A, preservesPartitioning: Boolean = false) (f:(A, T) => U): RDD[U] = { def iterF(index: Int, iter: Iterator[T]): Iterator[U] = { - val factory = factoryBuilder(index, factorySeed) - iter.map(t => f(factory(t), t)) + val a = constructorOfA(index) + iter.map(t => f(a, t)) } new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) } - /** - * FlatMaps f over this RDD where f takes an additional parameter of type A. This - * additional parameter is produced by a factory method T => A which is called - * on each invocation of f. This factory method is produced by the factoryBuilder, - * an instance of which is constructed in each partition from the partition index - * and a seed value of type B. + /** + * FlatMaps f over this RDD, where f takes an additional parameter of type A. This + * additional parameter is produced by constructorOfA, which is called in each + * partition with the index of that partition. */ - def flatMapWith[A: ClassManifest, B: ClassManifest, U: ClassManifest]( - factoryBuilder: (Int, B) => (T => A), - factorySeed: B, - preservesPartitioning: Boolean = false) + def flatMapWith[A: ClassManifest, U: ClassManifest](constructorOfA: Int => A, preservesPartitioning: Boolean = false) (f:(A, T) => Seq[U]): RDD[U] = { def iterF(index: Int, iter: Iterator[T]): Iterator[U] = { - val factory = factoryBuilder(index, factorySeed) - iter.flatMap(t => f(factory(t), t)) + val a = constructorOfA(index) + iter.flatMap(t => f(a, t)) } new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) } + /** + * Applies f to each element of this RDD, where f takes an additional parameter of type A. + * This additional parameter is produced by constructorOfA, which is called in each + * partition with the index of that partition. + */ + def foreachWith[A: ClassManifest](constructorOfA: Int => A) + (f:(A, T) => Unit) { + def iterF(index: Int, iter: Iterator[T]): Iterator[T] = { + val a = constructorOfA(index) + iter.map(t => {f(a, t); t}) + } + (new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {}) + } + /** * Filters this RDD with p, where p takes an additional parameter of type A. This - * additional parameter is produced by a factory method T => A which is called - * on each invocation of p. This factory method is produced by the factoryBuilder, - * an instance of which is constructed in each partition from the partition index - * and a seed value of type B. - */ - def filterWith[A: ClassManifest, B: ClassManifest]( - factoryBuilder: (Int, B) => (T => A), - factorySeed: B, - preservesPartitioning: Boolean = false) + * additional parameter is produced by constructorOfA, which is called in each + * partition with the index of that partition. 
+ */ + def filterWith[A: ClassManifest](constructorOfA: Int => A) (p:(A, T) => Boolean): RDD[T] = { def iterF(index: Int, iter: Iterator[T]): Iterator[T] = { - val factory = factoryBuilder(index, factorySeed) - iter.filter(t => p(factory(t), t)) + val a = constructorOfA(index) + iter.filter(t => p(a, t)) } - new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) + new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true) } /** @@ -439,6 +438,14 @@ abstract class RDD[T: ClassManifest]( sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) } + /** + * Applies a function f to each partition of this RDD. + */ + def foreachPartition(f: Iterator[T] => Unit) { + val cleanF = sc.clean(f) + sc.runJob(this, (iter: Iterator[T]) => f(iter)) + } + /** * Return an array that contains all of the elements in this RDD. */ diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 2a182e0d6c..d260191dd7 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -180,21 +180,18 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("mapWith") { + import java.util.Random sc = new SparkContext("local", "test") val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) val randoms = ones.mapWith( - (index: Int, seed: Int) => { - val prng = new java.util.Random(index + seed) - (_ => prng.nextDouble)}, - 42) - {(random: Double, t: Int) => random * t}. - collect() + (index: Int) => new Random(index + 42)) + {(prng: Random, t: Int) => prng.nextDouble * t}.collect() val prn42_3 = { - val prng42 = new java.util.Random(42) + val prng42 = new Random(42) prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble() } val prn43_3 = { - val prng43 = new java.util.Random(43) + val prng43 = new Random(43) prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble() } assert(randoms(2) === prn42_3) @@ -202,21 +199,21 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("flatMapWith") { + import java.util.Random sc = new SparkContext("local", "test") val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) val randoms = ones.flatMapWith( - (index: Int, seed: Int) => { - val prng = new java.util.Random(index + seed) - (_ => prng.nextDouble)}, - 42) - {(random: Double, t: Int) => Seq(random * t, random * t * 10)}. + (index: Int) => new Random(index + 42)) + {(prng: Random, t: Int) => { + val random = prng.nextDouble() + Seq(random * t, random * t * 10)}}. collect() val prn42_3 = { - val prng42 = new java.util.Random(42) + val prng42 = new Random(42) prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble() } val prn43_3 = { - val prng43 = new java.util.Random(43) + val prng43 = new Random(43) prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble() } assert(randoms(5) === prn42_3 * 10) @@ -228,11 +225,8 @@ class RDDSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test") val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2) val sample = ints.filterWith( - (index: Int, seed: Int) => { - val prng = new Random(index + seed) - (_ => prng.nextInt(3))}, - 42) - {(random: Int, t: Int) => random == 0}. + (index: Int) => new Random(index + 42)) + {(prng: Random, t: Int) => prng.nextInt(3) == 0}. 
collect() val checkSample = { val prng42 = new Random(42) -- cgit v1.2.3 From 562893bea3b57fcb7fad3bec65c3bc012e0e85d2 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Sun, 10 Mar 2013 22:43:08 -0700 Subject: deleted excess curly braces --- core/src/test/scala/spark/RDDSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'core') diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index d260191dd7..3d925798b7 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -204,9 +204,9 @@ class RDDSuite extends FunSuite with LocalSparkContext { val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) val randoms = ones.flatMapWith( (index: Int) => new Random(index + 42)) - {(prng: Random, t: Int) => { + {(prng: Random, t: Int) => val random = prng.nextDouble() - Seq(random * t, random * t * 10)}}. + Seq(random * t, random * t * 10)}. collect() val prn42_3 = { val prng42 = new Random(42) -- cgit v1.2.3 From 80fc8c82ed3d7c16aa722bdf8ba60e1f2a763c29 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Sat, 16 Mar 2013 12:16:29 -0700 Subject: _With[Matei] --- core/src/main/scala/spark/RDD.scala | 34 ++++++++++++++++---------------- core/src/test/scala/spark/RDDSuite.scala | 6 +++--- 2 files changed, 20 insertions(+), 20 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index b6defdd418..96d5c0b80c 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -369,25 +369,25 @@ abstract class RDD[T: ClassManifest]( * additional parameter is produced by constructorOfA, which is called in each * partition with the index of that partition. */ - def mapWith[A: ClassManifest, U: ClassManifest](constructorOfA: Int => A, preservesPartitioning: Boolean = false) - (f:(A, T) => U): RDD[U] = { + def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false) + (f:(T, A) => U): RDD[U] = { def iterF(index: Int, iter: Iterator[T]): Iterator[U] = { - val a = constructorOfA(index) - iter.map(t => f(a, t)) + val a = constructA(index) + iter.map(t => f(t, a)) } new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) } - /** + /** * FlatMaps f over this RDD, where f takes an additional parameter of type A. This * additional parameter is produced by constructorOfA, which is called in each * partition with the index of that partition. */ - def flatMapWith[A: ClassManifest, U: ClassManifest](constructorOfA: Int => A, preservesPartitioning: Boolean = false) - (f:(A, T) => Seq[U]): RDD[U] = { + def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false) + (f:(T, A) => Seq[U]): RDD[U] = { def iterF(index: Int, iter: Iterator[T]): Iterator[U] = { - val a = constructorOfA(index) - iter.flatMap(t => f(a, t)) + val a = constructA(index) + iter.flatMap(t => f(t, a)) } new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) } @@ -397,11 +397,11 @@ abstract class RDD[T: ClassManifest]( * This additional parameter is produced by constructorOfA, which is called in each * partition with the index of that partition. 
*/ - def foreachWith[A: ClassManifest](constructorOfA: Int => A) - (f:(A, T) => Unit) { + def foreachWith[A: ClassManifest](constructA: Int => A) + (f:(T, A) => Unit) { def iterF(index: Int, iter: Iterator[T]): Iterator[T] = { - val a = constructorOfA(index) - iter.map(t => {f(a, t); t}) + val a = constructA(index) + iter.map(t => {f(t, a); t}) } (new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {}) } @@ -411,11 +411,11 @@ abstract class RDD[T: ClassManifest]( * additional parameter is produced by constructorOfA, which is called in each * partition with the index of that partition. */ - def filterWith[A: ClassManifest](constructorOfA: Int => A) - (p:(A, T) => Boolean): RDD[T] = { + def filterWith[A: ClassManifest](constructA: Int => A) + (p:(T, A) => Boolean): RDD[T] = { def iterF(index: Int, iter: Iterator[T]): Iterator[T] = { - val a = constructorOfA(index) - iter.filter(t => p(a, t)) + val a = constructA(index) + iter.filter(t => p(t, a)) } new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true) } diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 3d925798b7..33281d3c82 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -185,7 +185,7 @@ class RDDSuite extends FunSuite with LocalSparkContext { val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) val randoms = ones.mapWith( (index: Int) => new Random(index + 42)) - {(prng: Random, t: Int) => prng.nextDouble * t}.collect() + {(t: Int, prng: Random) => prng.nextDouble * t}.collect() val prn42_3 = { val prng42 = new Random(42) prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble() @@ -204,7 +204,7 @@ class RDDSuite extends FunSuite with LocalSparkContext { val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) val randoms = ones.flatMapWith( (index: Int) => new Random(index + 42)) - {(prng: Random, t: Int) => + {(t: Int, prng: Random) => val random = prng.nextDouble() Seq(random * t, random * t * 10)}. collect() @@ -226,7 +226,7 @@ class RDDSuite extends FunSuite with LocalSparkContext { val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2) val sample = ints.filterWith( (index: Int) => new Random(index + 42)) - {(prng: Random, t: Int) => prng.nextInt(3) == 0}. + {(t: Int, prng: Random) => prng.nextInt(3) == 0}. collect() val checkSample = { val prng42 = new Random(42) -- cgit v1.2.3 From 9784fc1fcd88dc11dda6cf5a6e44e49c49f1143a Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Sat, 16 Mar 2013 15:25:02 -0700 Subject: fix wayward comma in doc comment --- core/src/main/scala/spark/RDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 96d5c0b80c..dd54c6a123 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -365,7 +365,7 @@ abstract class RDD[T: ClassManifest]( new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning) /** - * Maps f over this RDD where, f takes an additional parameter of type A. This + * Maps f over this RDD, where f takes an additional parameter of type A. This * additional parameter is produced by constructorOfA, which is called in each * partition with the index of that partition. 
*/ -- cgit v1.2.3 From ab33e27cc9769157c62d940124b5091301bdc69a Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Sat, 16 Mar 2013 15:29:15 -0700 Subject: constructorOfA -> constructA in doc comments --- core/src/main/scala/spark/RDD.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index dd54c6a123..ed39732f13 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -366,7 +366,7 @@ abstract class RDD[T: ClassManifest]( /** * Maps f over this RDD, where f takes an additional parameter of type A. This - * additional parameter is produced by constructorOfA, which is called in each + * additional parameter is produced by constructA, which is called in each * partition with the index of that partition. */ def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false) @@ -380,7 +380,7 @@ abstract class RDD[T: ClassManifest]( /** * FlatMaps f over this RDD, where f takes an additional parameter of type A. This - * additional parameter is produced by constructorOfA, which is called in each + * additional parameter is produced by constructA, which is called in each * partition with the index of that partition. */ def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false) @@ -394,7 +394,7 @@ abstract class RDD[T: ClassManifest]( /** * Applies f to each element of this RDD, where f takes an additional parameter of type A. - * This additional parameter is produced by constructorOfA, which is called in each + * This additional parameter is produced by constructA, which is called in each * partition with the index of that partition. */ def foreachWith[A: ClassManifest](constructA: Int => A) @@ -408,7 +408,7 @@ abstract class RDD[T: ClassManifest]( /** * Filters this RDD with p, where p takes an additional parameter of type A. This - * additional parameter is produced by constructorOfA, which is called in each + * additional parameter is produced by constructA, which is called in each * partition with the index of that partition. */ def filterWith[A: ClassManifest](constructA: Int => A) -- cgit v1.2.3