author    Matei Zaharia <matei@eecs.berkeley.edu>  2011-03-06 19:27:03 -0800
committer Matei Zaharia <matei@eecs.berkeley.edu>  2011-03-06 19:27:03 -0800
commit    04c2d6a60cb0cdce51363aaf2713c4df4f19a594
tree      4c71e5e5abdeff702fd9ebf85f2b1f24cd3ae177
parent    0fb691dd28ffab6b7aa7dcb3520243f893ee76a8
Add CoGroupedRDD and a mapValues operation for pair RDDs; widen ShuffledRDD's partitioner type
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/CoGroupedRDD.scala  | 108
-rw-r--r--  core/src/main/scala/spark/RDD.scala            |  42
-rw-r--r--  core/src/main/scala/spark/ShuffledRDD.scala    |   3
3 files changed, 152 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala
new file mode 100644
index 0000000000..ac84033f06
--- /dev/null
+++ b/core/src/main/scala/spark/CoGroupedRDD.scala
@@ -0,0 +1,108 @@
+package spark
+
+import java.net.URL
+import java.io.EOFException
+import java.io.ObjectInputStream
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+
+@serializable
+sealed trait CoGroupSplitDep
+case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
+case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
+
+@serializable
+class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep])
+extends Split {
+ override val index = idx
+ override def hashCode(): Int = idx
+}
+
+@serializable
+class CoGroupAggregator[K] extends Aggregator[K, Any, ArrayBuffer[Any]] (
+ { x => ArrayBuffer(x) },
+ { (b, x) => b += x },
+ { (b1, b2) => b1 ++ b2 }
+)
+
+class CoGroupedRDD[K](rdds: Seq[RDD[(K, _)]], part: Partitioner[K])
+extends RDD[(K, Seq[Seq[_]])](rdds.first.context) {
+ val aggr = new CoGroupAggregator[K]
+
+ override val dependencies = {
+ val deps = new ArrayBuffer[Dependency[_]]
+ for ((rdd, index) <- rdds.zipWithIndex) {
+ if (rdd.partitioner == Some(part)) {
+ deps += new OneToOneDependency(rdd)
+ } else {
+ deps += new ShuffleDependency[K, Any, ArrayBuffer[Any]](
+ context.newShuffleId, rdd, aggr, part)
+ }
+ }
+ deps.toList
+ }
+
+ @transient val splits_ : Array[Split] = {
+ val firstRdd = rdds.first
+ val array = new Array[Split](part.numPartitions)
+ for (i <- 0 until array.size) {
+ array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) =>
+ dependencies(j) match {
+ case s: ShuffleDependency[_, _, _] =>
+ new ShuffleCoGroupSplitDep(s.shuffleId): CoGroupSplitDep
+ case _ =>
+ new NarrowCoGroupSplitDep(r, r.splits(i)): CoGroupSplitDep
+ }
+ }.toList)
+ }
+ array
+ }
+
+ override def splits = splits_
+
+ override val partitioner: Option[Partitioner[_]] = Some(part.asInstanceOf[Partitioner[_]])
+
+ override def preferredLocations(s: Split) = Nil
+
+ override def compute(s: Split): Iterator[(K, Seq[Seq[_]])] = {
+ val split = s.asInstanceOf[CoGroupSplit]
+ val map = new HashMap[K, Seq[ArrayBuffer[Any]]]
+ def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
+ map.getOrElseUpdate(k, Array.fill(rdds.size)(new ArrayBuffer[Any]))
+ }
+ for ((dep, index) <- split.deps.zipWithIndex) dep match {
+ case NarrowCoGroupSplitDep(rdd, itsSplit) => {
+ // Read them from the parent
+ for ((k: K, v) <- rdd.iterator(itsSplit)) {
+ getSeq(k)(index) += v
+ }
+ }
+ case ShuffleCoGroupSplitDep(shuffleId) => {
+ // Read map outputs of shuffle
+ val splitsByUri = new HashMap[String, ArrayBuffer[Int]]
+ val serverUris = MapOutputTracker.getServerUris(shuffleId)
+ for ((serverUri, index) <- serverUris.zipWithIndex) {
+ splitsByUri.getOrElseUpdate(serverUri, ArrayBuffer()) += index
+ }
+ for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) {
+ for (i <- inputIds) {
+          val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, split.index)
+ val inputStream = new ObjectInputStream(new URL(url).openStream())
+ try {
+ while (true) {
+ val (k, vs) = inputStream.readObject().asInstanceOf[(K, Seq[Any])]
+ val mySeq = getSeq(k)
+ for (v <- vs)
+ mySeq(index) += v
+ }
+ } catch {
+ case e: EOFException => {}
+ }
+ inputStream.close()
+ }
+ }
+ }
+ }
+ map.iterator
+ }
+}
\ No newline at end of file
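Note (added for illustration, not part of the commit): CoGroupedRDD.compute above builds, for every key, one buffer per parent RDD and appends each value to the buffer of the source it came from. A minimal standalone Scala sketch of that per-key grouping, using plain collections instead of the Spark classes (CoGroupSketch and its inputs are hypothetical names, not from the repository):

import scala.collection.mutable.{ArrayBuffer, HashMap}

object CoGroupSketch {
  // Group (key, value) pairs from several sources into one buffer per source,
  // mirroring what CoGroupedRDD.compute does for a single split.
  def cogroup[K](sources: Seq[Seq[(K, Any)]]): Map[K, Seq[ArrayBuffer[Any]]] = {
    val map = new HashMap[K, Seq[ArrayBuffer[Any]]]
    def getSeq(k: K): Seq[ArrayBuffer[Any]] =
      map.getOrElseUpdate(k, Seq.fill(sources.size)(new ArrayBuffer[Any]))
    for ((source, index) <- sources.zipWithIndex; (k, v) <- source)
      getSeq(k)(index) += v
    map.toMap
  }

  def main(args: Array[String]): Unit = {
    val ages   = Seq(("alice", 30), ("bob", 25))
    val cities = Seq(("alice", "SF"), ("alice", "NY"))
    // alice -> [ArrayBuffer(30), ArrayBuffer(SF, NY)], bob -> [ArrayBuffer(25), ArrayBuffer()]
    cogroup(Seq(ages, cities)).foreach(println)
  }
}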
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index df044bd6cf..4d9a62a89c 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -246,4 +246,46 @@ extends RDD[Array[T]](prev.context) {
def numCores = self.context.numCores
def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
+
+ def mapValues[U](f: V => U): RDD[(K, U)] =
+ {
+ val cleanF = self.context.clean(f)
+ new MappedValuesRDD(self, cleanF)
+ }
+
+ /*
+ def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+ if (self.partitioner != None) {
+      val part = self.partitioner.get
+ if (other.partitioner != None && other.partitioner.get == part) {
+ // Can do a partition-wise cogroup
+ return new PartitionWiseGroupedRDD(self, other)
+ }
+ }
+
+ val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
+ val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
+ (vs ++ ws).groupByKey(numSplits).flatMap {
+ case (k, seq) => {
+ val vbuf = new ArrayBuffer[V]
+ val wbuf = new ArrayBuffer[W]
+ seq.foreach(_ match {
+ case Left(v) => vbuf += v
+ case Right(w) => wbuf += w
+ })
+ for (v <- vbuf; w <- wbuf) yield (k, (v, w))
+ }
+ }
+ }
+ */
}
+
+class MappedValuesRDD[K, V, U](
+ prev: RDD[(K, V)], f: V => U)
+extends RDD[(K, U)](prev.context) {
+ override def splits = prev.splits
+ override def preferredLocations(split: Split) = prev.preferredLocations(split)
+ override val dependencies = List(new OneToOneDependency(prev))
+ override def compute(split: Split) = prev.iterator(split).map{case (k, v) => (k, f(v))}
+ override val partitioner = prev.partitioner
+}
\ No newline at end of file
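Note (added for illustration, not part of the commit): mapValues above transforms only the value side of each pair, which is why MappedValuesRDD can reuse the parent's splits and partitioner. A minimal standalone sketch of the per-split computation (MapValuesSketch is a hypothetical name, not a Spark class):

object MapValuesSketch {
  // Transform only the values of key/value records, as MappedValuesRDD.compute
  // does for one split; the keys, and hence the partitioning, are unchanged.
  def mapValues[K, V, U](records: Iterator[(K, V)], f: V => U): Iterator[(K, U)] =
    records.map { case (k, v) => (k, f(v)) }

  def main(args: Array[String]): Unit = {
    val pairs = Iterator(("a", 1), ("b", 2))
    // prints (a,2) and (b,4)
    mapValues(pairs, (v: Int) => v * 2).foreach(println)
  }
}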
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala
index 826957a469..26ccce0e83 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/ShuffledRDD.scala
@@ -16,7 +16,8 @@ class ShuffledRDD[K, V, C](
aggregator: Aggregator[K, V, C],
part : Partitioner[K])
extends RDD[(K, C)](parent.context) {
- override val partitioner = Some(part)
+ //override val partitioner = Some(part)
+ override val partitioner: Option[Partitioner[_]] = Some(part.asInstanceOf[Partitioner[_]])
@transient val splits_ =
Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
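Note (added for illustration, not part of the commit): widening the field to Option[Partitioner[_]] gives every RDD's partitioner the same existential type, so CoGroupedRDD.dependencies can test co-partitioning with a plain equality check (rdd.partitioner == Some(part)) regardless of key type. A standalone sketch of that check with stand-in Partitioner, HashPart and Keyed classes (hypothetical names, not the Spark ones):

// Stand-in for the partitioner abstraction; keyed containers expose it with an
// existential type so partitioners over different key types are comparable.
trait Partitioner[K] { def numPartitions: Int; def getPartition(key: K): Int }

class HashPart[K](val numPartitions: Int) extends Partitioner[K] {
  def getPartition(key: K): Int = math.abs(key.hashCode) % numPartitions
}

class Keyed[K](val partitioner: Option[Partitioner[_]])

object PartitionerCheck {
  def main(args: Array[String]): Unit = {
    val part = new HashPart[String](4)
    val a = new Keyed[String](Some(part))
    val b = new Keyed[String](Some(part))                  // same partitioner instance
    val c = new Keyed[String](Some(new HashPart[String](4)))
    println(a.partitioner == b.partitioner)  // true: co-partitioned, narrow dependency suffices
    println(a.partitioner == c.partitioner)  // false: different instance, a shuffle is needed
  }
}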