author    Mark Hamstra <markhamstra@gmail.com>  2013-03-16 12:16:29 -0700
committer Mark Hamstra <markhamstra@gmail.com>  2013-03-16 12:16:29 -0700
commit    80fc8c82ed3d7c16aa722bdf8ba60e1f2a763c29 (patch)
tree      bcf5721c3651bd71a53cd48fe9488c8f8fd1ea3f /core
parent    38454c4aedcc4b454d3470ed853d4741ca920db2 (diff)
_With[Matei]
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/RDD.scala        34
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala    6
2 files changed, 20 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index b6defdd418..96d5c0b80c 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -369,25 +369,25 @@ abstract class RDD[T: ClassManifest](
* additional parameter is produced by constructorOfA, which is called in each
* partition with the index of that partition.
*/
- def mapWith[A: ClassManifest, U: ClassManifest](constructorOfA: Int => A, preservesPartitioning: Boolean = false)
- (f:(A, T) => U): RDD[U] = {
+ def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+ (f:(T, A) => U): RDD[U] = {
def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
- val a = constructorOfA(index)
- iter.map(t => f(a, t))
+ val a = constructA(index)
+ iter.map(t => f(t, a))
}
new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
}
- /**
+ /**
* FlatMaps f over this RDD, where f takes an additional parameter of type A. This
* additional parameter is produced by constructorOfA, which is called in each
* partition with the index of that partition.
*/
- def flatMapWith[A: ClassManifest, U: ClassManifest](constructorOfA: Int => A, preservesPartitioning: Boolean = false)
- (f:(A, T) => Seq[U]): RDD[U] = {
+ def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+ (f:(T, A) => Seq[U]): RDD[U] = {
def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
- val a = constructorOfA(index)
- iter.flatMap(t => f(a, t))
+ val a = constructA(index)
+ iter.flatMap(t => f(t, a))
}
new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
}
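
(For context: a hypothetical call-site sketch of the new (T, A) argument order,
so the closures read like plain map/flatMap with the per-partition value second.
The RDD contents and the SparkContext `sc` below are illustrative, not part of
this commit.)

import scala.util.Random

val words = sc.parallelize(Seq("a", "b", "c"), 2)

// mapWith: each partition builds its own seeded Random; the element `t`
// now comes first, the constructed value `prng` second.
val tagged = words.mapWith((index: Int) => new Random(index))(
  (t: String, prng: Random) => (t, prng.nextInt(100)))

// flatMapWith: same order, but f returns a Seq of results per element.
val repeated = words.flatMapWith((index: Int) => index + 1)(
  (t: String, n: Int) => Seq.fill(n)(t))
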
@@ -397,11 +397,11 @@ abstract class RDD[T: ClassManifest](
* This additional parameter is produced by constructorOfA, which is called in each
* partition with the index of that partition.
*/
- def foreachWith[A: ClassManifest](constructorOfA: Int => A)
- (f:(A, T) => Unit) {
+ def foreachWith[A: ClassManifest](constructA: Int => A)
+ (f:(T, A) => Unit) {
def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
- val a = constructorOfA(index)
- iter.map(t => {f(a, t); t})
+ val a = constructA(index)
+ iter.map(t => {f(t, a); t})
}
(new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {})
}
@@ -411,11 +411,11 @@ abstract class RDD[T: ClassManifest](
* additional parameter is produced by constructorOfA, which is called in each
* partition with the index of that partition.
*/
- def filterWith[A: ClassManifest](constructorOfA: Int => A)
- (p:(A, T) => Boolean): RDD[T] = {
+ def filterWith[A: ClassManifest](constructA: Int => A)
+ (p:(T, A) => Boolean): RDD[T] = {
def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
- val a = constructorOfA(index)
- iter.filter(t => p(a, t))
+ val a = constructA(index)
+ iter.filter(t => p(t, a))
}
new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)
}
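
(Likewise, hypothetical uses of the side-effecting and filtering variants with
the new (T, A) order; a sketch against this era's API, not taken from the commit.)

import scala.util.Random

val ints = sc.parallelize(1 to 100, 4)

// filterWith: the predicate sees the element first and the per-partition
// Random second; keeps roughly a third of each partition, reproducibly.
val sampled = ints.filterWith((index: Int) => new Random(index + 42))(
  (t: Int, prng: Random) => prng.nextInt(3) == 0)

// foreachWith: runs f only for its side effect; here constructA builds a
// partition-local label once, reused for every element in that partition.
ints.foreachWith((index: Int) => "part-" + index)(
  (t: Int, label: String) => println(label + ": " + t))
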
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 3d925798b7..33281d3c82 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -185,7 +185,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
val randoms = ones.mapWith(
(index: Int) => new Random(index + 42))
- {(prng: Random, t: Int) => prng.nextDouble * t}.collect()
+ {(t: Int, prng: Random) => prng.nextDouble * t}.collect()
val prn42_3 = {
val prng42 = new Random(42)
prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
@@ -204,7 +204,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
val randoms = ones.flatMapWith(
(index: Int) => new Random(index + 42))
- {(prng: Random, t: Int) =>
+ {(t: Int, prng: Random) =>
val random = prng.nextDouble()
Seq(random * t, random * t * 10)}.
collect()
@@ -226,7 +226,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
val sample = ints.filterWith(
(index: Int) => new Random(index + 42))
- {(prng: Random, t: Int) => prng.nextInt(3) == 0}.
+ {(t: Int, prng: Random) => prng.nextInt(3) == 0}.
collect()
val checkSample = {
val prng42 = new Random(42)
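
(All four helpers are thin sugar over MapPartitionsWithIndexRDD, as the + lines
above show. A minimal hand-rolled equivalent of the new mapWith, assuming the
era's public mapPartitionsWithIndex method; mapWithByHand is a hypothetical name:)

import spark.RDD

def mapWithByHand[T, A, U: ClassManifest](rdd: RDD[T])
    (constructA: Int => A)(f: (T, A) => U): RDD[U] =
  rdd.mapPartitionsWithIndex { (index, iter) =>
    val a = constructA(index)   // one A per partition, not per element
    iter.map(t => f(t, a))
  }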