author     Matei Zaharia <matei@eecs.berkeley.edu>  2011-02-27 14:27:12 -0800
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2011-02-27 14:27:12 -0800
commit     f38f86d59e48d348b93a7b557bbbc43638638b6a (patch)
tree       98037dd9f41f52245767cea6a0cfab66da7b32d1 /core/src
parent     2e6023f2bf7e44f9c8d06a4d4bca7955ebd2020f (diff)
More stuff
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/DAGScheduler.scala      101
-rw-r--r--  core/src/main/scala/spark/DfsShuffle.scala         120
-rw-r--r--  core/src/main/scala/spark/LocalFileShuffle.scala     3
-rw-r--r--  core/src/main/scala/spark/LocalScheduler.scala       6
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala    29
-rw-r--r--  core/src/main/scala/spark/NumberedSplitRDD.scala    42
-rw-r--r--  core/src/main/scala/spark/RDD.scala                173
-rw-r--r--  core/src/main/scala/spark/ResultTask.scala          14
-rw-r--r--  core/src/main/scala/spark/ShuffleMapTask.scala      38
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala        12
-rw-r--r--  core/src/main/scala/spark/Stage.scala               34
11 files changed, 276 insertions, 296 deletions
diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala
index 5a5fc4c840..ee3fda25a8 100644
--- a/core/src/main/scala/spark/DAGScheduler.scala
+++ b/core/src/main/scala/spark/DAGScheduler.scala
@@ -1,10 +1,7 @@
package spark
-import java.util.concurrent._
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.collection.mutable.Map
+import java.util.concurrent.LinkedBlockingQueue
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
/**
* A Scheduler subclass that implements stage-oriented scheduling. It computes
@@ -56,17 +53,15 @@ private abstract class DAGScheduler extends Scheduler with Logging {
shuffleToMapStage.get(shuf) match {
case Some(stage) => stage
case None =>
- val stage = newStage(
- true, shuf.rdd, shuf.spec.partitioner.numPartitions)
+ val stage = newStage(shuf.rdd, Some(shuf))
shuffleToMapStage(shuf) = stage
stage
}
}
- def newStage(isShuffleMap: Boolean, rdd: RDD[_], numPartitions: Int): Stage = {
+ def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_,_]]): Stage = {
val id = newStageId()
- val parents = getParentStages(rdd)
- val stage = new Stage(id, isShuffleMap, rdd, parents, numPartitions)
+ val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd))
idToStage(id) = stage
stage
}
@@ -121,7 +116,7 @@ private abstract class DAGScheduler extends Scheduler with Logging {
override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
: Array[U] = {
val numOutputParts: Int = rdd.splits.size
- val finalStage = newStage(false, rdd, numOutputParts)
+ val finalStage = newStage(rdd, None)
val results = new Array[U](numOutputParts)
val finished = new Array[Boolean](numOutputParts)
var numFinished = 0
@@ -130,6 +125,10 @@ private abstract class DAGScheduler extends Scheduler with Logging {
val running = new HashSet[Stage]
val pendingTasks = new HashMap[Stage, HashSet[Task[_]]]
+ logInfo("Final stage: " + finalStage)
+ logInfo("Parents of final stage: " + finalStage.parents)
+ logInfo("Missing parents: " + getMissingParentStages(finalStage))
+
def submitStage(stage: Stage) {
if (!waiting(stage) && !running(stage)) {
val missing = getMissingParentStages(stage)
@@ -146,13 +145,20 @@ private abstract class DAGScheduler extends Scheduler with Logging {
}
def submitMissingTasks(stage: Stage) {
- var tasks: List[Task[_]] = Nil
+ val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
+ var tasks = ArrayBuffer[Task[_]]()
if (stage == finalStage) {
for (p <- 0 until numOutputParts if (!finished(p))) {
val locs = getPreferredLocs(rdd, p)
- tasks = new ResultTask(rdd, func, p, locs) :: tasks
+ tasks += new ResultTask(finalStage.id, rdd, func, p, locs)
+ }
+ } else {
+ for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
+ val locs = getPreferredLocs(stage.rdd, p)
+ tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
}
}
+ myPending ++= tasks
submitTasks(tasks)
}
@@ -161,13 +167,35 @@ private abstract class DAGScheduler extends Scheduler with Logging {
while (numFinished != numOutputParts) {
val evt = completionEvents.take()
if (evt.successful) {
+ logInfo("Completed " + evt.task)
Accumulators.add(currentThread, evt.accumUpdates)
evt.task match {
case rt: ResultTask[_, _] =>
results(rt.partition) = evt.result.asInstanceOf[U]
finished(rt.partition) = true
numFinished += 1
- // case smt: ShuffleMapTask
+ pendingTasks(finalStage) -= rt
+ case smt: ShuffleMapTask =>
+ val stage = idToStage(smt.stageId)
+ stage.addOutputLoc(smt.partition, evt.result.asInstanceOf[String])
+ val pending = pendingTasks(stage)
+ pending -= smt
+ MapOutputTracker.registerMapOutputs(
+ stage.shuffleDep.get.shuffleId,
+ stage.outputLocs.map(_.first).toArray)
+ if (pending.isEmpty) {
+ logInfo(stage + " finished; looking for newly runnable stages")
+ running -= stage
+ val newlyRunnable = new ArrayBuffer[Stage]
+ for (stage <- waiting if getMissingParentStages(stage) == Nil) {
+ newlyRunnable += stage
+ }
+ waiting --= newlyRunnable
+ running ++= newlyRunnable
+ for (stage <- newlyRunnable) {
+ submitMissingTasks(stage)
+ }
+ }
}
} else {
throw new SparkException("Task failed: " + evt.task)
@@ -199,53 +227,10 @@ private abstract class DAGScheduler extends Scheduler with Logging {
if (locs != Nil)
return locs;
}
+ case _ =>
})
return Nil
}
}
case class CompletionEvent(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any])
-
-class Stage(val id: Int, val isShuffleMap: Boolean, val rdd: RDD[_], val parents: List[Stage], val numPartitions: Int) {
- val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
- var numAvailableOutputs = 0
-
- def isAvailable: Boolean = {
- if (parents.size == 0 && !isShuffleMap)
- true
- else
- numAvailableOutputs == numPartitions
- }
-
- def addOutputLoc(partition: Int, host: String) {
- val prevList = outputLocs(partition)
- outputLocs(partition) = host :: prevList
- if (prevList == Nil)
- numAvailableOutputs += 1
- }
-
- def removeOutputLoc(partition: Int, host: String) {
- val prevList = outputLocs(partition)
- val newList = prevList - host
- outputLocs(partition) = newList
- if (prevList != Nil && newList == Nil)
- numAvailableOutputs -= 1
- }
-
- override def toString = "Stage " + id
-
- override def hashCode(): Int = id
-}
-
-class ResultTask[T, U](rdd: RDD[T], func: Iterator[T] => U, val partition: Int, locs: Seq[String])
-extends Task[U] {
- val split = rdd.splits(partition)
-
- override def run: U = {
- func(rdd.iterator(split))
- }
-
- override def preferredLocations: Seq[String] = locs
-
- override def toString = "ResultTask " + partition
-}
\ No newline at end of file
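
The completion-event loop above is where a finished ShuffleMapTask registers its output location and waiting stages whose parents are now satisfied get promoted to running. A minimal, standalone sketch of just that promotion step (the MiniStage class and the values below are hypothetical; the real DAGScheduler also tracks pending task sets and accumulator updates, as the diff shows):

import scala.collection.mutable.{ArrayBuffer, HashSet}

// Hypothetical, simplified stage graph: each stage lists its parents and
// reports whether all of its map outputs are available yet.
class MiniStage(val id: Int, val parents: List[MiniStage]) {
  var available = false
  override def toString = "Stage " + id
}

object StagePromotionSketch {
  // Mirrors the "newly runnable stages" scan in the completion-event loop:
  // any waiting stage whose parents are all available moves to running.
  def promote(waiting: HashSet[MiniStage], running: HashSet[MiniStage]): Seq[MiniStage] = {
    val newlyRunnable = new ArrayBuffer[MiniStage]
    for (stage <- waiting if stage.parents.forall(_.available))
      newlyRunnable += stage
    waiting --= newlyRunnable
    running ++= newlyRunnable
    newlyRunnable
  }

  def main(args: Array[String]) {
    val mapStage = new MiniStage(0, Nil)
    val resultStage = new MiniStage(1, List(mapStage))
    val waiting = HashSet(resultStage)
    val running = HashSet(mapStage)
    println(promote(waiting, running))  // empty: the parent map stage has not finished
    mapStage.available = true
    running -= mapStage
    println(promote(waiting, running))  // Stage 1 is promoted once its parent's outputs exist
  }
}
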
diff --git a/core/src/main/scala/spark/DfsShuffle.scala b/core/src/main/scala/spark/DfsShuffle.scala
deleted file mode 100644
index 7a42bf2d06..0000000000
--- a/core/src/main/scala/spark/DfsShuffle.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-package spark
-
-import java.io.{EOFException, ObjectInputStream, ObjectOutputStream}
-import java.net.URI
-import java.util.UUID
-
-import scala.collection.mutable.HashMap
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
-
-
-/**
- * A simple implementation of shuffle using a distributed file system.
- *
- * TODO: Add support for compression when spark.compress is set to true.
- */
-@serializable
-class DfsShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
- override def compute(input: RDD[(K, V)],
- numOutputSplits: Int,
- createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiners: (C, C) => C)
- : RDD[(K, C)] =
- {
- val sc = input.sparkContext
- val dir = DfsShuffle.newTempDirectory()
- logInfo("Intermediate data directory: " + dir)
-
- val numberedSplitRdd = new NumberedSplitRDD(input)
- val numInputSplits = numberedSplitRdd.splits.size
-
- // Run a parallel foreach to write the intermediate data files
- numberedSplitRdd.foreach((pair: (Int, Iterator[(K, V)])) => {
- val myIndex = pair._1
- val myIterator = pair._2
- val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[K, C])
- for ((k, v) <- myIterator) {
- var bucketId = k.hashCode % numOutputSplits
- if (bucketId < 0) { // Fix bucket ID if hash code was negative
- bucketId += numOutputSplits
- }
- val bucket = buckets(bucketId)
- bucket(k) = bucket.get(k) match {
- case Some(c) => mergeValue(c, v)
- case None => createCombiner(v)
- }
- }
- val fs = DfsShuffle.getFileSystem()
- for (i <- 0 until numOutputSplits) {
- val path = new Path(dir, "%d-to-%d".format(myIndex, i))
- val out = new ObjectOutputStream(fs.create(path, true))
- buckets(i).foreach(pair => out.writeObject(pair))
- out.close()
- }
- })
-
- // Return an RDD that does each of the merges for a given partition
- val indexes = sc.parallelize(0 until numOutputSplits, numOutputSplits)
- return indexes.flatMap((myIndex: Int) => {
- val combiners = new HashMap[K, C]
- val fs = DfsShuffle.getFileSystem()
- for (i <- Utils.shuffle(0 until numInputSplits)) {
- val path = new Path(dir, "%d-to-%d".format(i, myIndex))
- val inputStream = new ObjectInputStream(fs.open(path))
- try {
- while (true) {
- val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
- combiners(k) = combiners.get(k) match {
- case Some(oldC) => mergeCombiners(oldC, c)
- case None => c
- }
- }
- } catch {
- case e: EOFException => {}
- }
- inputStream.close()
- }
- combiners
- })
- }
-}
-
-
-/**
- * Companion object of DfsShuffle; responsible for initializing a Hadoop
- * FileSystem object based on the spark.dfs property and generating names
- * for temporary directories.
- */
-object DfsShuffle {
- private var initialized = false
- private var fileSystem: FileSystem = null
-
- private def initializeIfNeeded() = synchronized {
- if (!initialized) {
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
- val dfs = System.getProperty("spark.dfs", "file:///")
- val conf = new Configuration()
- conf.setInt("io.file.buffer.size", bufferSize)
- conf.setInt("dfs.replication", 1)
- fileSystem = FileSystem.get(new URI(dfs), conf)
- initialized = true
- }
- }
-
- def getFileSystem(): FileSystem = {
- initializeIfNeeded()
- return fileSystem
- }
-
- def newTempDirectory(): String = {
- val fs = getFileSystem()
- val workDir = System.getProperty("spark.dfs.workdir", "/tmp")
- val uuid = UUID.randomUUID()
- val path = workDir + "/shuffle-" + uuid
- fs.mkdirs(new Path(path))
- return path
- }
-}
diff --git a/core/src/main/scala/spark/LocalFileShuffle.scala b/core/src/main/scala/spark/LocalFileShuffle.scala
index 367599cfb4..fd70c54c0c 100644
--- a/core/src/main/scala/spark/LocalFileShuffle.scala
+++ b/core/src/main/scala/spark/LocalFileShuffle.scala
@@ -13,6 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
*
* TODO: Add support for compression when spark.compress is set to true.
*/
+/*
@serializable
class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
override def compute(input: RDD[(K, V)],
@@ -90,7 +91,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
})
}
}
-
+*/
object LocalFileShuffle extends Logging {
private var initialized = false
diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala
index 26fc5c9fdb..0287082687 100644
--- a/core/src/main/scala/spark/LocalScheduler.scala
+++ b/core/src/main/scala/spark/LocalScheduler.scala
@@ -25,12 +25,12 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
Accumulators.clear
val bytes = Utils.serialize(tasks(i))
logInfo("Size of task " + i + " is " + bytes.size + " bytes")
- val task = Utils.deserialize[Task[_]](
+ val deserializedTask = Utils.deserialize[Task[_]](
bytes, currentThread.getContextClassLoader)
- val result: Any = task.run
+ val result: Any = deserializedTask.run
val accumUpdates = Accumulators.values
logInfo("Finished task " + i)
- taskEnded(task, true, result, accumUpdates)
+ taskEnded(tasks(i), true, result, accumUpdates)
} catch {
case e: Exception => {
// TODO: Do something nicer here
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
new file mode 100644
index 0000000000..2c487cb627
--- /dev/null
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -0,0 +1,29 @@
+package spark
+
+import java.util.concurrent.ConcurrentHashMap
+
+object MapOutputTracker {
+ private val serverUris = new ConcurrentHashMap[Int, Array[String]]
+
+ def registerMapOutput(shuffleId: Int, numMaps: Int, mapId: Int, serverUri: String) {
+ var array = serverUris.get(shuffleId)
+ if (array == null) {
+ array = Array.fill[String](numMaps)(null)
+ serverUris.put(shuffleId, array)
+ }
+ array(mapId) = serverUri
+ }
+
+ def registerMapOutputs(shuffleId: Int, locs: Array[String]) {
+ serverUris.put(shuffleId, Array[String]() ++ locs)
+ }
+
+ def getServerUris(shuffleId: Int): Array[String] = {
+ // TODO: On remote node, fetch locations from master
+ serverUris.get(shuffleId)
+ }
+
+ def getMapOutputUri(serverUri: String, shuffleId: Int, mapId: Int, reduceId: Int): String = {
+ "%s/shuffle/%s/%s/%s".format(serverUri, shuffleId, mapId, reduceId)
+ }
+}
\ No newline at end of file
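
A short usage sketch of the tracker added above, staying within a single process as the TODO in getServerUris implies; the demo object, host names, and ids are made up for illustration:

object MapOutputTrackerDemo {
  def main(args: Array[String]) {
    // A shuffle with id 0 and two map tasks, each reporting the URI of the
    // server that holds its output files.
    MapOutputTracker.registerMapOutput(0, 2, 0, "http://host-a:1234")
    MapOutputTracker.registerMapOutput(0, 2, 1, "http://host-b:1234")

    // A reducer (reduceId = 3) asks where each map output lives and builds
    // the fetch URL for every map partition.
    val uris = MapOutputTracker.getServerUris(0)
    for ((serverUri, mapId) <- uris.zipWithIndex) {
      println(MapOutputTracker.getMapOutputUri(serverUri, 0, mapId, 3))
      // e.g. http://host-a:1234/shuffle/0/0/3 and http://host-b:1234/shuffle/0/1/3
    }
  }
}
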
diff --git a/core/src/main/scala/spark/NumberedSplitRDD.scala b/core/src/main/scala/spark/NumberedSplitRDD.scala
deleted file mode 100644
index 7b12210d84..0000000000
--- a/core/src/main/scala/spark/NumberedSplitRDD.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-package spark
-
-import mesos.SlaveOffer
-
-
-/**
- * An RDD that takes the splits of a parent RDD and gives them unique indexes.
- * This is useful for a variety of shuffle implementations.
- */
-class NumberedSplitRDD[T: ClassManifest](prev: RDD[T])
-extends RDD[(Int, Iterator[T])](prev.sparkContext) {
- @transient val splits_ = {
- prev.splits.zipWithIndex.map {
- case (s, i) => new NumberedSplitRDDSplit(s, i): Split
- }.toArray
- }
-
- override def splits = splits_
-
- override def preferredLocations(split: Split) = {
- val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
- prev.preferredLocations(nsplit.prev)
- }
-
- override def iterator(split: Split) = {
- val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
- Iterator((nsplit.index, prev.iterator(nsplit.prev)))
- }
-
- override def taskStarted(split: Split, slot: SlaveOffer) = {
- val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
- prev.taskStarted(nsplit.prev, slot)
- }
-}
-
-
-/**
- * A split in a NumberedSplitRDD.
- */
-class NumberedSplitRDDSplit(val prev: Split, val index: Int) extends Split {
- override def getId() = "NumberedSplitRDDSplit(%d)".format(index)
-}
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 37baed6e4c..9b650427c8 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -1,5 +1,8 @@
package spark
+import java.io.EOFException
+import java.net.URL
+import java.io.ObjectInputStream
import java.util.concurrent.atomic.AtomicLong
import java.util.HashSet
import java.util.Random
@@ -25,16 +28,17 @@ class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
}
class ShuffleDependency[K, V, C](
+ val shuffleId: Int,
rdd: RDD[(K, V)],
- val spec: ShuffleSpec[K, V, C]
+ val aggregator: Aggregator[K, V, C],
+ val partitioner: Partitioner[K]
) extends Dependency(rdd, true)
@serializable
-class ShuffleSpec[K, V, C] (
+class Aggregator[K, V, C] (
val createCombiner: V => C,
val mergeValue: (C, V) => C,
- val mergeCombiners: (C, C) => C,
- val partitioner: Partitioner[K]
+ val mergeCombiners: (C, C) => C
)
@serializable
@@ -43,6 +47,15 @@ abstract class Partitioner[K] {
def getPartition(key: K): Int
}
+class HashPartitioner[K](partitions: Int) extends Partitioner[K] {
+ def numPartitions = partitions
+
+ def getPartition(key: K) = {
+ val mod = key.hashCode % partitions
+ if (mod < 0) mod + partitions else mod // Careful of negative hash codes
+ }
+}
+
@serializable
abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
def splits: Array[Split]
@@ -52,25 +65,27 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
val dependencies: List[Dependency[_]] = Nil
val partitioner: Option[Partitioner[_]] = None
- def taskStarted(split: Split, slot: SlaveOffer) {}
-
def sparkContext = sc
def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
+
+ def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] =
+ new FlatMappedRDD(this, sc.clean(f))
+
+ /*
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
+
def cache() = new CachedRDD(this)
def sample(withReplacement: Boolean, frac: Double, seed: Int): RDD[T] =
new SampledRDD(this, withReplacement, frac, seed)
- def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] =
- new FlatMappedRDD(this, sc.clean(f))
-
def foreach(f: T => Unit) {
val cleanF = sc.clean(f)
val tasks = splits.map(s => new ForeachTask(this, s, cleanF)).toArray
sc.runTaskObjects(tasks)
}
+ */
def collect(): Array[T] = {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
@@ -97,6 +112,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
return results.reduceLeft(f)
}
+ /*
def take(num: Int): Array[T] = {
if (num == 0)
return new Array[T](0)
@@ -113,6 +129,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
case Array(t) => t
case _ => throw new UnsupportedOperationException("empty collection")
}
+ */
def count(): Long = {
try {
@@ -126,10 +143,10 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
def ++(other: RDD[T]): RDD[T] = this.union(other)
- def splitRdd(): RDD[Array[T]] = new SplitRDD(this)
+ //def splitRdd(): RDD[Array[T]] = new SplitRDD(this)
- def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] =
- new CartesianRDD(sc, this, other)
+ //def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] =
+ // new CartesianRDD(sc, this, other)
def groupBy[K](func: T => K, numSplits: Int): RDD[(K, Seq[T])] =
this.map(t => (func(t), t)).groupByKey(numSplits)
@@ -138,72 +155,31 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
groupBy[K](func, sc.numCores)
}
-@serializable
-abstract class RDDTask[U: ClassManifest, T: ClassManifest](
- val rdd: RDD[T], val split: Split)
-extends Task[U] {
- override def preferredLocations() = rdd.preferredLocations(split)
- override def markStarted(slot: SlaveOffer) { rdd.taskStarted(split, slot) }
-}
-
-class ForeachTask[T: ClassManifest](
- rdd: RDD[T], split: Split, func: T => Unit)
-extends RDDTask[Unit, T](rdd, split) with Logging {
- override def run() {
- logInfo("Processing " + split)
- rdd.iterator(split).foreach(func)
- }
-}
-
-class CollectTask[T](
- rdd: RDD[T], split: Split)(implicit m: ClassManifest[T])
-extends RDDTask[Array[T], T](rdd, split) with Logging {
- override def run(): Array[T] = {
- logInfo("Processing " + split)
- rdd.iterator(split).toArray(m)
- }
-}
-
-class ReduceTask[T: ClassManifest](
- rdd: RDD[T], split: Split, f: (T, T) => T)
-extends RDDTask[Option[T], T](rdd, split) with Logging {
- override def run(): Option[T] = {
- logInfo("Processing " + split)
- val iter = rdd.iterator(split)
- if (iter.hasNext)
- Some(iter.reduceLeft(f))
- else
- None
- }
-}
-
class MappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T], f: T => U)
extends RDD[U](prev.sparkContext) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
+ override val dependencies = List(new OneToOneDependency(prev))
override def iterator(split: Split) = prev.iterator(split).map(f)
- override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
+}
+
+class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
+ prev: RDD[T], f: T => Traversable[U])
+extends RDD[U](prev.sparkContext) {
+ override def splits = prev.splits
+ override def preferredLocations(split: Split) = prev.preferredLocations(split)
override val dependencies = List(new OneToOneDependency(prev))
+ override def iterator(split: Split) = prev.iterator(split).toStream.flatMap(f).iterator
}
+/*
class FilteredRDD[T: ClassManifest](
prev: RDD[T], f: T => Boolean)
extends RDD[T](prev.sparkContext) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
override def iterator(split: Split) = prev.iterator(split).filter(f)
- override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
-}
-
-class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
- prev: RDD[T], f: T => Traversable[U])
-extends RDD[U](prev.sparkContext) {
- override def splits = prev.splits
- override def preferredLocations(split: Split) = prev.preferredLocations(split)
- override def iterator(split: Split) =
- prev.iterator(split).toStream.flatMap(f).iterator
- override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
}
class SplitRDD[T: ClassManifest](prev: RDD[T])
@@ -211,7 +187,6 @@ extends RDD[Array[T]](prev.sparkContext) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
override def iterator(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray))
- override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
}
@@ -245,8 +220,6 @@ extends RDD[T](prev.sparkContext) {
prev.iterator(split.prev).filter(x => (rg.nextDouble <= frac))
}
}
-
- override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split.asInstanceOf[SeededSplit].prev, slot)
}
@@ -316,13 +289,14 @@ private object CachedRDD {
// Remembers which splits are currently being loaded (on workers)
val loading = new HashSet[String]
}
+*/
@serializable
-class UnionSplit[T: ClassManifest](rdd: RDD[T], split: Split)
+class UnionSplit[T: ClassManifest](rdd: RDD[T], index: Int, split: Split)
extends Split {
def iterator() = rdd.iterator(split)
def preferredLocations() = rdd.preferredLocations(split)
- override def getId() = "UnionSplit(" + split.getId() + ")"
+ override def getId() = "UnionSplit(" + index + ", " + split.getId() + ")"
}
@serializable
@@ -330,8 +304,8 @@ class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]])
extends RDD[T](sc) {
@transient val splits_ : Array[Split] = {
val splits: Seq[Split] =
- for (rdd <- rdds; split <- rdd.splits)
- yield new UnionSplit(rdd, split)
+ for ((rdd, index) <- rdds.zipWithIndex; split <- rdd.splits)
+ yield new UnionSplit(rdd, index, split)
splits.toArray
}
@@ -344,6 +318,7 @@ extends RDD[T](sc) {
s.asInstanceOf[UnionSplit[T]].preferredLocations()
}
+/*
@serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split {
override def getId() =
"CartesianSplit(" + s1.getId() + ", " + s2.getId() + ")"
@@ -376,6 +351,58 @@ extends RDD[Pair[T, U]](sc) {
rdd2.taskStarted(currSplit.s2, slot)
}
}
+*/
+
+class ShuffledRDDSplit(val id: Int) extends Split {
+ override def getId() = "ShuffleRDDSplit(" + id + ")"
+}
+
+class ShuffledRDD[K, V, C](
+ parent: RDD[(K, V)],
+ aggregator: Aggregator[K, V, C],
+ partitioner: Partitioner[K])
+extends RDD[(K, C)](parent.sparkContext) {
+ @transient val splits_ =
+ Array.tabulate[Split](partitioner.numPartitions)(i => new ShuffledRDDSplit(i))
+
+ val dep = new ShuffleDependency(sparkContext.newShuffleId, parent, aggregator, partitioner)
+
+ override def splits = splits_
+
+ override def preferredLocations(split: Split) = Nil
+
+ override def iterator(split: Split): Iterator[(K, C)] = {
+ val shuffleId = dep.shuffleId
+ val splitId = split.asInstanceOf[ShuffledRDDSplit].id
+ val splitsByUri = new HashMap[String, ArrayBuffer[Int]]
+ val serverUris = MapOutputTracker.getServerUris(shuffleId)
+ for ((serverUri, index) <- serverUris.zipWithIndex) {
+ splitsByUri.getOrElseUpdate(serverUri, ArrayBuffer()) += index
+ }
+ val combiners = new HashMap[K, C]
+ for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) {
+ for (i <- inputIds) {
+ val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, splitId)
+ val inputStream = new ObjectInputStream(new URL(url).openStream())
+ try {
+ while (true) {
+ val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
+ combiners(k) = combiners.get(k) match {
+ case Some(oldC) => aggregator.mergeCombiners(oldC, c)
+ case None => c
+ }
+ }
+ } catch {
+ case e: EOFException => {}
+ }
+ inputStream.close()
+ }
+ }
+ combiners.iterator
+ }
+
+ override val dependencies = List(dep)
+}
@serializable class PairRDDExtras[K, V](self: RDD[(K, V)]) {
def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
@@ -397,10 +424,16 @@ extends RDD[Pair[T, U]](sc) {
numSplits: Int)
: RDD[(K, C)] =
{
+ val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ val partitioner = new HashPartitioner[K](numSplits)
+ new ShuffledRDD(self, aggregator, partitioner)
+ // TODO
+ /*
val shufClass = Class.forName(System.getProperty(
"spark.shuffle.class", "spark.LocalFileShuffle"))
val shuf = shufClass.newInstance().asInstanceOf[Shuffle[K, V, C]]
shuf.compute(self, numSplits, createCombiner, mergeValue, mergeCombiners)
+ */
}
def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
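
The new HashPartitioner above relies on one small trick: Java's % operator can return a negative remainder for negative hash codes, so the result is shifted back into the range [0, numPartitions). A standalone sketch that exercises it (the helper object and sample keys are hypothetical):

object HashPartitionerSketch {
  // Same arithmetic as HashPartitioner.getPartition: shift negative
  // remainders back into range instead of letting them escape as bucket -1.
  def partitionFor(key: Any, partitions: Int): Int = {
    val mod = key.hashCode % partitions
    if (mod < 0) mod + partitions else mod
  }

  def main(args: Array[String]) {
    val keys = Seq("spark", "shuffle", -42, 7, "")
    for (k <- keys)
      println("%s -> bucket %d".format(k, partitionFor(k, 4)))  // always in 0..3
  }
}
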
diff --git a/core/src/main/scala/spark/ResultTask.scala b/core/src/main/scala/spark/ResultTask.scala
new file mode 100644
index 0000000000..3b63896175
--- /dev/null
+++ b/core/src/main/scala/spark/ResultTask.scala
@@ -0,0 +1,14 @@
+package spark
+
+class ResultTask[T, U](val stageId: Int, rdd: RDD[T], func: Iterator[T] => U, val partition: Int, locs: Seq[String])
+extends Task[U] {
+ val split = rdd.splits(partition)
+
+ override def run: U = {
+ func(rdd.iterator(split))
+ }
+
+ override def preferredLocations: Seq[String] = locs
+
+ override def toString = "ResultTask(" + stageId + ", " + partition + ")"
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/ShuffleMapTask.scala b/core/src/main/scala/spark/ShuffleMapTask.scala
new file mode 100644
index 0000000000..287c64a9cc
--- /dev/null
+++ b/core/src/main/scala/spark/ShuffleMapTask.scala
@@ -0,0 +1,38 @@
+package spark
+
+import java.io.FileOutputStream
+import java.io.ObjectOutputStream
+import scala.collection.mutable.HashMap
+
+
+class ShuffleMapTask(val stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_], val partition: Int, locs: Seq[String])
+extends Task[String] {
+ val split = rdd.splits(partition)
+
+ override def run: String = {
+ val numOutputSplits = dep.partitioner.numPartitions
+ val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]]
+ val partitioner = dep.partitioner.asInstanceOf[Partitioner[Any]]
+ val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[Any, Any])
+ for (elem <- rdd.iterator(split)) {
+ val (k, v) = elem.asInstanceOf[(Any, Any)]
+ var bucketId = partitioner.getPartition(k)
+ val bucket = buckets(bucketId)
+ bucket(k) = bucket.get(k) match {
+ case Some(c) => aggregator.mergeValue(c, v)
+ case None => aggregator.createCombiner(v)
+ }
+ }
+ for (i <- 0 until numOutputSplits) {
+ val file = LocalFileShuffle.getOutputFile(dep.shuffleId, partition, i)
+ val out = new ObjectOutputStream(new FileOutputStream(file))
+ buckets(i).foreach(pair => out.writeObject(pair))
+ out.close()
+ }
+ return LocalFileShuffle.getServerUri
+ }
+
+ override def preferredLocations: Seq[String] = locs
+
+ override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partition)
+}
\ No newline at end of file
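
ShuffleMapTask.run above applies a standard map-side combine: hash each record to a reduce bucket, then fold it into that bucket's combiner via createCombiner/mergeValue before writing one file per reducer. A self-contained sketch of the same pattern with a word-count style aggregator (names and data are illustrative, not part of the patch):

import scala.collection.mutable.HashMap

object MapSideCombineSketch {
  def main(args: Array[String]) {
    val records = Seq(("a", 1), ("b", 1), ("a", 1), ("c", 1), ("b", 1))
    val numOutputSplits = 2

    // One combiner map per reduce partition, as in ShuffleMapTask.run.
    val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[String, Int])
    for ((k, v) <- records) {
      val mod = k.hashCode % numOutputSplits
      val bucketId = if (mod < 0) mod + numOutputSplits else mod
      val bucket = buckets(bucketId)
      bucket(k) = bucket.get(k) match {
        case Some(c) => c + v   // mergeValue
        case None    => v       // createCombiner
      }
    }

    // Each bucket would normally be serialized to one shuffle file per reducer.
    for (i <- 0 until numOutputSplits)
      println("bucket " + i + ": " + buckets(i))
  }
}
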
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 04bd86180b..b4799d7c08 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -89,8 +89,8 @@ extends Logging {
}
/** Build the union of a list of RDDs. */
- def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] =
- new UnionRDD(this, rdds)
+ //def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] =
+ // new UnionRDD(this, rdds)
// Methods for creating shared variables
@@ -157,6 +157,14 @@ extends Logging {
// Get the number of cores available to run tasks (as reported by Scheduler)
def numCores = scheduler.numCores
+
+ private var nextShuffleId: Int = 0
+
+ private[spark] def newShuffleId(): Int = {
+ val id = nextShuffleId
+ nextShuffleId += 1
+ id
+ }
}
diff --git a/core/src/main/scala/spark/Stage.scala b/core/src/main/scala/spark/Stage.scala
new file mode 100644
index 0000000000..82b70ce60d
--- /dev/null
+++ b/core/src/main/scala/spark/Stage.scala
@@ -0,0 +1,34 @@
+package spark
+
+class Stage(val id: Int, val rdd: RDD[_], val shuffleDep: Option[ShuffleDependency[_,_,_]], val parents: List[Stage]) {
+ val isShuffleMap = shuffleDep != None
+ val numPartitions = rdd.splits.size
+ val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
+ var numAvailableOutputs = 0
+
+ def isAvailable: Boolean = {
+ if (parents.size == 0 && !isShuffleMap)
+ true
+ else
+ numAvailableOutputs == numPartitions
+ }
+
+ def addOutputLoc(partition: Int, host: String) {
+ val prevList = outputLocs(partition)
+ outputLocs(partition) = host :: prevList
+ if (prevList == Nil)
+ numAvailableOutputs += 1
+ }
+
+ def removeOutputLoc(partition: Int, host: String) {
+ val prevList = outputLocs(partition)
+ val newList = prevList - host
+ outputLocs(partition) = newList
+ if (prevList != Nil && newList == Nil)
+ numAvailableOutputs -= 1
+ }
+
+ override def toString = "Stage " + id
+
+ override def hashCode(): Int = id
+}
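
Stage's output-location bookkeeping counts a partition as available as soon as it has at least one registered host, and only decrements when the last host for a partition is removed. A standalone sketch of that invariant (a hypothetical mini class without the RDD and shuffle-dependency fields):

class OutputLocs(numPartitions: Int) {
  private val locs = Array.fill[List[String]](numPartitions)(Nil)
  private var available = 0

  def add(partition: Int, host: String) {
    if (locs(partition).isEmpty) available += 1   // first location for this partition
    locs(partition) = host :: locs(partition)
  }

  def remove(partition: Int, host: String) {
    val newList = locs(partition).filterNot(_ == host)
    if (locs(partition).nonEmpty && newList.isEmpty) available -= 1  // lost the last location
    locs(partition) = newList
  }

  // A shuffle stage is "available" once every partition has an output somewhere.
  def isAvailable: Boolean = available == numPartitions
}

object OutputLocsDemo {
  def main(args: Array[String]) {
    val s = new OutputLocs(2)
    s.add(0, "host-a"); s.add(1, "host-b")
    println(s.isAvailable)  // true
    s.remove(1, "host-b")
    println(s.isAvailable)  // false
  }
}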