diff options
Diffstat (limited to 'core/src/main/scala/org/apache')
7 files changed, 80 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 10db2fa7e7..06bea0c535 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -156,6 +156,8 @@ class SparkContext( private[spark] var taskScheduler: TaskScheduler = { // Regular expression used for local[N] master format val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r + // Regular expression for local[N, maxRetries], used in tests with failing tasks + val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r // Regular expression for simulating a Spark cluster of [N, cores, memory] locally val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r // Regular expression for connecting to Spark deploy clusters @@ -165,19 +167,28 @@ class SparkContext( // Regular expression for connection to Simr cluster val SIMR_REGEX = """simr://(.*)""".r + // When running locally, don't try to re-execute tasks on failure. + val MAX_LOCAL_TASK_FAILURES = 0 + master match { case "local" => - val scheduler = new ClusterScheduler(this, isLocal = true) + val scheduler = new ClusterScheduler(this, MAX_LOCAL_TASK_FAILURES, isLocal = true) val backend = new LocalBackend(scheduler, 1) scheduler.initialize(backend) scheduler case LOCAL_N_REGEX(threads) => - val scheduler = new ClusterScheduler(this, isLocal = true) + val scheduler = new ClusterScheduler(this, MAX_LOCAL_TASK_FAILURES, isLocal = true) val backend = new LocalBackend(scheduler, threads.toInt) scheduler.initialize(backend) scheduler + case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => + val scheduler = new ClusterScheduler(this, maxFailures.toInt, isLocal = true) + val backend = new LocalBackend(scheduler, threads.toInt) + scheduler.initialize(backend) + scheduler + case SPARK_REGEX(sparkUrl) => val scheduler = new ClusterScheduler(this) val masterUrls = sparkUrl.split(",").map("spark://" + _) @@ -200,7 +211,7 @@ class SparkContext( memoryPerSlaveInt, SparkContext.executorMemoryRequested)) } - val scheduler = new ClusterScheduler(this, isLocal = true) + val scheduler = new ClusterScheduler(this) val localCluster = new LocalSparkCluster( numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt) val masterUrls = localCluster.start() diff --git a/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala index c5d7ca0481..37d554715d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala @@ -46,8 +46,10 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode * acquire a lock on us, so we need to make sure that we don't try to lock the backend while * we are holding a lock on ourselves. */ -private[spark] class ClusterScheduler(val sc: SparkContext, isLocal: Boolean = false) - extends TaskScheduler with Logging { +private[spark] class ClusterScheduler( + val sc: SparkContext, + val maxTaskFailures : Int = System.getProperty("spark.task.maxFailures", "4").toInt, + isLocal: Boolean = false) extends TaskScheduler with Logging { // How often to check for speculative tasks val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong @@ -59,15 +61,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext, isLocal: Boolean = f // on this class. val activeTaskSets = new HashMap[String, TaskSetManager] - val MAX_TASK_FAILURES = { - if (isLocal) { - // No sense in retrying if all tasks run locally! - 0 - } else { - System.getProperty("spark.task.maxFailures", "4").toInt - } - } - val taskIdToTaskSetId = new HashMap[Long, String] val taskIdToExecutorId = new HashMap[Long, String] val taskSetTaskIds = new HashMap[String, HashSet[Long]] @@ -142,7 +135,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext, isLocal: Boolean = f val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { - val manager = new TaskSetManager(this, taskSet, MAX_TASK_FAILURES) + val manager = new TaskSetManager(this, taskSet, maxTaskFailures) activeTaskSets(taskSet.id) = manager schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) taskSetTaskIds(taskSet.id) = new HashSet[Long]() @@ -345,7 +338,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext, isLocal: Boolean = f // No task sets are active but we still got an error. Just exit since this // must mean the error is during registration. // It might be good to do something smarter here in the future. - logError("Exiting due to error from task scheduler: " + message) + logError("Exiting due to error from cluster scheduler: " + message) System.exit(1) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 1f0839a0e1..89aa098664 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -21,7 +21,7 @@ import org.apache.spark.SparkContext /** * A backend interface for scheduling systems that allows plugging in different ones under - * TaskScheduler. We assume a Mesos-like model where the application gets resource offers as + * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as * machines become available and can launch tasks on them. */ private[spark] trait SchedulerBackend { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala new file mode 100644 index 0000000000..17b6d97e90 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode + +/** + * Low-level task scheduler interface, currently implemented exclusively by the ClusterScheduler. + * This interface allows plugging in different task schedulers. Each TaskScheduler schedulers tasks + * for a single SparkContext. These schedulers get sets of tasks submitted to them from the + * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running + * them, retrying if there are failures, and mitigating stragglers. They return events to the + * DAGScheduler. + */ +private[spark] trait TaskScheduler { + + def rootPool: Pool + + def schedulingMode: SchedulingMode + + def start(): Unit + + // Invoked after system has successfully initialized (typically in spark context). + // Yarn uses this to bootstrap allocation of resources based on preferred locations, wait for slave registerations, etc. + def postStartHook() { } + + // Disconnect from the cluster. + def stop(): Unit + + // Submit a sequence of tasks to run. + def submitTasks(taskSet: TaskSet): Unit + + // Cancel a stage. + def cancelTasks(stageId: Int) + + // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called. + def setDAGScheduler(dagScheduler: DAGScheduler): Unit + + // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs. + def defaultParallelism(): Int +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 8757d7fd2a..bc35e53220 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -32,7 +32,7 @@ import org.apache.spark.util.{SystemClock, Clock} /** - * Schedules the tasks within a single TaskSet in the TaskScheduler. This class keeps track of + * Schedules the tasks within a single TaskSet in the ClusterScheduler. This class keeps track of * each task, retries tasks if they fail (up to a limited number of times), and * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 3bb715e7d0..3af02b42b2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -29,7 +29,7 @@ import akka.util.Duration import akka.util.duration._ import org.apache.spark.{SparkException, Logging, TaskState} -import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, ClusterScheduler, +import org.apache.spark.scheduler.{ClusterScheduler, SchedulerBackend, SlaveLost, TaskDescription, WorkerOffer} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 3acad1bb46..773b980c53 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -209,7 +209,7 @@ private[spark] class MesosSchedulerBackend( getResource(offer.getResourcesList, "cpus").toInt) } - // Call into the TaskScheduler + // Call into the ClusterScheduler val taskLists = scheduler.resourceOffers(offerableWorkers) // Build a list of Mesos tasks for each slave |