Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/RDD.scala               |  66
-rw-r--r--  core/src/main/scala/spark/rdd/CoGroupedRDD.scala  |  43
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala          |  92
3 files changed, 191 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 9bd8a0f98d..ed39732f13 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -365,6 +365,62 @@ abstract class RDD[T: ClassManifest](
new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
/**
+ * Maps f over this RDD, where f takes an additional parameter of type A. This
+ * additional parameter is produced by constructA, which is called in each
+ * partition with the index of that partition.
+ */
+ def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+ (f: (T, A) => U): RDD[U] = {
+ def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+ val a = constructA(index)
+ iter.map(t => f(t, a))
+ }
+ new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+ }
+
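As a usage illustration only (not part of this commit), a minimal sketch of mapWith, assuming an existing SparkContext named sc; the partition index is turned into a per-partition value that f then sees alongside every element:

    // Tag every element with the index of the partition it lives in.
    val tagged = sc.parallelize(1 to 9, 3)
      .mapWith(index => index)((elem, partitionIndex) => (partitionIndex, elem))
    // tagged.collect() returns each element paired with its partition index.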
+ /**
+ * FlatMaps f over this RDD, where f takes an additional parameter of type A. This
+ * additional parameter is produced by constructA, which is called in each
+ * partition with the index of that partition.
+ */
+ def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+ (f: (T, A) => Seq[U]): RDD[U] = {
+ def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+ val a = constructA(index)
+ iter.flatMap(t => f(t, a))
+ }
+ new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+ }
+
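A similar hedged sketch for flatMapWith, again assuming a SparkContext named sc; each element expands into a number of copies determined by its partition index:

    // Partition 0 emits its element once, partition 1 twice, partition 2 three times.
    val expanded = sc.parallelize(Seq("a", "b", "c"), 3)
      .flatMapWith(index => index + 1)((s, copies) => Seq.fill(copies)(s))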
+ /**
+ * Applies f to each element of this RDD, where f takes an additional parameter of type A.
+ * This additional parameter is produced by constructA, which is called in each
+ * partition with the index of that partition.
+ */
+ def foreachWith[A: ClassManifest](constructA: Int => A)
+ (f: (T, A) => Unit) {
+ def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
+ val a = constructA(index)
+ // Apply f to each element for its side effect, then pass the element through unchanged
+ // so the wrapping RDD can be forced with the no-op foreach below.
+ iter.map(t => {f(t, a); t})
+ }
+ (new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {})
+ }
+
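A sketch of foreachWith under the same assumptions (an existing SparkContext sc); the side effect here is just a println, executed once per element with the per-partition value in scope:

    // Print each element together with the index of its partition.
    sc.parallelize(1 to 6, 2).foreachWith(index => index) { (elem, partitionIndex) =>
      println("partition " + partitionIndex + ": " + elem)
    }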
+ /**
+ * Filters this RDD with p, where p takes an additional parameter of type A. This
+ * additional parameter is produced by constructA, which is called in each
+ * partition with the index of that partition.
+ */
+ def filterWith[A: ClassManifest](constructA: Int => A)
+ (p: (T, A) => Boolean): RDD[T] = {
+ def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
+ val a = constructA(index)
+ iter.filter(t => p(t, a))
+ }
+ new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)
+ }
+
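And a sketch of filterWith, same assumptions; the per-partition value is a Boolean derived from the partition index and the predicate simply returns it:

    // Keep only the elements that sit in even-numbered partitions.
    val evenPartitions = sc.parallelize(1 to 10, 4)
      .filterWith(index => index % 2 == 0)((elem, inEvenPartition) => inEvenPartition)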
+ /**
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
* second element in each RDD, etc. Assumes that the two RDDs have the *same number of
* partitions* and the *same number of elements in each partition* (e.g. one was made through
@@ -383,6 +439,14 @@ abstract class RDD[T: ClassManifest](
}
/**
+ * Applies a function f to each partition of this RDD.
+ */
+ def foreachPartition(f: Iterator[T] => Unit) {
+ val cleanF = sc.clean(f)
+ sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
+ }
+
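For foreachPartition, a minimal sketch assuming an existing SparkContext sc; the point of the API is to pay per-partition setup costs once rather than once per element:

    // Do per-partition work (e.g. opening a connection) once, then drain the iterator.
    sc.parallelize(1 to 100, 4).foreachPartition { iter =>
      val elems = iter.toArray // stand-in for per-partition setup plus processing
      println("processed a partition of " + elems.size + " elements")
    }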
+ /**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = {
@@ -404,7 +468,7 @@ abstract class RDD[T: ClassManifest](
/**
* Return an RDD with the elements from `this` that are not in `other`.
- *
+ *
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 65b4621b87..9213513e80 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -2,10 +2,11 @@ package spark.rdd
import java.io.{ObjectOutputStream, IOException}
import java.util.{HashMap => JHashMap}
+
import scala.collection.JavaConversions
import scala.collection.mutable.ArrayBuffer
-import spark.{Aggregator, Logging, Partitioner, RDD, SparkEnv, Partition, TaskContext}
+import spark.{Aggregator, Logging, Partition, Partitioner, RDD, SparkEnv, TaskContext}
import spark.{Dependency, OneToOneDependency, ShuffleDependency}
@@ -28,7 +29,8 @@ private[spark] case class NarrowCoGroupSplitDep(
private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
private[spark]
-class CoGroupPartition(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Partition with Serializable {
+class CoGroupPartition(idx: Int, val deps: Seq[CoGroupSplitDep])
+ extends Partition with Serializable {
override val index: Int = idx
override def hashCode(): Int = idx
}
@@ -40,7 +42,19 @@ private[spark] class CoGroupAggregator
{ (b1, b2) => b1 ++ b2 })
with Serializable
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
+
+/**
+ * An RDD that cogroups its parents. For each key k present in any of the parent RDDs, the
+ * resulting RDD contains a tuple with the list of values for that key from each parent.
+ *
+ * @param rdds parent RDDs.
+ * @param part partitioner used to partition the shuffle output.
+ * @param mapSideCombine flag indicating whether to merge values on the map side before the shuffle step.
+ */
+class CoGroupedRDD[K](
+ @transient var rdds: Seq[RDD[(K, _)]],
+ part: Partitioner,
+ val mapSideCombine: Boolean = true)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
private val aggr = new CoGroupAggregator
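For the new constructor flag, a hedged sketch mirroring the test added below in RDDSuite.scala (assuming a SparkContext sc and the same imports as that test):

    val a = sc.parallelize(Seq((1, "x"), (2, "y"))).asInstanceOf[RDD[(Int, Any)]]
    val b = sc.parallelize(Seq((1, "z"))).asInstanceOf[RDD[(Int, Any)]]
    // mapSideCombine = false ships raw (key, value) pairs through the shuffle.
    val grouped = new CoGroupedRDD[Int](Seq(a, b), new HashPartitioner(2), mapSideCombine = false)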
@@ -52,8 +66,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
new OneToOneDependency(rdd)
} else {
logInfo("Adding shuffle dependency with " + rdd)
- val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
- new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
+ if (mapSideCombine) {
+ val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
+ new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
+ } else {
+ new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part)
+ }
}
}
}
@@ -82,6 +100,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
val numRdds = split.deps.size
// e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
val map = new JHashMap[K, Seq[ArrayBuffer[Any]]]
+
def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
val seq = map.get(k)
if (seq != null) {
@@ -92,6 +111,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
seq
}
}
+
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
@@ -102,9 +122,16 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
- val fetchItr = fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics)
- for ((k, vs) <- fetchItr) {
- getSeq(k)(depNum) ++= vs
+ if (mapSideCombine) {
+ // With map side combine on, for each key, the shuffle fetcher returns a list of values.
+ fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics).foreach {
+ case (key, values) => getSeq(key)(depNum) ++= values
+ }
+ } else {
+ // With map side combine off, each fetched pair carries a single value for its key, so the same key may appear multiple times.
+ fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics).foreach {
+ case (key, value) => getSeq(key)(depNum) += value
+ }
}
}
}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 9739ba869b..53635b1de6 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -3,7 +3,7 @@ package spark
import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, PartitionPruningRDD}
+import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD}
class RDDSuite extends FunSuite with LocalSparkContext {
@@ -123,6 +123,36 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(rdd.collect().toList === List(1, 2, 3, 4))
}
+ test("cogrouped RDDs") {
+ sc = new SparkContext("local", "test")
+ val rdd1 = sc.makeRDD(Array((1, "one"), (1, "another one"), (2, "two"), (3, "three")), 2)
+ val rdd2 = sc.makeRDD(Array((1, "one1"), (1, "another one1"), (2, "two1")), 2)
+
+ // Use cogroup function
+ val cogrouped = rdd1.cogroup(rdd2).collectAsMap()
+ assert(cogrouped(1) === (Seq("one", "another one"), Seq("one1", "another one1")))
+ assert(cogrouped(2) === (Seq("two"), Seq("two1")))
+ assert(cogrouped(3) === (Seq("three"), Seq()))
+
+ // Construct CoGroupedRDD directly, with map side combine enabled
+ val cogrouped1 = new CoGroupedRDD[Int](
+ Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]),
+ new HashPartitioner(3),
+ true).collectAsMap()
+ assert(cogrouped1(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1")))
+ assert(cogrouped1(2).toSeq === Seq(Seq("two"), Seq("two1")))
+ assert(cogrouped1(3).toSeq === Seq(Seq("three"), Seq()))
+
+ // Construct CoGroupedRDD directly, with map side combine disabled
+ val cogrouped2 = new CoGroupedRDD[Int](
+ Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]),
+ new HashPartitioner(3),
+ false).collectAsMap()
+ assert(cogrouped2(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1")))
+ assert(cogrouped2(2).toSeq === Seq(Seq("two"), Seq("two1")))
+ assert(cogrouped2(3).toSeq === Seq(Seq("three"), Seq()))
+ }
+
test("coalesced RDDs") {
sc = new SparkContext("local", "test")
val data = sc.parallelize(1 to 10, 10)
@@ -178,4 +208,64 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(prunedData.size === 1)
assert(prunedData(0) === 10)
}
+
+ test("mapWith") {
+ import java.util.Random
+ sc = new SparkContext("local", "test")
+ val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
+ val randoms = ones.mapWith(
+ (index: Int) => new Random(index + 42))
+ {(t: Int, prng: Random) => prng.nextDouble * t}.collect()
+ val prn42_3 = {
+ val prng42 = new Random(42)
+ prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
+ }
+ val prn43_3 = {
+ val prng43 = new Random(43)
+ prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
+ }
+ assert(randoms(2) === prn42_3)
+ assert(randoms(5) === prn43_3)
+ }
+
+ test("flatMapWith") {
+ import java.util.Random
+ sc = new SparkContext("local", "test")
+ val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
+ val randoms = ones.flatMapWith(
+ (index: Int) => new Random(index + 42))
+ {(t: Int, prng: Random) =>
+ val random = prng.nextDouble()
+ Seq(random * t, random * t * 10)}.
+ collect()
+ val prn42_3 = {
+ val prng42 = new Random(42)
+ prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
+ }
+ val prn43_3 = {
+ val prng43 = new Random(43)
+ prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
+ }
+ assert(randoms(5) === prn42_3 * 10)
+ assert(randoms(11) === prn43_3 * 10)
+ }
+
+ test("filterWith") {
+ import java.util.Random
+ sc = new SparkContext("local", "test")
+ val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
+ val sample = ints.filterWith(
+ (index: Int) => new Random(index + 42))
+ {(t: Int, prng: Random) => prng.nextInt(3) == 0}.
+ collect()
+ val checkSample = {
+ val prng42 = new Random(42)
+ val prng43 = new Random(43)
+ Array(1, 2, 3, 4, 5, 6).filter{i =>
+ if (i < 4) 0 == prng42.nextInt(3)
+ else 0 == prng43.nextInt(3)}
+ }
+ assert(sample.size === checkSample.size)
+ for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
+ }
}