diff options
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 27 |
1 files changed, 23 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 47bdb09986..12e2f4f902 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -251,12 +251,16 @@ extends RDD[Array[T]](prev.context) {
 
   def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
 
-  def mapValues[U](f: V => U): RDD[(K, U)] =
-  {
+  def mapValues[U](f: V => U): RDD[(K, U)] = {
     val cleanF = self.context.clean(f)
     new MappedValuesRDD(self, cleanF)
   }
 
+  def flatMapValues[U](f: V => Traversable[U]): RDD[(K, U)] = {
+    val cleanF = self.context.clean(f)
+    new FlatMappedValuesRDD(self, cleanF)
+  }
+
   def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
     val part = self.partitioner match {
       case Some(p) => p
@@ -291,6 +295,21 @@ extends RDD[(K, U)](prev.context) {
   override def splits = prev.splits
   override def preferredLocations(split: Split) = prev.preferredLocations(split)
   override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = prev.iterator(split).map{case (k, v) => (k, f(v))}
   override val partitioner = prev.partitioner
-}
\ No newline at end of file
+  override def compute(split: Split) =
+    prev.iterator(split).map{case (k, v) => (k, f(v))}
+}
+
+class FlatMappedValuesRDD[K, V, U](
+  prev: RDD[(K, V)], f: V => Traversable[U])
+extends RDD[(K, U)](prev.context) {
+  override def splits = prev.splits
+  override def preferredLocations(split: Split) = prev.preferredLocations(split)
+  override val dependencies = List(new OneToOneDependency(prev))
+  override val partitioner = prev.partitioner
+  override def compute(split: Split) = {
+    prev.iterator(split).toStream.flatMap {
+      case (k, v) => f(v).map(x => (k, x))
+    }.iterator
+  }
+}