author     Kay Ousterhout <kayousterhout@gmail.com>  2013-11-13 15:46:41 -0800
committer  Kay Ousterhout <kayousterhout@gmail.com>  2013-11-13 17:12:14 -0800
commit     46f9c6b858cf9737b7d46b22b75bfc847244331b (patch)
tree       0cf52d1772e0ee97732ebe2f70267837b28b1f70 /core/src/main/scala/org/apache
parent     150615a31e0c2a5112c37cca62dd80dba8a12fab (diff)
Fixed naming issues and added back ability to specify max task failures.
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                    | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala                      | 19
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala                      |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala                         | 56
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala                        |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala   |  2
7 files changed, 80 insertions(+), 20 deletions(-)
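
For orientation (this note and sketch are not part of the commit): the restored master format lets test code ask for task retries while still running locally. The application name and job below are illustrative only; the second number in the master string is simply forwarded to ClusterScheduler as maxTaskFailures.

    import org.apache.spark.SparkContext

    object LocalRetriesExample {
      def main(args: Array[String]) {
        // "local[4, 2]": 4 local threads; the 2 replaces the local-mode default of 0
        // (no re-execution) and is passed to ClusterScheduler as maxTaskFailures.
        val sc = new SparkContext("local[4, 2]", "LocalRetriesExample")
        val sum = sc.parallelize(1 to 100).map(_ * 2).reduce(_ + _)
        println("sum = " + sum)
        sc.stop()
      }
    }
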
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 10db2fa7e7..06bea0c535 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -156,6 +156,8 @@ class SparkContext(
private[spark] var taskScheduler: TaskScheduler = {
// Regular expression used for local[N] master format
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
+ // Regular expression for local[N, maxRetries], used in tests with failing tasks
+ val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters
@@ -165,19 +167,28 @@ class SparkContext(
// Regular expression for connection to Simr cluster
val SIMR_REGEX = """simr://(.*)""".r
+ // When running locally, don't try to re-execute tasks on failure.
+ val MAX_LOCAL_TASK_FAILURES = 0
+
master match {
case "local" =>
- val scheduler = new ClusterScheduler(this, isLocal = true)
+ val scheduler = new ClusterScheduler(this, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalBackend(scheduler, 1)
scheduler.initialize(backend)
scheduler
case LOCAL_N_REGEX(threads) =>
- val scheduler = new ClusterScheduler(this, isLocal = true)
+ val scheduler = new ClusterScheduler(this, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalBackend(scheduler, threads.toInt)
scheduler.initialize(backend)
scheduler
+ case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
+ val scheduler = new ClusterScheduler(this, maxFailures.toInt, isLocal = true)
+ val backend = new LocalBackend(scheduler, threads.toInt)
+ scheduler.initialize(backend)
+ scheduler
+
case SPARK_REGEX(sparkUrl) =>
val scheduler = new ClusterScheduler(this)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
@@ -200,7 +211,7 @@ class SparkContext(
memoryPerSlaveInt, SparkContext.executorMemoryRequested))
}
- val scheduler = new ClusterScheduler(this, isLocal = true)
+ val scheduler = new ClusterScheduler(this)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
val masterUrls = localCluster.start()
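
The hunk above only dispatches on the master string; as a standalone illustration (the object name is hypothetical, and the return type is simplified to a (threads, maxTaskFailures) tuple instead of a scheduler), the same regexes behave like this:

    object MasterFormat {
      // Regexes copied from the diff above.
      val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
      val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
      val MAX_LOCAL_TASK_FAILURES = 0

      def parse(master: String): (Int, Int) = master match {
        case "local" => (1, MAX_LOCAL_TASK_FAILURES)
        case LOCAL_N_REGEX(threads) => (threads.toInt, MAX_LOCAL_TASK_FAILURES)
        case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => (threads.toInt, maxFailures.toInt)
        case other => sys.error("Unrecognized master: " + other)
      }
    }

    // e.g. MasterFormat.parse("local[8, 3]") == (8, 3), MasterFormat.parse("local[8]") == (8, 0)
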
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
index c5d7ca0481..37d554715d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala
@@ -46,8 +46,10 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
* acquire a lock on us, so we need to make sure that we don't try to lock the backend while
* we are holding a lock on ourselves.
*/
-private[spark] class ClusterScheduler(val sc: SparkContext, isLocal: Boolean = false)
- extends TaskScheduler with Logging {
+private[spark] class ClusterScheduler(
+ val sc: SparkContext,
+ val maxTaskFailures : Int = System.getProperty("spark.task.maxFailures", "4").toInt,
+ isLocal: Boolean = false) extends TaskScheduler with Logging {
// How often to check for speculative tasks
val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
@@ -59,15 +61,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext, isLocal: Boolean = f
// on this class.
val activeTaskSets = new HashMap[String, TaskSetManager]
- val MAX_TASK_FAILURES = {
- if (isLocal) {
- // No sense in retrying if all tasks run locally!
- 0
- } else {
- System.getProperty("spark.task.maxFailures", "4").toInt
- }
- }
-
val taskIdToTaskSetId = new HashMap[Long, String]
val taskIdToExecutorId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
@@ -142,7 +135,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext, isLocal: Boolean = f
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
- val manager = new TaskSetManager(this, taskSet, MAX_TASK_FAILURES)
+ val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
activeTaskSets(taskSet.id) = manager
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
taskSetTaskIds(taskSet.id) = new HashSet[Long]()
@@ -345,7 +338,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext, isLocal: Boolean = f
// No task sets are active but we still got an error. Just exit since this
// must mean the error is during registration.
// It might be good to do something smarter here in the future.
- logError("Exiting due to error from task scheduler: " + message)
+ logError("Exiting due to error from cluster scheduler: " + message)
System.exit(1)
}
}
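
A minimal sketch of the constructor-default behaviour introduced above (the RetryPolicy name is hypothetical; the property name and default come from the diff): when no explicit value is supplied, spark.task.maxFailures is read at construction time, while local modes now pass 0 explicitly.

    class RetryPolicy(
        val maxTaskFailures: Int = System.getProperty("spark.task.maxFailures", "4").toInt)

    object RetryPolicyDemo extends App {
      println(new RetryPolicy().maxTaskFailures)                     // 4 unless the property is set
      System.setProperty("spark.task.maxFailures", "8")
      println(new RetryPolicy().maxTaskFailures)                     // 8: default re-read at construction
      println(new RetryPolicy(maxTaskFailures = 0).maxTaskFailures)  // 0: explicit, as in local mode
    }
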
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 1f0839a0e1..89aa098664 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -21,7 +21,7 @@ import org.apache.spark.SparkContext
/**
* A backend interface for scheduling systems that allows plugging in different ones under
- * TaskScheduler. We assume a Mesos-like model where the application gets resource offers as
+ * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
* machines become available and can launch tasks on them.
*/
private[spark] trait SchedulerBackend {
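
As a self-contained illustration of the resource-offer model that comment describes (all types and names below are hypothetical, not Spark's): the cluster offers free cores per machine, and the application decides what to launch against each offer.

    case class ResourceOffer(host: String, freeCores: Int)
    case class TaskToLaunch(name: String, host: String)

    object OfferModelDemo extends App {
      val pendingTasks = scala.collection.mutable.Queue("t1", "t2", "t3", "t4", "t5")

      // Greedily place one pending task on each free core in each offer.
      def onOffers(offers: Seq[ResourceOffer]): Seq[TaskToLaunch] =
        for {
          offer <- offers
          _ <- 1 to offer.freeCores
          if pendingTasks.nonEmpty
        } yield TaskToLaunch(pendingTasks.dequeue(), offer.host)

      println(onOffers(Seq(ResourceOffer("host-a", 2), ResourceOffer("host-b", 3))))
    }
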
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
new file mode 100644
index 0000000000..17b6d97e90
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+
+/**
+ * Low-level task scheduler interface, currently implemented exclusively by the ClusterScheduler.
+ * This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks
+ * for a single SparkContext. These schedulers get sets of tasks submitted to them from the
+ * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
+ * them, retrying if there are failures, and mitigating stragglers. They return events to the
+ * DAGScheduler.
+ */
+private[spark] trait TaskScheduler {
+
+ def rootPool: Pool
+
+ def schedulingMode: SchedulingMode
+
+ def start(): Unit
+
+ // Invoked after the system has successfully initialized (typically in the SparkContext).
+ // Yarn uses this to bootstrap allocation of resources based on preferred locations, wait for slave registrations, etc.
+ def postStartHook() { }
+
+ // Disconnect from the cluster.
+ def stop(): Unit
+
+ // Submit a sequence of tasks to run.
+ def submitTasks(taskSet: TaskSet): Unit
+
+ // Cancel a stage.
+ def cancelTasks(stageId: Int)
+
+ // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
+ def setDAGScheduler(dagScheduler: DAGScheduler): Unit
+
+ // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
+ def defaultParallelism(): Int
+}
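
As an aid to reading the new trait (the class name and stub bodies below are hypothetical, not part of the commit; ClusterScheduler is the real implementation), a do-nothing implementation would look roughly like this:

    package org.apache.spark.scheduler

    import org.apache.spark.scheduler.SchedulingMode.SchedulingMode

    private[spark] class NoOpTaskScheduler extends TaskScheduler {
      // A real implementation builds its Pool when initialized with a backend.
      def rootPool: Pool = null
      def schedulingMode: SchedulingMode = SchedulingMode.FIFO
      def start() {}
      def stop() {}
      def submitTasks(taskSet: TaskSet) {
        // A real implementation wraps the TaskSet in a TaskSetManager and offers it resources.
      }
      def cancelTasks(stageId: Int) {}
      def setDAGScheduler(dagScheduler: DAGScheduler) {}
      def defaultParallelism(): Int = 1
    }
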
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 8757d7fd2a..bc35e53220 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.{SystemClock, Clock}
/**
- * Schedules the tasks within a single TaskSet in the TaskScheduler. This class keeps track of
+ * Schedules the tasks within a single TaskSet in the ClusterScheduler. This class keeps track of
* each task, retries tasks if they fail (up to a limited number of times), and
* handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
* to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 3bb715e7d0..3af02b42b2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -29,7 +29,7 @@ import akka.util.Duration
import akka.util.duration._
import org.apache.spark.{SparkException, Logging, TaskState}
-import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, ClusterScheduler,
+import org.apache.spark.scheduler.{ClusterScheduler, SchedulerBackend, SlaveLost, TaskDescription,
WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.Utils
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 3acad1bb46..773b980c53 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -209,7 +209,7 @@ private[spark] class MesosSchedulerBackend(
getResource(offer.getResourcesList, "cpus").toInt)
}
- // Call into the TaskScheduler
+ // Call into the ClusterScheduler
val taskLists = scheduler.resourceOffers(offerableWorkers)
// Build a list of Mesos tasks for each slave