author     Matei Zaharia <matei@eecs.berkeley.edu>    2011-02-26 23:15:33 -0800
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2011-02-26 23:15:33 -0800
commit     309367c4772e1793bf7fe58fc2ed2fca2f7bf657 (patch)
tree       205e0e88e30f4a823e01779f68a3df5a63f8d1af /core
parent     dc24aecd8fc8b5d9b19c10e8301dbe1107412c8a (diff)
Initial work towards new RDD design
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/DaemonThreadFactory.scala  |  14
-rw-r--r--  core/src/main/scala/spark/LocalScheduler.scala       | 258
-rw-r--r--  core/src/main/scala/spark/MesosScheduler.scala       |   7
-rw-r--r--  core/src/main/scala/spark/RDD.scala                  |  57
-rw-r--r--  core/src/main/scala/spark/Scheduler.scala            |   3
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala         |  10
6 files changed, 303 insertions, 46 deletions
diff --git a/core/src/main/scala/spark/DaemonThreadFactory.scala b/core/src/main/scala/spark/DaemonThreadFactory.scala
new file mode 100644
index 0000000000..cb30cb2ac8
--- /dev/null
+++ b/core/src/main/scala/spark/DaemonThreadFactory.scala
@@ -0,0 +1,14 @@
+package spark
+
+import java.util.concurrent.ThreadFactory
+
+/**
+ * A ThreadFactory that creates daemon threads
+ */
+private object DaemonThreadFactory extends ThreadFactory {
+ override def newThread(r: Runnable): Thread = {
+ val t = new Thread(r);
+ t.setDaemon(true)
+ return t
+ }
+} \ No newline at end of file
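
A thread pool built from this factory uses daemon worker threads, so an exiting driver JVM is not kept alive by idle scheduler threads. A minimal sketch of how such a pool could be constructed (the object name below is illustrative, not part of this commit):

    package spark

    import java.util.concurrent.{Executors, ExecutorService}

    // Illustrative only: build a fixed-size pool whose workers are daemon
    // threads, by passing the factory above to the standard JDK helper.
    object DaemonPoolSketch {
      def makePool(threads: Int): ExecutorService =
        Executors.newFixedThreadPool(threads, DaemonThreadFactory)
    }
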
diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala
index 20954a1224..8518229856 100644
--- a/core/src/main/scala/spark/LocalScheduler.scala
+++ b/core/src/main/scala/spark/LocalScheduler.scala
@@ -2,6 +2,8 @@ package spark
import java.util.concurrent._
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
import scala.collection.mutable.Map
/**
@@ -14,28 +16,27 @@ private class LocalScheduler(threads: Int) extends Scheduler with Logging {
override def start() {}
override def waitForRegister() {}
-
- override def runTasks[T](tasks: Array[Task[T]])(implicit m: ClassManifest[T])
- : Array[T] = {
- val futures = new Array[Future[TaskResult[T]]](tasks.length)
-
- for (i <- 0 until tasks.length) {
- futures(i) = threadPool.submit(new Callable[TaskResult[T]]() {
- def call(): TaskResult[T] = {
+
+ val completionEvents = new LinkedBlockingQueue[CompletionEvent]
+
+ def submitTasks(tasks: Seq[Task[_]]) {
+ tasks.zipWithIndex.foreach { case (task, i) =>
+ threadPool.submit(new Runnable {
+ def run() {
logInfo("Running task " + i)
try {
// Serialize and deserialize the task so that accumulators are
// changed to thread-local ones; this adds a bit of unnecessary
- // overhead but matches how the Nexus Executor works
+ // overhead but matches how the Mesos Executor works
Accumulators.clear
val bytes = Utils.serialize(tasks(i))
logInfo("Size of task " + i + " is " + bytes.size + " bytes")
- val task = Utils.deserialize[Task[T]](
+ val task = Utils.deserialize[Task[_]](
bytes, currentThread.getContextClassLoader)
- val value = task.run
+ val result: Any = task.run
val accumUpdates = Accumulators.values
logInfo("Finished task " + i)
- new TaskResult[T](value, accumUpdates)
+ completionEvents.put(CompletionEvent(task, true, result, accumUpdates))
} catch {
case e: Exception => {
// TODO: Do something nicer here
@@ -47,26 +48,233 @@ private class LocalScheduler(threads: Int) extends Scheduler with Logging {
}
})
}
-
- val taskResults = futures.map(_.get)
- for (result <- taskResults)
- Accumulators.add(currentThread, result.accumUpdates)
- return taskResults.map(_.value).toArray(m)
}
override def stop() {}
override def numCores() = threads
+
+ var nextStageId = 0
+
+ def newStageId() = {
+ var res = nextStageId
+ nextStageId += 1
+ res
+ }
+
+ val idToStage = new HashMap[Int, Stage]
+
+ val shuffleToMapStage = new HashMap[ShuffleDependency[_,_,_], Stage]
+
+ val cacheLocs = new HashMap[RDD[_], Array[List[String]]]
+
+ def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
+ cacheLocs.getOrElseUpdate(rdd, Array.fill[List[String]](rdd.splits.size)(Nil))
+ }
+
+ def addCacheLoc(rdd: RDD[_], partition: Int, host: String) {
+ val locs = getCacheLocs(rdd)
+ locs(partition) = host :: locs(partition)
+ }
+
+ def removeCacheLoc(rdd: RDD[_], partition: Int, host: String) {
+ val locs = getCacheLocs(rdd)
+ locs(partition) -= host
+ }
+
+ def getShuffleMapStage(shuf: ShuffleDependency[_,_,_]): Stage = {
+ shuffleToMapStage.get(shuf) match {
+ case Some(stage) => stage
+ case None =>
+ val stage = newStage(
+ true, shuf.rdd, shuf.spec.partitioner.numPartitions)
+ shuffleToMapStage(shuf) = stage
+ stage
+ }
+ }
+
+ def newStage(isShuffleMap: Boolean, rdd: RDD[_], numPartitions: Int): Stage = {
+ val id = newStageId()
+ val parents = getParentStages(rdd)
+ val stage = new Stage(id, isShuffleMap, rdd, parents, numPartitions)
+ idToStage(id) = stage
+ stage
+ }
+
+ def getParentStages(rdd: RDD[_]): List[Stage] = {
+ val parents = new HashSet[Stage]
+ val visited = new HashSet[RDD[_]]
+ def visit(r: RDD[_]) {
+ if (!visited(r)) {
+ visited += r
+ for (dep <- r.dependencies) {
+ dep match {
+ case shufDep: ShuffleDependency[_,_,_] =>
+ parents += getShuffleMapStage(shufDep)
+ case _ =>
+ visit(dep.rdd)
+ }
+ }
+ }
+ }
+ visit(rdd)
+ parents.toList
+ }
+
+ def getMissingParentStages(stage: Stage): List[Stage] = {
+ val missing = new HashSet[Stage]
+ val visited = new HashSet[RDD[_]]
+ def visit(rdd: RDD[_]) {
+ if (!visited(rdd)) {
+ visited += rdd
+ val locs = getCacheLocs(rdd)
+ for (p <- 0 until rdd.splits.size) {
+ if (locs(p) == Nil) {
+ for (dep <- rdd.dependencies) {
+ dep match {
+ case shufDep: ShuffleDependency[_,_,_] =>
+ val stage = getShuffleMapStage(shufDep)
+ if (!stage.isAvailable)
+ missing += stage
+ case narrowDep: NarrowDependency[_] =>
+ visit(narrowDep.rdd)
+ }
+ }
+ }
+ }
+ }
+ }
+ visit(stage.rdd)
+ missing.toList
+ }
+
+ override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
+ : Array[U] = {
+ val numOutputParts: Int = rdd.splits.size
+ val finalStage = newStage(false, rdd, numOutputParts)
+ val results = new Array[U](numOutputParts)
+ val finished = new Array[Boolean](numOutputParts)
+ var numFinished = 0
+
+ val waiting = new HashSet[Stage]
+ val running = new HashSet[Stage]
+ val pendingTasks = new HashMap[Stage, HashSet[Task[_]]]
+
+ def submitStage(stage: Stage) {
+ if (!waiting(stage) && !running(stage)) {
+ val missing = getMissingParentStages(stage)
+ if (missing == Nil) {
+ logInfo("Submitting " + stage + ", which has no missing parents")
+ submitMissingTasks(stage)
+ running += stage
+ } else {
+ for (parent <- missing)
+ submitStage(parent)
+ waiting += stage
+ }
+ }
+ }
+
+ def submitMissingTasks(stage: Stage) {
+ var tasks: List[Task[_]] = Nil
+ if (stage == finalStage) {
+ for (p <- 0 until numOutputParts if (!finished(p))) {
+ val locs = getPreferredLocs(rdd, p)
+ tasks = new ResultTask(rdd, func, p, locs) :: tasks
+ }
+ }
+ submitTasks(tasks)
+ }
+
+ submitStage(finalStage)
+
+ while (numFinished != numOutputParts) {
+ val evt = completionEvents.take()
+ if (evt.successful) {
+ evt.task match {
+ case rt: ResultTask[_, _] =>
+ results(rt.partition) = evt.result.asInstanceOf[U]
+ finished(rt.partition) = true
+ numFinished += 1
+ // case smt: ShuffleMapTask
+ }
+ } else {
+ throw new SparkException("Task failed: " + evt.task)
+ // TODO: Kill the running job
+ }
+ }
+
+ return results
+ }
+
+ def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = {
+ // If the partition is cached, return the cache locations
+ val cached = getCacheLocs(rdd)(partition)
+ if (cached != Nil) {
+ return cached
+ }
+ // If the RDD has some placement preferences (as is the case for input RDDs), get those
+ val rddPrefs = rdd.preferredLocations(rdd.splits(partition)).toList
+ if (rddPrefs != Nil) {
+ return rddPrefs
+ }
+ // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
+ // that has any placement preferences. Ideally we would choose based on transfer sizes,
+ // but this will do for now.
+ rdd.dependencies.foreach(_ match {
+ case n: NarrowDependency[_] =>
+ for (inPart <- n.getParents(partition)) {
+ val locs = getPreferredLocs(n.rdd, inPart)
+ if (locs != Nil)
+ return locs;
+ }
+ })
+ return Nil
+ }
}
+case class CompletionEvent(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any])
-/**
- * A ThreadFactory that creates daemon threads
- */
-private object DaemonThreadFactory extends ThreadFactory {
- override def newThread(r: Runnable): Thread = {
- val t = new Thread(r);
- t.setDaemon(true)
- return t
+class ResultTask[T, U](rdd: RDD[T], func: Iterator[T] => U, val partition: Int, locs: Seq[String])
+extends Task[U] {
+ val split = rdd.splits(partition)
+
+ override def run: U = {
+ func(rdd.iterator(split))
}
+
+ override def preferredLocations: Seq[String] = locs
+
+ override def toString = "ResultTask " + partition
}
+
+class Stage(val id: Int, val isShuffleMap: Boolean, val rdd: RDD[_], val parents: List[Stage], val numPartitions: Int) {
+ val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
+ var numAvailableOutputs = 0
+
+ def isAvailable: Boolean = {
+ if (parents.size == 0 && !isShuffleMap)
+ true
+ else
+ numAvailableOutputs == numPartitions
+ }
+
+ def addOutputLoc(partition: Int, host: String) {
+ val prevList = outputLocs(partition)
+ outputLocs(partition) = host :: prevList
+ if (prevList == Nil)
+ numAvailableOutputs += 1
+ }
+
+ def removeOutputLoc(partition: Int, host: String) {
+ val prevList = outputLocs(partition)
+ val newList = prevList - host
+ outputLocs(partition) = newList
+ if (prevList != Nil && newList == Nil)
+ numAvailableOutputs -= 1
+ }
+
+ override def toString = "Stage " + id
+
+ override def hashCode(): Int = id
+} \ No newline at end of file
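
The reworked LocalScheduler no longer blocks on per-task Futures; worker threads push CompletionEvents onto a LinkedBlockingQueue and runJob drains the queue until every output partition has reported in. A self-contained sketch of that producer/consumer loop, with simplified stand-in types rather than the actual Spark classes:

    import java.util.concurrent.{Executors, LinkedBlockingQueue}

    // Simplified stand-in for Task/CompletionEvent, for illustration only.
    case class MiniEvent(partition: Int, result: Int)

    object EventLoopSketch {
      def main(args: Array[String]) {
        val numParts = 4
        val events = new LinkedBlockingQueue[MiniEvent]
        val pool = Executors.newFixedThreadPool(2)
        // "Tasks": each computes a value for one partition and reports it.
        for (p <- 0 until numParts)
          pool.submit(new Runnable { def run() { events.put(MiniEvent(p, p * p)) } })
        // Driver side: block on the queue until all partitions have finished.
        val results = new Array[Int](numParts)
        var numFinished = 0
        while (numFinished < numParts) {
          val evt = events.take()
          results(evt.partition) = evt.result
          numFinished += 1
        }
        pool.shutdown()
        println(results.mkString(", "))
      }
    }
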
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index 6a592d13c3..35ad552775 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -105,7 +105,7 @@ extends MScheduler with spark.Scheduler with Logging
* The primary means to submit a job to the scheduler. Given a list of tasks,
* runs them and returns an array of the results.
*/
- override def runTasks[T: ClassManifest](tasks: Array[Task[T]]): Array[T] = {
+ def runTasks[T: ClassManifest](tasks: Array[Task[T]]): Array[T] = {
waitForRegister()
val jobId = newJobId()
val myJob = new SimpleJob(this, tasks, jobId)
@@ -291,4 +291,9 @@ extends MScheduler with spark.Scheduler with Logging
// Serialize the map as an array of (String, String) pairs
return Utils.serialize(props.toArray)
}
+
+ override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
+ : Array[U] = {
+ new Array[U](0)
+ }
}
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 39f2dc4458..391b54f4eb 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -12,25 +12,51 @@ import SparkContext._
import mesos._
+abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean)
+
+abstract class NarrowDependency[T](rdd: RDD[T])
+extends Dependency(rdd, false) {
+ def getParents(outputPartition: Int): Seq[Int]
+}
+
+class ShuffleDependency[K, V, C](
+ rdd: RDD[(K, V)],
+ val spec: ShuffleSpec[K, V, C]
+) extends Dependency(rdd, true)
+
+class ShuffleSpec[K, V, C] (
+ val createCombiner: V => C,
+ val mergeValue: (C, V) => C,
+ val mergeCombiners: (C, C) => C,
+ val partitioner: Partitioner[K]
+)
+
+abstract class Partitioner[K] {
+ def numPartitions: Int
+ def getPartition(key: K): Int
+}
@serializable
abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
def splits: Array[Split]
def iterator(split: Split): Iterator[T]
def preferredLocations(split: Split): Seq[String]
+
+ def dependencies: List[Dependency[_]] = Nil
+ def partitioner: Option[Partitioner[_]] = None
def taskStarted(split: Split, slot: SlaveOffer) {}
def sparkContext = sc
- def map[U: ClassManifest](f: T => U) = new MappedRDD(this, sc.clean(f))
- def filter(f: T => Boolean) = new FilteredRDD(this, sc.clean(f))
+ def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
+ def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
def cache() = new CachedRDD(this)
- def sample(withReplacement: Boolean, frac: Double, seed: Int) =
+ def sample(withReplacement: Boolean, frac: Double, seed: Int): RDD[T] =
new SampledRDD(this, withReplacement, frac, seed)
- def flatMap[U: ClassManifest](f: T => Traversable[U]) =
+ def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] =
new FlatMappedRDD(this, sc.clean(f))
def foreach(f: T => Unit) {
@@ -40,8 +66,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
}
def collect(): Array[T] = {
- val tasks = splits.map(s => new CollectTask(this, s))
- val results = sc.runTaskObjects(tasks)
+ val results = sc.scheduler.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
@@ -49,9 +74,15 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
def reduce(f: (T, T) => T): T = {
val cleanF = sc.clean(f)
- val tasks = splits.map(s => new ReduceTask(this, s, f))
+ val reducePartition: Iterator[T] => Option[T] = iter => {
+ if (iter.hasNext)
+ Some(iter.reduceLeft(f))
+ else
+ None
+ }
+ val options = sc.scheduler.runJob(this, reducePartition)
val results = new ArrayBuffer[T]
- for (option <- sc.runTaskObjects(tasks); elem <- option)
+ for (opt <- options; elem <- opt)
results += elem
if (results.size == 0)
throw new UnsupportedOperationException("empty collection")
@@ -77,20 +108,20 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
}
def count(): Long = {
- try {
+ try {
map(x => 1L).reduce(_+_)
} catch {
case e: UnsupportedOperationException => 0L // No elements in RDD
}
}
- def union(other: RDD[T]) = new UnionRDD(sc, Array(this, other))
+ def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))
- def ++(other: RDD[T]) = this.union(other)
+ def ++(other: RDD[T]): RDD[T] = this.union(other)
- def splitRdd() = new SplitRDD(this)
+ def splitRdd(): RDD[Array[T]] = new SplitRDD(this)
- def cartesian[U: ClassManifest](other: RDD[U]) =
+ def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] =
new CartesianRDD(sc, this, other)
def groupBy[K](func: T => K, numSplits: Int): RDD[(K, Seq[T])] =
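
The Partitioner[K] abstraction introduced above only fixes the contract (numPartitions plus getPartition). As a hedged illustration of how an implementation could satisfy it, a simple hash-based partitioner might look like the sketch below (no such class is added by this commit):

    package spark

    // Hypothetical implementation of the new Partitioner contract: buckets a
    // key by its hashCode, keeping the partition index non-negative.
    class HashPartitionerSketch[K](partitions: Int) extends Partitioner[K] {
      def numPartitions: Int = partitions
      def getPartition(key: K): Int = {
        val mod = key.hashCode % partitions
        if (mod < 0) mod + partitions else mod
      }
    }
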
diff --git a/core/src/main/scala/spark/Scheduler.scala b/core/src/main/scala/spark/Scheduler.scala
index b9f3128c82..fbcbb3e935 100644
--- a/core/src/main/scala/spark/Scheduler.scala
+++ b/core/src/main/scala/spark/Scheduler.scala
@@ -4,7 +4,8 @@ package spark
private trait Scheduler {
def start()
def waitForRegister()
- def runTasks[T](tasks: Array[Task[T]])(implicit m: ClassManifest[T]): Array[T]
+ //def runTasks[T](tasks: Array[Task[T]])(implicit m: ClassManifest[T]): Array[T]
+ def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U]): Array[U]
def stop()
def numCores(): Int
}
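
The trait now pivots around runJob: a scheduler receives the final RDD plus a function to apply to each partition's iterator, instead of an explicit task array. A minimal conforming implementation, purely to show the shape of the new contract (not part of the commit):

    package spark

    // Illustrative only: the smallest class that satisfies the revised trait.
    private class NoopScheduler extends Scheduler {
      def start() {}
      def waitForRegister() {}
      def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U]): Array[U] =
        new Array[U](0)   // a real scheduler would run func on every partition
      def stop() {}
      def numCores(): Int = 0
    }
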
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index bf70b5fcb1..7c30587928 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -14,7 +14,7 @@ class SparkContext(
val sparkHome: String = null,
val jars: Seq[String] = Nil)
extends Logging {
- private var scheduler: Scheduler = {
+ private[spark] var scheduler: Scheduler = {
// Regular expression used for local[N] master format
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
master match {
@@ -126,19 +126,17 @@ extends Logging {
None
}
- // Submit an array of tasks (passed as functions) to the scheduler
- def runTasks[T: ClassManifest](tasks: Array[() => T]): Array[T] = {
- runTaskObjects(tasks.map(f => new FunctionTask(f)))
- }
-
// Run an array of spark.Task objects
private[spark] def runTaskObjects[T: ClassManifest](tasks: Seq[Task[T]])
: Array[T] = {
+ return null;
+ /*
logInfo("Running " + tasks.length + " tasks in parallel")
val start = System.nanoTime
val result = scheduler.runTasks(tasks.toArray)
logInfo("Tasks finished in " + (System.nanoTime - start) / 1e9 + " s")
return result
+ */
}
// Clean a closure to make it ready to serialized and send to tasks
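
With the scheduler field now private[spark] and collect/reduce routed through runJob, driver programs keep the same surface API. A usage sketch under stated assumptions (the SparkContext constructor arguments and the parallelize-style RDD constructor are assumed from the surrounding codebase, not shown in this diff):

    // Illustrative driver program; constructor arguments and `parallelize`
    // are assumptions about the rest of the codebase, not part of this diff.
    val sc = new SparkContext("local[2]", "runJobExample")
    val nums = sc.parallelize(1 to 1000)
    val doubled = nums.map(_ * 2)          // collect() now goes through scheduler.runJob
    println(doubled.collect().size)
    println(nums.reduce(_ + _))            // reduce() aggregates per-partition Options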