diff options
author | Kay Ousterhout <kayousterhout@gmail.com> | 2013-12-20 14:58:04 -0800 |
---|---|---|
committer | Kay Ousterhout <kayousterhout@gmail.com> | 2013-12-20 14:58:04 -0800 |
commit | 30186aa2648f90d0ad4e312d28e99c9378ea317a (patch) | |
tree | 36867f0fd22824d429f079333e51608553efc917 /core/src/main/scala/org | |
parent | c06945cfe0717006ae0d44da797a4fbb1a48954d (diff) | |
download | spark-30186aa2648f90d0ad4e312d28e99c9378ea317a.tar.gz spark-30186aa2648f90d0ad4e312d28e99c9378ea317a.tar.bz2 spark-30186aa2648f90d0ad4e312d28e99c9378ea317a.zip |
Renamed ClusterScheduler to TaskSchedulerImpl
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 20 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala) | 4 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | 4 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 4 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 4 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 4 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala | 6 |
10 files changed, 27 insertions, 27 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 663b473e5d..ad3337d94c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1044,25 +1044,25 @@ object SparkContext { master match { case "local" => - val scheduler = new ClusterScheduler(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) + val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) val backend = new LocalBackend(scheduler, 1) scheduler.initialize(backend) scheduler case LOCAL_N_REGEX(threads) => - val scheduler = new ClusterScheduler(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) + val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) val backend = new LocalBackend(scheduler, threads.toInt) scheduler.initialize(backend) scheduler case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => - val scheduler = new ClusterScheduler(sc, maxFailures.toInt, isLocal = true) + val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true) val backend = new LocalBackend(scheduler, threads.toInt) scheduler.initialize(backend) scheduler case SPARK_REGEX(sparkUrl) => - val scheduler = new ClusterScheduler(sc) + val scheduler = new TaskSchedulerImpl(sc) val masterUrls = sparkUrl.split(",").map("spark://" + _) val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName) scheduler.initialize(backend) @@ -1077,7 +1077,7 @@ object SparkContext { memoryPerSlaveInt, SparkContext.executorMemoryRequested)) } - val scheduler = new ClusterScheduler(sc) + val scheduler = new TaskSchedulerImpl(sc) val localCluster = new LocalSparkCluster( numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt) val masterUrls = localCluster.start() @@ -1092,7 +1092,7 @@ object SparkContext { val scheduler = try { val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler") val cons = clazz.getConstructor(classOf[SparkContext]) - cons.newInstance(sc).asInstanceOf[ClusterScheduler] + cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl] } catch { // TODO: Enumerate the exact reasons why it can fail // But irrespective of it, it means we cannot proceed ! @@ -1108,7 +1108,7 @@ object SparkContext { val scheduler = try { val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler") val cons = clazz.getConstructor(classOf[SparkContext]) - cons.newInstance(sc).asInstanceOf[ClusterScheduler] + cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl] } catch { case th: Throwable => { @@ -1118,7 +1118,7 @@ object SparkContext { val backend = try { val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend") - val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext]) + val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext]) cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend] } catch { case th: Throwable => { @@ -1131,7 +1131,7 @@ object SparkContext { case mesosUrl @ MESOS_REGEX(_) => MesosNativeLibrary.load() - val scheduler = new ClusterScheduler(sc) + val scheduler = new TaskSchedulerImpl(sc) val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs val backend = if (coarseGrained) { @@ -1143,7 +1143,7 @@ object SparkContext { scheduler case SIMR_REGEX(simrUrl) => - val scheduler = new ClusterScheduler(sc) + val scheduler = new TaskSchedulerImpl(sc) val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl) scheduler.initialize(backend) scheduler diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index 7b5543e222..89102720fa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -28,7 +28,7 @@ import org.apache.spark.util.Utils /** * Runs a thread pool that deserializes and remotely fetches (if necessary) task results. */ -private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler) +private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl) extends Logging { private val THREADS = System.getProperty("spark.resultGetter.threads", "4").toInt private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool( diff --git a/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 1ad735bc04..7409168f7b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -45,7 +45,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode * acquire a lock on us, so we need to make sure that we don't try to lock the backend while * we are holding a lock on ourselves. */ -private[spark] class ClusterScheduler( +private[spark] class TaskSchedulerImpl( val sc: SparkContext, val maxTaskFailures : Int = System.getProperty("spark.task.maxFailures", "4").toInt, isLocal: Boolean = false) extends TaskScheduler with Logging { @@ -429,7 +429,7 @@ private[spark] class ClusterScheduler( } -private[spark] object ClusterScheduler { +private[spark] object TaskSchedulerImpl { /** * Used to balance containers across hosts. * diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 0fe413a7c4..0ac982909c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -48,7 +48,7 @@ import java.io.NotSerializableException * task set will be aborted */ private[spark] class TaskSetManager( - sched: ClusterScheduler, + sched: TaskSchedulerImpl, val taskSet: TaskSet, val maxTaskFailures: Int, clock: Clock = SystemClock) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 5797783793..5c534a6f43 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -29,7 +29,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} import org.apache.spark.{SparkException, Logging, TaskState} import org.apache.spark.{Logging, SparkException, TaskState} -import org.apache.spark.scheduler.{ClusterScheduler, SchedulerBackend, SlaveLost, TaskDescription, +import org.apache.spark.scheduler.{TaskSchedulerImpl, SchedulerBackend, SlaveLost, TaskDescription, WorkerOffer} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.{AkkaUtils, Utils} @@ -43,7 +43,7 @@ import org.apache.spark.util.{AkkaUtils, Utils} * (spark.deploy.*). */ private[spark] -class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem) +class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem) extends SchedulerBackend with Logging { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index 2fbd725d75..ec3e68e970 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -21,10 +21,10 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.spark.{Logging, SparkContext} -import org.apache.spark.scheduler.ClusterScheduler +import org.apache.spark.scheduler.TaskSchedulerImpl private[spark] class SimrSchedulerBackend( - scheduler: ClusterScheduler, + scheduler: TaskSchedulerImpl, sc: SparkContext, driverFilePath: String) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 1d38f0d956..404ce7a452 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -22,11 +22,11 @@ import scala.collection.mutable.HashMap import org.apache.spark.{Logging, SparkContext} import org.apache.spark.deploy.client.{Client, ClientListener} import org.apache.spark.deploy.{Command, ApplicationDescription} -import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, ClusterScheduler} +import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl} import org.apache.spark.util.Utils private[spark] class SparkDeploySchedulerBackend( - scheduler: ClusterScheduler, + scheduler: TaskSchedulerImpl, sc: SparkContext, masters: Array[String], appName: String) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 5481828111..39573fc8c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -30,7 +30,7 @@ import org.apache.mesos._ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} import org.apache.spark.{SparkException, Logging, SparkContext, TaskState} -import org.apache.spark.scheduler.ClusterScheduler +import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend /** @@ -44,7 +44,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend * remove this. */ private[spark] class CoarseMesosSchedulerBackend( - scheduler: ClusterScheduler, + scheduler: TaskSchedulerImpl, sc: SparkContext, master: String, appName: String) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 773b980c53..6aa788c460 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -31,7 +31,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTas import org.apache.spark.{Logging, SparkException, SparkContext, TaskState} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost, - TaskDescription, ClusterScheduler, WorkerOffer} + TaskDescription, TaskSchedulerImpl, WorkerOffer} import org.apache.spark.util.Utils /** @@ -40,7 +40,7 @@ import org.apache.spark.util.Utils * from multiple apps can run on different cores) and in time (a core can switch ownership). */ private[spark] class MesosSchedulerBackend( - scheduler: ClusterScheduler, + scheduler: TaskSchedulerImpl, sc: SparkContext, master: String, appName: String) diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 6b5f1a5dc2..69c1c04843 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -24,7 +24,7 @@ import akka.actor.{Actor, ActorRef, Props} import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.{Executor, ExecutorBackend} -import org.apache.spark.scheduler.{SchedulerBackend, ClusterScheduler, WorkerOffer} +import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer} private case class ReviveOffers() @@ -38,7 +38,7 @@ private case class KillTask(taskId: Long) * and the ClusterScheduler. */ private[spark] class LocalActor( - scheduler: ClusterScheduler, + scheduler: TaskSchedulerImpl, executorBackend: LocalBackend, private val totalCores: Int) extends Actor with Logging { @@ -78,7 +78,7 @@ private[spark] class LocalActor( * master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks * on a single Executor (created by the LocalBackend) running locally. */ -private[spark] class LocalBackend(scheduler: ClusterScheduler, val totalCores: Int) +private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int) extends SchedulerBackend with ExecutorBackend { var localActor: ActorRef = null |