authorMatei Zaharia <matei@eecs.berkeley.edu>2011-04-12 19:51:58 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2011-04-12 19:51:58 -0700
commit94ba95bcb2c9ad99a4eacdf503fa763a44e157ad (patch)
treef10e19db4c478797c4ccf8d8b99619c67c728905
parentd840fa8d0c376cb80d3d1141b40a9caeb606c074 (diff)
downloadspark-94ba95bcb2c9ad99a4eacdf503fa763a44e157ad.tar.gz
spark-94ba95bcb2c9ad99a4eacdf503fa763a44e157ad.tar.bz2
spark-94ba95bcb2c9ad99a4eacdf503fa763a44e157ad.zip
Added flatMapValues
-rw-r--r--  core/src/main/scala/spark/RDD.scala  27
1 file changed, 23 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 47bdb09986..12e2f4f902 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -251,12 +251,16 @@ extends RDD[Array[T]](prev.context) {
def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
- def mapValues[U](f: V => U): RDD[(K, U)] =
- {
+ def mapValues[U](f: V => U): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new MappedValuesRDD(self, cleanF)
}
+ def flatMapValues[U](f: V => Traversable[U]): RDD[(K, U)] = {
+ val cleanF = self.context.clean(f)
+ new FlatMappedValuesRDD(self, cleanF)
+ }
+
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
val part = self.partitioner match {
case Some(p) => p
@@ -291,6 +295,21 @@ extends RDD[(K, U)](prev.context) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
override val dependencies = List(new OneToOneDependency(prev))
- override def compute(split: Split) = prev.iterator(split).map{case (k, v) => (k, f(v))}
override val partitioner = prev.partitioner
-}
\ No newline at end of file
+ override def compute(split: Split) =
+ prev.iterator(split).map{case (k, v) => (k, f(v))}
+}
+
+class FlatMappedValuesRDD[K, V, U](
+ prev: RDD[(K, V)], f: V => Traversable[U])
+extends RDD[(K, U)](prev.context) {
+ override def splits = prev.splits
+ override def preferredLocations(split: Split) = prev.preferredLocations(split)
+ override val dependencies = List(new OneToOneDependency(prev))
+ override val partitioner = prev.partitioner
+ override def compute(split: Split) = {
+ prev.iterator(split).toStream.flatMap {
+ case (k, v) => f(v).map(x => (k, x))
+ }.iterator
+ }
+}
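For reference, a minimal usage sketch of the new method (the SparkContext value sc, the sample data, and the variable names are illustrative assumptions, not part of this commit; it also assumes the implicit conversion that adds the pair-RDD operations such as mapValues/flatMapValues to RDD[(K, V)] is in scope):

  // Hypothetical example: expand each value into zero or more values while
  // keeping its key. flatMapValues applies f to the value of every pair and
  // emits one output pair per element of the Traversable that f returns.
  val pairs = sc.parallelize(Seq(("a", "1 2"), ("b", "3")))
  val expanded = pairs.flatMapValues(line => line.split(" ").toList)
  // expanded.collect() yields, in some order: ("a", "1"), ("a", "2"), ("b", "3")

Because the keys are left untouched, FlatMappedValuesRDD can simply forward the parent's partitioner (override val partitioner = prev.partitioner), so the result stays partitioned the same way as its parent.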