aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-08-16 14:01:12 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-08-18 19:51:07 -0700
commit4004cf775d9397efbb5945768aaf05ba682c715c (patch)
tree1ae651925f1ed7c45bce614e2b22e48e305d39bd /core
parent2a4ed10210f9ee32f472e2465094d88561c0ff18 (diff)
downloadspark-4004cf775d9397efbb5945768aaf05ba682c715c.tar.gz
spark-4004cf775d9397efbb5945768aaf05ba682c715c.tar.bz2
spark-4004cf775d9397efbb5945768aaf05ba682c715c.zip
Added some comments on threading in scheduler code
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala20
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala12
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala9
3 files changed, 35 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 2f7e6d98f8..35b31f45a7 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -32,10 +32,22 @@ import spark.storage.{BlockManager, BlockManagerMaster}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
/**
- * A Scheduler subclass that implements stage-oriented scheduling. It computes a DAG of stages for
- * each job, keeps track of which RDDs and stage outputs are materialized, and computes a minimal
- * schedule to run the job. Subclasses only need to implement the code to send a task to the cluster
- * and to report fetch failures (the submitTasks method, and code to add CompletionEvents).
+ * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
+ * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
+ * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
+ * TaskScheduler implementation that runs them on the cluster.
+ *
+ * In addition to coming up with a DAG of stages, this class also determines the preferred
+ * locations to run each task on, based on the current cache status, and passes these to the
+ * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
+ * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
+ * not caused by shuffie file loss are handled by the TaskScheduler, which will retry each task
+ * a small number of times before cancelling the whole stage.
+ *
+ * THREADING: This class runs all its logic in a single thread executing the run() method, to which
+ * events are submitted using a synchonized queue (eventQueue). The public API methods, such as
+ * runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods
+ * should be private.
*/
private[spark]
class DAGScheduler(
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index e88edc5b2a..679d899b47 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -33,7 +33,17 @@ import java.util.{TimerTask, Timer}
/**
* The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
- * start(), then submit task sets through the runTasks method.
+ * initialize() and start(), then submit task sets through the runTasks method.
+ *
+ * This class can work with multiple types of clusters by acting through a SchedulerBackend.
+ * It handles common logic, like determining a scheduling order across jobs, waking up to launch
+ * speculative tasks, etc.
+ *
+ * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
+ * threads, so it needs locks in public API methods to maintain its state. In addition, some
+ * SchedulerBackends sycnchronize on themselves when they want to send events here, and then
+ * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
+ * we are holding a lock on ourselves.
*/
private[spark] class ClusterScheduler(val sc: SparkContext)
extends TaskScheduler
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 91de25254c..1d57732f5d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -37,7 +37,14 @@ import spark.TaskResultTooBigFailure
/**
- * Schedules the tasks within a single TaskSet in the ClusterScheduler.
+ * Schedules the tasks within a single TaskSet in the ClusterScheduler. This class keeps track of
+ * the status of each task, retries tasks if they fail (up to a limited number of times), and
+ * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
+ * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
+ * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
+ *
+ * THREADING: This class is designed to only be called from code with a lock on the
+ * ClusterScheduler (e.g. its event handlers). It should not be called from other threads.
*/
private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet: TaskSet)
extends TaskSetManager with Logging {