author    Matei Zaharia <matei@eecs.berkeley.edu>  2011-02-26 23:41:44 -0800
committer Matei Zaharia <matei@eecs.berkeley.edu>  2011-02-26 23:41:44 -0800
commit    2e6023f2bf7e44f9c8d06a4d4bca7955ebd2020f (patch)
tree      cac3cb3978ebe4fb040eeae188ce56f077dcd70f /core
parent    309367c4772e1793bf7fe58fc2ed2fca2f7bf657 (diff)
download  spark-2e6023f2bf7e44f9c8d06a4d4bca7955ebd2020f.tar.gz
          spark-2e6023f2bf7e44f9c8d06a4d4bca7955ebd2020f.tar.bz2
          spark-2e6023f2bf7e44f9c8d06a4d4bca7955ebd2020f.zip

stuff
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/DAGScheduler.scala   | 251
-rw-r--r--  core/src/main/scala/spark/LocalScheduler.scala | 236
-rw-r--r--  core/src/main/scala/spark/RDD.scala            |  16
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala   |  11
4 files changed, 276 insertions, 238 deletions
diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala
new file mode 100644
index 0000000000..5a5fc4c840
--- /dev/null
+++ b/core/src/main/scala/spark/DAGScheduler.scala
@@ -0,0 +1,251 @@
+package spark
+
+import java.util.concurrent._
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+import scala.collection.mutable.Map
+
+/**
+ * A Scheduler subclass that implements stage-oriented scheduling. It computes
+ * a DAG of stages for each job, keeps track of which RDDs and stage outputs
+ * are materialized, and computes a minimal schedule to run the job. Subclasses
+ * only need to implement the code to send a task to the cluster and to report
+ * failures from it (the submitTasks method, and code to add completionEvents).
+ */
+private abstract class DAGScheduler extends Scheduler with Logging {
+ // Must be implemented by subclasses to start running a set of tasks
+ def submitTasks(tasks: Seq[Task[_]]): Unit
+
+ // Must be called by subclasses to report task completions or failures
+ def taskEnded(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any]) {
+ completionEvents.put(CompletionEvent(task, successful, result, accumUpdates))
+ }
+
+ private val completionEvents = new LinkedBlockingQueue[CompletionEvent]
+
+ var nextStageId = 0
+
+ def newStageId() = {
+ var res = nextStageId
+ nextStageId += 1
+ res
+ }
+
+ val idToStage = new HashMap[Int, Stage]
+
+ val shuffleToMapStage = new HashMap[ShuffleDependency[_,_,_], Stage]
+
+ val cacheLocs = new HashMap[RDD[_], Array[List[String]]]
+
+ def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
+ cacheLocs.getOrElseUpdate(rdd, Array.fill[List[String]](rdd.splits.size)(Nil))
+ }
+
+ def addCacheLoc(rdd: RDD[_], partition: Int, host: String) {
+ val locs = getCacheLocs(rdd)
+ locs(partition) = host :: locs(partition)
+ }
+
+ def removeCacheLoc(rdd: RDD[_], partition: Int, host: String) {
+ val locs = getCacheLocs(rdd)
+ locs(partition) -= host
+ }
+
+ def getShuffleMapStage(shuf: ShuffleDependency[_,_,_]): Stage = {
+ shuffleToMapStage.get(shuf) match {
+ case Some(stage) => stage
+ case None =>
+ val stage = newStage(
+ true, shuf.rdd, shuf.spec.partitioner.numPartitions)
+ shuffleToMapStage(shuf) = stage
+ stage
+ }
+ }
+
+ def newStage(isShuffleMap: Boolean, rdd: RDD[_], numPartitions: Int): Stage = {
+ val id = newStageId()
+ val parents = getParentStages(rdd)
+ val stage = new Stage(id, isShuffleMap, rdd, parents, numPartitions)
+ idToStage(id) = stage
+ stage
+ }
+
+ def getParentStages(rdd: RDD[_]): List[Stage] = {
+ val parents = new HashSet[Stage]
+ val visited = new HashSet[RDD[_]]
+ def visit(r: RDD[_]) {
+ if (!visited(r)) {
+ visited += r
+ for (dep <- r.dependencies) {
+ dep match {
+ case shufDep: ShuffleDependency[_,_,_] =>
+ parents += getShuffleMapStage(shufDep)
+ case _ =>
+ visit(dep.rdd)
+ }
+ }
+ }
+ }
+ visit(rdd)
+ parents.toList
+ }
+
+ def getMissingParentStages(stage: Stage): List[Stage] = {
+ val missing = new HashSet[Stage]
+ val visited = new HashSet[RDD[_]]
+ def visit(rdd: RDD[_]) {
+ if (!visited(rdd)) {
+ visited += rdd
+ val locs = getCacheLocs(rdd)
+ for (p <- 0 until rdd.splits.size) {
+ if (locs(p) == Nil) {
+ for (dep <- rdd.dependencies) {
+ dep match {
+ case shufDep: ShuffleDependency[_,_,_] =>
+ val stage = getShuffleMapStage(shufDep)
+ if (!stage.isAvailable)
+ missing += stage
+ case narrowDep: NarrowDependency[_] =>
+ visit(narrowDep.rdd)
+ }
+ }
+ }
+ }
+ }
+ }
+ visit(stage.rdd)
+ missing.toList
+ }
+
+ override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
+ : Array[U] = {
+ val numOutputParts: Int = rdd.splits.size
+ val finalStage = newStage(false, rdd, numOutputParts)
+ val results = new Array[U](numOutputParts)
+ val finished = new Array[Boolean](numOutputParts)
+ var numFinished = 0
+
+ val waiting = new HashSet[Stage]
+ val running = new HashSet[Stage]
+ val pendingTasks = new HashMap[Stage, HashSet[Task[_]]]
+
+ def submitStage(stage: Stage) {
+ if (!waiting(stage) && !running(stage)) {
+ val missing = getMissingParentStages(stage)
+ if (missing == Nil) {
+ logInfo("Submitting " + stage + ", which has no missing parents")
+ submitMissingTasks(stage)
+ running += stage
+ } else {
+ for (parent <- missing)
+ submitStage(parent)
+ waiting += stage
+ }
+ }
+ }
+
+ def submitMissingTasks(stage: Stage) {
+ var tasks: List[Task[_]] = Nil
+ if (stage == finalStage) {
+ for (p <- 0 until numOutputParts if (!finished(p))) {
+ val locs = getPreferredLocs(rdd, p)
+ tasks = new ResultTask(rdd, func, p, locs) :: tasks
+ }
+ }
+ submitTasks(tasks)
+ }
+
+ submitStage(finalStage)
+
+ while (numFinished != numOutputParts) {
+ val evt = completionEvents.take()
+ if (evt.successful) {
+ Accumulators.add(currentThread, evt.accumUpdates)
+ evt.task match {
+ case rt: ResultTask[_, _] =>
+ results(rt.partition) = evt.result.asInstanceOf[U]
+ finished(rt.partition) = true
+ numFinished += 1
+ // case smt: ShuffleMapTask
+ }
+ } else {
+ throw new SparkException("Task failed: " + evt.task)
+ // TODO: Kill the running job
+ }
+ }
+
+ return results
+ }
+
+ def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = {
+ // If the partition is cached, return the cache locations
+ val cached = getCacheLocs(rdd)(partition)
+ if (cached != Nil) {
+ return cached
+ }
+ // If the RDD has some placement preferences (as is the case for input RDDs), get those
+ val rddPrefs = rdd.preferredLocations(rdd.splits(partition)).toList
+ if (rddPrefs != Nil) {
+ return rddPrefs
+ }
+ // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
+ // that has any placement preferences. Ideally we would choose based on transfer sizes,
+ // but this will do for now.
+ rdd.dependencies.foreach(_ match {
+ case n: NarrowDependency[_] =>
+ for (inPart <- n.getParents(partition)) {
+ val locs = getPreferredLocs(n.rdd, inPart)
+ if (locs != Nil)
+ return locs;
+ }
+ })
+ return Nil
+ }
+}
+
+case class CompletionEvent(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any])
+
+class Stage(val id: Int, val isShuffleMap: Boolean, val rdd: RDD[_], val parents: List[Stage], val numPartitions: Int) {
+ val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
+ var numAvailableOutputs = 0
+
+ def isAvailable: Boolean = {
+ if (parents.size == 0 && !isShuffleMap)
+ true
+ else
+ numAvailableOutputs == numPartitions
+ }
+
+ def addOutputLoc(partition: Int, host: String) {
+ val prevList = outputLocs(partition)
+ outputLocs(partition) = host :: prevList
+ if (prevList == Nil)
+ numAvailableOutputs += 1
+ }
+
+ def removeOutputLoc(partition: Int, host: String) {
+ val prevList = outputLocs(partition)
+ val newList = prevList - host
+ outputLocs(partition) = newList
+ if (prevList != Nil && newList == Nil)
+ numAvailableOutputs -= 1
+ }
+
+ override def toString = "Stage " + id
+
+ override def hashCode(): Int = id
+}
+
+class ResultTask[T, U](rdd: RDD[T], func: Iterator[T] => U, val partition: Int, locs: Seq[String])
+extends Task[U] {
+ val split = rdd.splits(partition)
+
+ override def run: U = {
+ func(rdd.iterator(split))
+ }
+
+ override def preferredLocations: Seq[String] = locs
+
+ override def toString = "ResultTask " + partition
+}
\ No newline at end of file
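
The getParentStages logic above cuts the RDD graph into stages at shuffle boundaries: narrow dependencies are followed within the current stage, while each ShuffleDependency becomes the root of a separate parent stage. Below is a small self-contained sketch of that traversal on a toy graph; the Dep/Node types and parentStageRoots are invented for illustration and are not the Spark classes from this patch.

import scala.collection.mutable

// Toy dependency graph: just the traversal idea, not the Spark classes above.
sealed trait Dep { def parent: Node }
case class Narrow(parent: Node) extends Dep   // stays inside the current stage
case class Shuffle(parent: Node) extends Dep  // marks a stage boundary

case class Node(name: String, deps: List[Dep])

// Mirror of getParentStages: recurse through narrow deps, and record the node
// behind every shuffle dep as the root of a separate parent stage.
def parentStageRoots(root: Node): List[String] = {
  val parents = mutable.LinkedHashSet[String]()
  val visited = mutable.HashSet[Node]()
  def visit(n: Node): Unit = {
    if (!visited(n)) {
      visited += n
      n.deps.foreach {
        case Shuffle(p) => parents += p.name
        case Narrow(p)  => visit(p)
      }
    }
  }
  visit(root)
  parents.toList
}

val input   = Node("input", Nil)
val mapped  = Node("mapped", List(Narrow(input)))    // map: same stage as input
val reduced = Node("reduced", List(Shuffle(mapped))) // shuffle: new parent stage
println(parentStageRoots(reduced))  // List(mapped)
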
diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala
index 8518229856..26fc5c9fdb 100644
--- a/core/src/main/scala/spark/LocalScheduler.scala
+++ b/core/src/main/scala/spark/LocalScheduler.scala
@@ -2,14 +2,10 @@ package spark
import java.util.concurrent._
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.collection.mutable.Map
-
/**
* A simple Scheduler implementation that runs tasks locally in a thread pool.
*/
-private class LocalScheduler(threads: Int) extends Scheduler with Logging {
+private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
var threadPool: ExecutorService =
Executors.newFixedThreadPool(threads, DaemonThreadFactory)
@@ -17,9 +13,7 @@ private class LocalScheduler(threads: Int) extends Scheduler with Logging {
override def waitForRegister() {}
- val completionEvents = new LinkedBlockingQueue[CompletionEvent]
-
- def submitTasks(tasks: Seq[Task[_]]) {
+ override def submitTasks(tasks: Seq[Task[_]]) {
tasks.zipWithIndex.foreach { case (task, i) =>
threadPool.submit(new Runnable {
def run() {
@@ -36,7 +30,7 @@ private class LocalScheduler(threads: Int) extends Scheduler with Logging {
val result: Any = task.run
val accumUpdates = Accumulators.values
logInfo("Finished task " + i)
- completionEvents.put(CompletionEvent(task, true, result, accumUpdates))
+ taskEnded(task, true, result, accumUpdates)
} catch {
case e: Exception => {
// TODO: Do something nicer here
@@ -53,228 +47,4 @@ private class LocalScheduler(threads: Int) extends Scheduler with Logging {
override def stop() {}
override def numCores() = threads
-
- var nextStageId = 0
-
- def newStageId() = {
- var res = nextStageId
- nextStageId += 1
- res
- }
-
- val idToStage = new HashMap[Int, Stage]
-
- val shuffleToMapStage = new HashMap[ShuffleDependency[_,_,_], Stage]
-
- val cacheLocs = new HashMap[RDD[_], Array[List[String]]]
-
- def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
- cacheLocs.getOrElseUpdate(rdd, Array.fill[List[String]](rdd.splits.size)(Nil))
- }
-
- def addCacheLoc(rdd: RDD[_], partition: Int, host: String) {
- val locs = getCacheLocs(rdd)
- locs(partition) = host :: locs(partition)
- }
-
- def removeCacheLoc(rdd: RDD[_], partition: Int, host: String) {
- val locs = getCacheLocs(rdd)
- locs(partition) -= host
- }
-
- def getShuffleMapStage(shuf: ShuffleDependency[_,_,_]): Stage = {
- shuffleToMapStage.get(shuf) match {
- case Some(stage) => stage
- case None =>
- val stage = newStage(
- true, shuf.rdd, shuf.spec.partitioner.numPartitions)
- shuffleToMapStage(shuf) = stage
- stage
- }
- }
-
- def newStage(isShuffleMap: Boolean, rdd: RDD[_], numPartitions: Int): Stage = {
- val id = newStageId()
- val parents = getParentStages(rdd)
- val stage = new Stage(id, isShuffleMap, rdd, parents, numPartitions)
- idToStage(id) = stage
- stage
- }
-
- def getParentStages(rdd: RDD[_]): List[Stage] = {
- val parents = new HashSet[Stage]
- val visited = new HashSet[RDD[_]]
- def visit(r: RDD[_]) {
- if (!visited(r)) {
- visited += r
- for (dep <- r.dependencies) {
- dep match {
- case shufDep: ShuffleDependency[_,_,_] =>
- parents += getShuffleMapStage(shufDep)
- case _ =>
- visit(dep.rdd)
- }
- }
- }
- }
- visit(rdd)
- parents.toList
- }
-
- def getMissingParentStages(stage: Stage): List[Stage] = {
- val missing = new HashSet[Stage]
- val visited = new HashSet[RDD[_]]
- def visit(rdd: RDD[_]) {
- if (!visited(rdd)) {
- visited += rdd
- val locs = getCacheLocs(rdd)
- for (p <- 0 until rdd.splits.size) {
- if (locs(p) == Nil) {
- for (dep <- rdd.dependencies) {
- dep match {
- case shufDep: ShuffleDependency[_,_,_] =>
- val stage = getShuffleMapStage(shufDep)
- if (!stage.isAvailable)
- missing += stage
- case narrowDep: NarrowDependency[_] =>
- visit(narrowDep.rdd)
- }
- }
- }
- }
- }
- }
- visit(stage.rdd)
- missing.toList
- }
-
- override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
- : Array[U] = {
- val numOutputParts: Int = rdd.splits.size
- val finalStage = newStage(false, rdd, numOutputParts)
- val results = new Array[U](numOutputParts)
- val finished = new Array[Boolean](numOutputParts)
- var numFinished = 0
-
- val waiting = new HashSet[Stage]
- val running = new HashSet[Stage]
- val pendingTasks = new HashMap[Stage, HashSet[Task[_]]]
-
- def submitStage(stage: Stage) {
- if (!waiting(stage) && !running(stage)) {
- val missing = getMissingParentStages(stage)
- if (missing == Nil) {
- logInfo("Submitting " + stage + ", which has no missing parents")
- submitMissingTasks(stage)
- running += stage
- } else {
- for (parent <- missing)
- submitStage(parent)
- waiting += stage
- }
- }
- }
-
- def submitMissingTasks(stage: Stage) {
- var tasks: List[Task[_]] = Nil
- if (stage == finalStage) {
- for (p <- 0 until numOutputParts if (!finished(p))) {
- val locs = getPreferredLocs(rdd, p)
- tasks = new ResultTask(rdd, func, p, locs) :: tasks
- }
- }
- submitTasks(tasks)
- }
-
- submitStage(finalStage)
-
- while (numFinished != numOutputParts) {
- val evt = completionEvents.take()
- if (evt.successful) {
- evt.task match {
- case rt: ResultTask[_, _] =>
- results(rt.partition) = evt.result.asInstanceOf[U]
- finished(rt.partition) = true
- numFinished += 1
- // case smt: ShuffleMapTask
- }
- } else {
- throw new SparkException("Task failed: " + evt.task)
- // TODO: Kill the running job
- }
- }
-
- return results
- }
-
- def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = {
- // If the partition is cached, return the cache locations
- val cached = getCacheLocs(rdd)(partition)
- if (cached != Nil) {
- return cached
- }
- // If the RDD has some placement preferences (as is the case for input RDDs), get those
- val rddPrefs = rdd.preferredLocations(rdd.splits(partition)).toList
- if (rddPrefs != Nil) {
- return rddPrefs
- }
- // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
- // that has any placement preferences. Ideally we would choose based on transfer sizes,
- // but this will do for now.
- rdd.dependencies.foreach(_ match {
- case n: NarrowDependency[_] =>
- for (inPart <- n.getParents(partition)) {
- val locs = getPreferredLocs(n.rdd, inPart)
- if (locs != Nil)
- return locs;
- }
- })
- return Nil
- }
-}
-
-case class CompletionEvent(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any])
-
-class ResultTask[T, U](rdd: RDD[T], func: Iterator[T] => U, val partition: Int, locs: Seq[String])
-extends Task[U] {
- val split = rdd.splits(partition)
-
- override def run: U = {
- func(rdd.iterator(split))
- }
-
- override def preferredLocations: Seq[String] = locs
-
- override def toString = "ResultTask " + partition
-}
-
-class Stage(val id: Int, val isShuffleMap: Boolean, val rdd: RDD[_], val parents: List[Stage], val numPartitions: Int) {
- val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
- var numAvailableOutputs = 0
-
- def isAvailable: Boolean = {
- if (parents.size == 0 && !isShuffleMap)
- true
- else
- numAvailableOutputs == numPartitions
- }
-
- def addOutputLoc(partition: Int, host: String) {
- val prevList = outputLocs(partition)
- outputLocs(partition) = host :: prevList
- if (prevList == Nil)
- numAvailableOutputs += 1
- }
-
- def removeOutputLoc(partition: Int, host: String) {
- val prevList = outputLocs(partition)
- val newList = prevList - host
- outputLocs(partition) = newList
- if (prevList != Nil && newList == Nil)
- numAvailableOutputs -= 1
- }
-
- override def toString = "Stage " + id
-
- override def hashCode(): Int = id
}
\ No newline at end of file
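
With this change LocalScheduler no longer owns the completion-event queue; it just runs tasks and calls taskEnded, while DAGScheduler.runJob blocks on the queue until every partition has reported. The sketch below shows that producer/consumer handshake in isolation; the Done type and the rest of the scaffolding are invented for illustration, not Spark APIs.

import java.util.concurrent.LinkedBlockingQueue

// Stand-in for CompletionEvent: which partition finished and what it produced.
case class Done(partition: Int, result: Int)

val events = new LinkedBlockingQueue[Done]
val numParts = 4

// "submitTasks" side: worker threads report completions onto the queue,
// the way taskEnded does above.
for (p <- 0 until numParts) {
  new Thread(new Runnable {
    def run() { events.put(Done(p, p * p)) }
  }).start()
}

// "runJob" side: block on take() until every partition has a result.
val results = new Array[Int](numParts)
var finished = 0
while (finished < numParts) {
  val evt = events.take()
  results(evt.partition) = evt.result
  finished += 1
}
println(results.mkString(", "))  // 0, 1, 4, 9
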
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 391b54f4eb..37baed6e4c 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -12,6 +12,7 @@ import SparkContext._
import mesos._
+@serializable
abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean)
abstract class NarrowDependency[T](rdd: RDD[T])
@@ -19,11 +20,16 @@ extends Dependency(rdd, false) {
def getParents(outputPartition: Int): Seq[Int]
}
+class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
+ override def getParents(partitionId: Int) = List(partitionId)
+}
+
class ShuffleDependency[K, V, C](
rdd: RDD[(K, V)],
val spec: ShuffleSpec[K, V, C]
) extends Dependency(rdd, true)
+@serializable
class ShuffleSpec[K, V, C] (
val createCombiner: V => C,
val mergeValue: (C, V) => C,
@@ -31,6 +37,7 @@ class ShuffleSpec[K, V, C] (
val partitioner: Partitioner[K]
)
+@serializable
abstract class Partitioner[K] {
def numPartitions: Int
def getPartition(key: K): Int
@@ -42,8 +49,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
def iterator(split: Split): Iterator[T]
def preferredLocations(split: Split): Seq[String]
- def dependencies: List[Dependency[_]] = Nil
- def partitioner: Option[Partitioner[_]] = None
+ val dependencies: List[Dependency[_]] = Nil
+ val partitioner: Option[Partitioner[_]] = None
def taskStarted(split: Split, slot: SlaveOffer) {}
@@ -66,7 +73,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
}
def collect(): Array[T] = {
- val results = sc.scheduler.runJob(this, (iter: Iterator[T]) => iter.toArray)
+ val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
@@ -80,7 +87,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
else
None
}
- val options = sc.scheduler.runJob(this, reducePartition)
+ val options = sc.runJob(this, reducePartition)
val results = new ArrayBuffer[T]
for (opt <- options; elem <- opt)
results += elem
@@ -177,6 +184,7 @@ extends RDD[U](prev.sparkContext) {
override def preferredLocations(split: Split) = prev.preferredLocations(split)
override def iterator(split: Split) = prev.iterator(split).map(f)
override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
+ override val dependencies = List(new OneToOneDependency(prev))
}
class FilteredRDD[T: ClassManifest](
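
The RDD changes above turn dependencies into a val and give MappedRDD a OneToOneDependency on its parent, which is what lets the new DAGScheduler see narrow lineage. As an illustration only, here is a hypothetical derived RDD written in the same shape; PrefixedRDD is invented, and the splits override assumes the splits member and Split type used elsewhere in this patch.

// Hypothetical transformation modeled on MappedRDD above; not part of this patch.
class PrefixedRDD[T: ClassManifest](prev: RDD[T], prefix: String)
extends RDD[String](prev.sparkContext) {
  override def splits = prev.splits
  override def preferredLocations(split: Split) = prev.preferredLocations(split)
  override def iterator(split: Split) = prev.iterator(split).map(x => prefix + x)
  // Declare narrow lineage so the scheduler can keep this in the parent's stage.
  override val dependencies = List(new OneToOneDependency(prev))
}
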
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 7c30587928..04bd86180b 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -14,7 +14,7 @@ class SparkContext(
val sparkHome: String = null,
val jars: Seq[String] = Nil)
extends Logging {
- private[spark] var scheduler: Scheduler = {
+ private var scheduler: Scheduler = {
// Regular expression used for local[N] master format
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
master match {
@@ -139,6 +139,15 @@ extends Logging {
*/
}
+ private[spark] def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
+ : Array[U] = {
+ logInfo("Starting job...")
+ val start = System.nanoTime
+ val result = scheduler.runJob(rdd, func)
+ logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s")
+ result
+ }
+
  // Clean a closure to make it ready to be serialized and sent to tasks
// (removes unreferenced variables in $outer's, updates REPL variables)
private[spark] def clean[F <: AnyRef](f: F): F = {
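
The new SparkContext.runJob makes the scheduler private and gives every job a single logged entry point with wall-clock timing; RDD.collect and RDD.reduce now route through it, as shown in the RDD.scala hunk above. The snippet below is only a standalone sketch of that wrap-and-time pattern; none of these names come from the patch.

// Standalone sketch of the wrap-and-time pattern used by SparkContext.runJob.
def timed[A](label: String)(body: => A): A = {
  val start = System.nanoTime
  val result = body
  println(label + " finished in " + (System.nanoTime - start) / 1e9 + " s")
  result
}

// Usage: any "job" funneled through the wrapper gets timed the same way.
val squares = timed("Job") {
  (1 to 4).map(i => i * i).toArray
}
println(squares.mkString(", "))  // 1, 4, 9, 16
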