author     Kay Ousterhout <kayousterhout@gmail.com>   2013-11-13 14:32:50 -0800
committer  Kay Ousterhout <kayousterhout@gmail.com>   2013-11-13 14:32:50 -0800
commit     68e5ad58b7e7e3e1b42852de8d0fdf9e9b9c1a14 (patch)
tree       837719ad9bc7bb11cfc149964eeb0e65e629a942 /core/src/main/scala/org/apache
parent     fb64828b0b573f3a77938592f168af7aa3a2b6c5 (diff)
Extracted TaskScheduler interface.
Also changed the default maximum number of task failures to be 0 when running in local mode.
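
The extracted trait itself is not part of this diff (the diffstat below is limited to the renamed implementation), but the override modifiers added throughout ClusterScheduler suggest its shape. A minimal sketch, assuming the trait sits in org.apache.spark.scheduler and that the YARN-oriented postStartHook moved into it with a no-op default — this is an inferred reconstruction, not the verbatim trait:

    package org.apache.spark.scheduler

    // Hypothetical reconstruction of the extracted interface, inferred from
    // the methods ClusterScheduler overrides in the diff below.
    private[spark] trait TaskScheduler {

      // Wires in the DAGScheduler so task events can be reported back to it.
      def setDAGScheduler(dagScheduler: DAGScheduler): Unit

      def start(): Unit
      def stop(): Unit

      // Submit a set of tasks to run, one TaskSet per stage.
      def submitTasks(taskSet: TaskSet): Unit

      // Cancel all in-flight and pending tasks for the given stage.
      def cancelTasks(stageId: Int): Unit

      def defaultParallelism(): Int

      // Invoked after the system has successfully been initialized. YARN uses
      // this to bootstrap allocation of resources based on preferred
      // locations, wait for slave registrations, etc. (Removed from
      // ClusterScheduler in this diff, so presumably it now lives here with a
      // no-op default.)
      def postStartHook() { }
    }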
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                        | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala) | 47
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala                          |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala                            | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala     |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala              |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala       |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala       |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala                        |  9
10 files changed, 57 insertions, 52 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1850436ff2..e8ff4da475 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -159,26 +159,26 @@ class SparkContext(
master match {
case "local" =>
- val scheduler = new TaskScheduler(this)
+ val scheduler = new ClusterScheduler(this, isLocal = true)
val backend = new LocalBackend(scheduler, 1)
scheduler.initialize(backend)
scheduler
case LOCAL_N_REGEX(threads) =>
- val scheduler = new TaskScheduler(this)
+ val scheduler = new ClusterScheduler(this, isLocal = true)
val backend = new LocalBackend(scheduler, threads.toInt)
scheduler.initialize(backend)
scheduler
case SPARK_REGEX(sparkUrl) =>
- val scheduler = new TaskScheduler(this)
+ val scheduler = new ClusterScheduler(this)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
scheduler.initialize(backend)
scheduler
case SIMR_REGEX(simrUrl) =>
- val scheduler = new TaskScheduler(this)
+ val scheduler = new ClusterScheduler(this)
val backend = new SimrSchedulerBackend(scheduler, this, simrUrl)
scheduler.initialize(backend)
scheduler
@@ -192,7 +192,7 @@ class SparkContext(
memoryPerSlaveInt, SparkContext.executorMemoryRequested))
}
- val scheduler = new TaskScheduler(this)
+ val scheduler = new ClusterScheduler(this, isLocal = true)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
val masterUrls = localCluster.start()
@@ -207,7 +207,7 @@ class SparkContext(
val scheduler = try {
val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
- cons.newInstance(this).asInstanceOf[TaskScheduler]
+ cons.newInstance(this).asInstanceOf[ClusterScheduler]
} catch {
// TODO: Enumerate the exact reasons why it can fail
// But irrespective of it, it means we cannot proceed !
@@ -221,7 +221,7 @@ class SparkContext(
case MESOS_REGEX(mesosUrl) =>
MesosNativeLibrary.load()
- val scheduler = new TaskScheduler(this)
+ val scheduler = new ClusterScheduler(this)
val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
val backend = if (coarseGrained) {
new CoarseMesosSchedulerBackend(scheduler, this, mesosUrl, appName)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
index b4ec695ece..c7d1295215 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
@@ -30,17 +30,13 @@ import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
/**
- * Schedules tasks for a single SparkContext. Receives a set of tasks from the DAGScheduler for
- * each stage, and is responsible for sending tasks to executors, running them, retrying if there
- * are failures, and mitigating stragglers. Returns events to the DAGScheduler.
- *
- * Clients should first call initialize() and start(), then submit task sets through the
- * runTasks method.
- *
- * This class can work with multiple types of clusters by acting through a SchedulerBackend.
+ * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
* It can also work with a local setup by using a LocalBackend and setting isLocal to true.
* It handles common logic, like determining a scheduling order across jobs, waking up to launch
* speculative tasks, etc.
+ *
+ * Clients should first call initialize() and start(), then submit task sets through the
+ * runTasks method.
*
* THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
* threads, so it needs locks in public API methods to maintain its state. In addition, some
@@ -48,7 +44,9 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
* acquire a lock on us, so we need to make sure that we don't try to lock the backend while
* we are holding a lock on ourselves.
*/
-private[spark] class TaskScheduler(val sc: SparkContext, isLocal: Boolean = false) extends Logging {
+private[spark] class ClusterScheduler(val sc: SparkContext, isLocal: Boolean = false)
+ extends TaskScheduler with Logging {
+
// How often to check for speculative tasks
val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
@@ -59,6 +57,15 @@ private[spark] class TaskScheduler(val sc: SparkContext, isLocal: Boolean = fals
// on this class.
val activeTaskSets = new HashMap[String, TaskSetManager]
+ val MAX_TASK_FAILURES = {
+ if (isLocal) {
+ // No sense in retrying if all tasks run locally!
+ 0
+ } else {
+ System.getProperty("spark.task.maxFailures", "4").toInt
+ }
+ }
+
val taskIdToTaskSetId = new HashMap[Long, String]
val taskIdToExecutorId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
@@ -95,7 +102,7 @@ private[spark] class TaskScheduler(val sc: SparkContext, isLocal: Boolean = fals
// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
- def setDAGScheduler(dagScheduler: DAGScheduler) {
+ override def setDAGScheduler(dagScheduler: DAGScheduler) {
this.dagScheduler = dagScheduler
}
@@ -116,7 +123,7 @@ private[spark] class TaskScheduler(val sc: SparkContext, isLocal: Boolean = fals
def newTaskId(): Long = nextTaskId.getAndIncrement()
- def start() {
+ override def start() {
backend.start()
if (!isLocal && System.getProperty("spark.speculation", "false").toBoolean) {
@@ -138,11 +145,11 @@ private[spark] class TaskScheduler(val sc: SparkContext, isLocal: Boolean = fals
}
}
- def submitTasks(taskSet: TaskSet) {
+ override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
- val manager = new TaskSetManager(this, taskSet)
+ val manager = new TaskSetManager(this, taskSet, MAX_TASK_FAILURES)
activeTaskSets(taskSet.id) = manager
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
taskSetTaskIds(taskSet.id) = new HashSet[Long]()
@@ -165,7 +172,7 @@ private[spark] class TaskScheduler(val sc: SparkContext, isLocal: Boolean = fals
backend.reviveOffers()
}
- def cancelTasks(stageId: Int): Unit = synchronized {
+ override def cancelTasks(stageId: Int): Unit = synchronized {
logInfo("Cancelling stage " + stageId)
activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) =>
// There are two possible cases here:
@@ -351,7 +358,7 @@ private[spark] class TaskScheduler(val sc: SparkContext, isLocal: Boolean = fals
}
}
- def stop() {
+ override def stop() {
if (backend != null) {
backend.stop()
}
@@ -364,7 +371,7 @@ private[spark] class TaskScheduler(val sc: SparkContext, isLocal: Boolean = fals
Thread.sleep(5000L)
}
- def defaultParallelism() = backend.defaultParallelism()
+ override def defaultParallelism() = backend.defaultParallelism()
// Check for speculatable tasks in all our active jobs.
def checkSpeculatableTasks() {
@@ -439,16 +446,10 @@ private[spark] class TaskScheduler(val sc: SparkContext, isLocal: Boolean = fals
// By default, rack is unknown
def getRackForHost(value: String): Option[String] = None
-
- /**
- * Invoked after the system has successfully been initialized. YARN uses this to bootstrap
- * allocation of resources based on preferred locations, wait for slave registrations, etc.
- */
- def postStartHook() { }
}
-private[spark] object TaskScheduler {
+private[spark] object ClusterScheduler {
/**
* Used to balance containers across hosts.
*
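
The THREADING note in the ClusterScheduler header above encodes a lock-ordering rule: backends may synchronize on themselves and then call into the scheduler, so the scheduler must never call into the backend while holding its own lock. A standalone sketch of that discipline — class and method names here are illustrative, not Spark's:

    // Made-up classes illustrating the "never lock the backend while holding
    // our own lock" rule from the comment above.
    class SketchScheduler {
      private val pending = scala.collection.mutable.Queue[String]()
      var backend: SketchBackend = _

      def submit(task: String) {
        synchronized { pending.enqueue(task) }  // touch state under our lock...
        backend.reviveOffers()                  // ...then call out with no lock held
      }

      def nextTask(): Option[String] = synchronized {
        if (pending.isEmpty) None else Some(pending.dequeue())
      }
    }

    class SketchBackend(scheduler: SketchScheduler) {
      // Legal order: the backend locks itself first, then any scheduler
      // method it calls may take the scheduler's lock. Had submit() held the
      // scheduler's lock while calling reviveOffers(), two threads could
      // acquire the two locks in opposite orders and deadlock.
      def reviveOffers(): Unit = synchronized {
        scheduler.nextTask().foreach(t => println("launching " + t))
      }
    }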
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 5408fa7353..a77ff35323 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.Utils
/**
* Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
*/
-private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskScheduler)
+private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
extends Logging {
private val THREADS = System.getProperty("spark.resultGetter.threads", "4").toInt
private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 90b6519027..8757d7fd2a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -40,19 +40,22 @@ import org.apache.spark.util.{SystemClock, Clock}
*
* THREADING: This class is designed to only be called from code with a lock on the
* TaskScheduler (e.g. its event handlers). It should not be called from other threads.
+ *
+ * @param sched the ClusterScheduler associated with the TaskSetManager
+ * @param taskSet the TaskSet to manage scheduling for
+ * @param maxTaskFailures if any particular task fails more than this number of times, the entire
+ * task set will be aborted
*/
private[spark] class TaskSetManager(
- sched: TaskScheduler,
+ sched: ClusterScheduler,
val taskSet: TaskSet,
+ val maxTaskFailures: Int,
clock: Clock = SystemClock)
extends Schedulable with Logging
{
// CPUs to request per task
val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
- // Maximum times a task is allowed to fail before failing the job
- val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
-
// Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
@@ -521,10 +524,10 @@ private[spark] class TaskSetManager(
addPendingTask(index)
if (state != TaskState.KILLED) {
numFailures(index) += 1
- if (numFailures(index) > MAX_TASK_FAILURES) {
+ if (numFailures(index) > maxTaskFailures) {
logError("Task %s:%d failed more than %d times; aborting job".format(
- taskSet.id, index, MAX_TASK_FAILURES))
- abort("Task %s:%d failed more than %d times".format(taskSet.id, index, MAX_TASK_FAILURES))
+ taskSet.id, index, maxTaskFailures))
+ abort("Task %s:%d failed more than %d times".format(taskSet.id, index, maxTaskFailures))
}
}
} else {
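
Note the off-by-one in the check above: numFailures(index) is incremented before the comparison, so a task may fail maxTaskFailures times and still be retried; failure number maxTaskFailures + 1 aborts the task set. With the local-mode value of 0 the very first failure aborts, which is the point of this commit. A tiny self-contained sketch of the arithmetic (not Spark code):

    // Standalone check mirroring TaskSetManager's abort condition.
    object RetryMath {
      def shouldAbort(failuresSoFar: Int, maxTaskFailures: Int): Boolean =
        failuresSoFar > maxTaskFailures

      def main(args: Array[String]) {
        assert(!shouldAbort(4, 4))  // cluster default: the 4th failure is still retried
        assert(shouldAbort(5, 4))   // ...the 5th failure aborts the task set
        assert(shouldAbort(1, 0))   // local mode: the first failure aborts immediately
      }
    }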
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b8ac498527..f5548fc2da 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -29,7 +29,7 @@ import akka.util.Duration
import akka.util.duration._
import org.apache.spark.{SparkException, Logging, TaskState}
-import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskScheduler,
+import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, ClusterScheduler,
WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.Utils
@@ -43,7 +43,7 @@ import org.apache.spark.util.Utils
* (spark.deploy.*).
*/
private[spark]
-class CoarseGrainedSchedulerBackend(scheduler: TaskScheduler, actorSystem: ActorSystem)
+class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
extends SchedulerBackend with Logging
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index a589e7456f..40fdfcddb1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -21,10 +21,10 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.scheduler.ClusterScheduler
private[spark] class SimrSchedulerBackend(
- scheduler: TaskScheduler,
+ scheduler: ClusterScheduler,
sc: SparkContext,
driverFilePath: String)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 15c600a1ec..acf15dbc40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -22,11 +22,11 @@ import scala.collection.mutable.HashMap
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.deploy.client.{Client, ClientListener}
import org.apache.spark.deploy.{Command, ApplicationDescription}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskScheduler}
+import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, ClusterScheduler}
import org.apache.spark.util.Utils
private[spark] class SparkDeploySchedulerBackend(
- scheduler: TaskScheduler,
+ scheduler: ClusterScheduler,
sc: SparkContext,
masters: Array[String],
appName: String)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 310da0027e..226ea46cc7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -30,7 +30,7 @@ import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
-import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.scheduler.ClusterScheduler
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
/**
@@ -44,7 +44,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
* remove this.
*/
private[spark] class CoarseMesosSchedulerBackend(
- scheduler: TaskScheduler,
+ scheduler: ClusterScheduler,
sc: SparkContext,
master: String,
appName: String)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index c0e99df0b6..3acad1bb46 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -31,7 +31,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTas
import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost,
- TaskDescription, TaskScheduler, WorkerOffer}
+ TaskDescription, ClusterScheduler, WorkerOffer}
import org.apache.spark.util.Utils
/**
@@ -40,7 +40,7 @@ import org.apache.spark.util.Utils
* from multiple apps can run on different cores) and in time (a core can switch ownership).
*/
private[spark] class MesosSchedulerBackend(
- scheduler: TaskScheduler,
+ scheduler: ClusterScheduler,
sc: SparkContext,
master: String,
appName: String)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 96c3a03602..3e9d31cd5e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -24,16 +24,17 @@ import akka.actor.{Actor, ActorRef, Props}
import org.apache.spark.{SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
-import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, WorkerOffer}
+import org.apache.spark.scheduler.{SchedulerBackend, ClusterScheduler, WorkerOffer}
/**
- * LocalBackend sits behind a TaskScheduler and handles launching tasks on a single Executor
- * (created by the LocalBackend) running locally.
+ * LocalBackend is used when running a local version of Spark where the executor, backend, and
+ * master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks
+ * on a single Executor (created by the LocalBackend) running locally.
*
* THREADING: Because methods can be called both from the Executor and the TaskScheduler, and
* because the Executor class is not thread safe, all methods are synchronized.
*/
-private[spark] class LocalBackend(scheduler: TaskScheduler, private val totalCores: Int)
+private[spark] class LocalBackend(scheduler: ClusterScheduler, private val totalCores: Int)
extends SchedulerBackend with ExecutorBackend {
private var freeCores = totalCores
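
The THREADING comment above describes the simplest locking scheme in this patch: the single Executor is not thread safe and is reached from both the scheduler and executor callbacks, so every LocalBackend method takes the backend's own monitor. A minimal standalone sketch of that pattern, with a plain counter standing in for the Executor (CoreAccountant is a made-up name):

    // Illustration of "synchronize every method" around one non-thread-safe
    // resource shared by two kinds of callers.
    class CoreAccountant(totalCores: Int) {
      private var freeCores = totalCores  // unsafe without external locking

      // Scheduler-side call: claim a core if one is available.
      def tryAcquire(): Boolean = synchronized {
        if (freeCores > 0) { freeCores -= 1; true } else false
      }

      // Executor-side callback: a finished task returns its core.
      def release(): Unit = synchronized { freeCores += 1 }
    }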