diff options
author | Reynold Xin <rxin@databricks.com> | 2016-05-24 20:55:47 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-05-24 20:55:47 -0700 |
commit | 14494da87bdf057d2d2f796b962a4d8bc4747d31 (patch) | |
tree | 0dac9131cf4a7f32365cf3164e57dc9bd9c2a471 /core | |
parent | f08bf587b1913c6cc8ecb34c45331cf4750961c9 (diff) | |
download | spark-14494da87bdf057d2d2f796b962a4d8bc4747d31.tar.gz spark-14494da87bdf057d2d2f796b962a4d8bc4747d31.tar.bz2 spark-14494da87bdf057d2d2f796b962a4d8bc4747d31.zip |
[SPARK-15518] Rename various scheduler backend for consistency
## What changes were proposed in this pull request?
This patch renames various scheduler backends to make them consistent:
- LocalScheduler -> LocalSchedulerBackend
- AppClient -> StandaloneAppClient
- AppClientListener -> StandaloneAppClientListener
- SparkDeploySchedulerBackend -> StandaloneSchedulerBackend
- CoarseMesosSchedulerBackend -> MesosCoarseGrainedSchedulerBackend
- MesosSchedulerBackend -> MesosFineGrainedSchedulerBackend
## How was this patch tested?
Updated test cases to reflect the name change.
Author: Reynold Xin <rxin@databricks.com>
Closes #13288 from rxin/SPARK-15518.
Diffstat (limited to 'core')
14 files changed, 123 insertions, 107 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 351024bea4..36aa3becb4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -56,10 +56,9 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, - SparkDeploySchedulerBackend} -import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} -import org.apache.spark.scheduler.local.LocalBackend +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} +import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend} +import org.apache.spark.scheduler.local.LocalSchedulerBackendEndpoint import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump import org.apache.spark.ui.{ConsoleProgressBar, SparkUI} @@ -2426,7 +2425,7 @@ object SparkContext extends Logging { master match { case "local" => val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) - val backend = new LocalBackend(sc.getConf, scheduler, 1) + val backend = new LocalSchedulerBackendEndpoint(sc.getConf, scheduler, 1) scheduler.initialize(backend) (backend, scheduler) @@ -2438,7 +2437,7 @@ object SparkContext extends Logging { throw new SparkException(s"Asked to run locally with $threadCount threads") } val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) - val backend = new LocalBackend(sc.getConf, scheduler, threadCount) + val backend = new LocalSchedulerBackendEndpoint(sc.getConf, scheduler, threadCount) scheduler.initialize(backend) (backend, scheduler) @@ -2448,14 +2447,14 @@ object SparkContext extends Logging { // local[N, M] means exactly N threads with M failures val threadCount = if (threads == "*") localCpuCount else threads.toInt val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true) - val backend = new LocalBackend(sc.getConf, scheduler, threadCount) + val backend = new LocalSchedulerBackendEndpoint(sc.getConf, scheduler, threadCount) scheduler.initialize(backend) (backend, scheduler) case SPARK_REGEX(sparkUrl) => val scheduler = new TaskSchedulerImpl(sc) val masterUrls = sparkUrl.split(",").map("spark://" + _) - val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls) + val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) scheduler.initialize(backend) (backend, scheduler) @@ -2472,9 +2471,9 @@ object SparkContext extends Logging { val localCluster = new LocalSparkCluster( numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf) val masterUrls = localCluster.start() - val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls) + val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) scheduler.initialize(backend) - backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => { + backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => { localCluster.stop() } (backend, scheduler) @@ -2484,9 +2483,9 @@ object SparkContext extends Logging { val scheduler = new TaskSchedulerImpl(sc) val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true) val backend = if (coarseGrained) { - new CoarseMesosSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager) + new MesosCoarseGrainedSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager) } else { - new MesosSchedulerBackend(scheduler, sc, mesosUrl) + new MesosFineGrainedSchedulerBackend(scheduler, sc, mesosUrl) } scheduler.initialize(backend) (backend, scheduler) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala index 43b17e5d49..a9df732df9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala @@ -32,17 +32,18 @@ import org.apache.spark.rpc._ import org.apache.spark.util.{RpcUtils, ThreadUtils} /** - * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL, - * an app description, and a listener for cluster events, and calls back the listener when various - * events occur. + * Interface allowing applications to speak with a Spark standalone cluster manager. + * + * Takes a master URL, an app description, and a listener for cluster events, and calls + * back the listener when various events occur. * * @param masterUrls Each url should look like spark://host:port. */ -private[spark] class AppClient( +private[spark] class StandaloneAppClient( rpcEnv: RpcEnv, masterUrls: Array[String], appDescription: ApplicationDescription, - listener: AppClientListener, + listener: StandaloneAppClientListener, conf: SparkConf) extends Logging { diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala index 94506a0cbb..370b16ce42 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala @@ -24,7 +24,7 @@ package org.apache.spark.deploy.client * * Users of this API should *not* block inside the callback methods. */ -private[spark] trait AppClientListener { +private[spark] trait StandaloneAppClientListener { def connected(appId: String): Unit /** Disconnection may be a temporary state, as we fail over to a new Master. */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 371fb8602f..01e85ca405 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -33,21 +33,22 @@ import org.apache.spark.TaskState.TaskState import org.apache.spark.internal.Logging import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.TaskLocality.TaskLocality +import org.apache.spark.scheduler.local.LocalSchedulerBackendEndpoint import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} /** * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend. - * It can also work with a local setup by using a LocalBackend and setting isLocal to true. - * It handles common logic, like determining a scheduling order across jobs, waking up to launch - * speculative tasks, etc. + * It can also work with a local setup by using a [[LocalSchedulerBackendEndpoint]] and setting + * isLocal to true. It handles common logic, like determining a scheduling order across jobs, waking + * up to launch speculative tasks, etc. * * Clients should first call initialize() and start(), then submit task sets through the * runTasks method. * - * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple + * THREADING: [[SchedulerBackend]]s and task-submitting clients can call this class from multiple * threads, so it needs locks in public API methods to maintain its state. In addition, some - * SchedulerBackends synchronize on themselves when they want to send events here, and then + * [[SchedulerBackend]]s synchronize on themselves when they want to send events here, and then * acquire a lock on us, so we need to make sure that we don't try to lock the backend while * we are holding a lock on ourselves. */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 85d002011d..8382fbe9dd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -21,28 +21,31 @@ import java.util.concurrent.Semaphore import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.deploy.{ApplicationDescription, Command} -import org.apache.spark.deploy.client.{AppClient, AppClientListener} +import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener} import org.apache.spark.internal.Logging import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler._ import org.apache.spark.util.Utils -private[spark] class SparkDeploySchedulerBackend( +/** + * A [[SchedulerBackend]] implementation for Spark's standalone cluster manager. + */ +private[spark] class StandaloneSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext, masters: Array[String]) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) - with AppClientListener + with StandaloneAppClientListener with Logging { - private var client: AppClient = null + private var client: StandaloneAppClient = null private var stopping = false private val launcherBackend = new LauncherBackend() { override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED) } - @volatile var shutdownCallback: SparkDeploySchedulerBackend => Unit = _ + @volatile var shutdownCallback: StandaloneSchedulerBackend => Unit = _ @volatile private var appId: String = _ private val registrationBarrier = new Semaphore(0) @@ -100,7 +103,7 @@ private[spark] class SparkDeploySchedulerBackend( } val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit) - client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf) + client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf) client.start() launcherBackend.setState(SparkAppHandle.State.SUBMITTED) waitForRegistration() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 2c5be1f528..e88e4ad475 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -25,13 +25,12 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.{Buffer, HashMap, HashSet} -import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver} import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient -import org.apache.spark.rpc.{RpcEndpointAddress} +import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils @@ -43,16 +42,16 @@ import org.apache.spark.util.Utils * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable * latency. * - * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to - * remove this. + * Unfortunately this has a bit of duplication from [[MesosFineGrainedSchedulerBackend]], + * but it seems hard to remove this. */ -private[spark] class CoarseMesosSchedulerBackend( +private[spark] class MesosCoarseGrainedSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext, master: String, securityManager: SecurityManager) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) - with MScheduler + with org.apache.mesos.Scheduler with MesosSchedulerUtils { val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures @@ -149,7 +148,7 @@ private[spark] class CoarseMesosSchedulerBackend( super.start() val driver = createSchedulerDriver( master, - CoarseMesosSchedulerBackend.this, + MesosCoarseGrainedSchedulerBackend.this, sc.sparkUser, sc.appName, sc.conf, @@ -239,9 +238,10 @@ private[spark] class CoarseMesosSchedulerBackend( } } - override def offerRescinded(d: SchedulerDriver, o: OfferID) {} + override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {} - override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { + override def registered( + d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { appId = frameworkId.getValue mesosExternalShuffleClient.foreach(_.init(appId)) logInfo("Registered as framework ID " + appId) @@ -252,15 +252,15 @@ private[spark] class CoarseMesosSchedulerBackend( totalCoresAcquired >= maxCores * minRegisteredRatio } - override def disconnected(d: SchedulerDriver) {} + override def disconnected(d: org.apache.mesos.SchedulerDriver) {} - override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} + override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {} /** * Method called by Mesos to offer resources on slaves. We respond by launching an executor, * unless we've already launched more than we wanted to. */ - override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { + override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { if (stopCalled) { logDebug("Ignoring offers during shutdown") @@ -282,7 +282,8 @@ private[spark] class CoarseMesosSchedulerBackend( } } - private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = { + private def declineUnmatchedOffers( + d: org.apache.mesos.SchedulerDriver, offers: Buffer[Offer]): Unit = { offers.foreach { offer => declineOffer(d, offer, Some("unmet constraints"), Some(rejectOfferDurationForUnmetConstraints)) @@ -290,7 +291,7 @@ private[spark] class CoarseMesosSchedulerBackend( } private def declineOffer( - d: SchedulerDriver, + d: org.apache.mesos.SchedulerDriver, offer: Offer, reason: Option[String] = None, refuseSeconds: Option[Long] = None): Unit = { @@ -319,7 +320,8 @@ private[spark] class CoarseMesosSchedulerBackend( * @param d SchedulerDriver * @param offers Mesos offers that match attribute constraints */ - private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = { + private def handleMatchedOffers( + d: org.apache.mesos.SchedulerDriver, offers: Buffer[Offer]): Unit = { val tasks = buildMesosTasks(offers) for (offer <- offers) { val offerAttributes = toAttributeMap(offer.getAttributesList) @@ -440,7 +442,7 @@ private[spark] class CoarseMesosSchedulerBackend( math.min(offerCPUs, maxCores - totalCoresAcquired)) } - override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { + override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) { val taskId = status.getTaskId.getValue val slaveId = status.getSlaveId.getValue val state = TaskState.fromMesos(status.getState) @@ -498,7 +500,7 @@ private[spark] class CoarseMesosSchedulerBackend( } } - override def error(d: SchedulerDriver, message: String) { + override def error(d: org.apache.mesos.SchedulerDriver, message: String) { logError(s"Mesos error: $message") scheduler.error(message) } @@ -538,14 +540,15 @@ private[spark] class CoarseMesosSchedulerBackend( } } - override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} + override def frameworkMessage( + d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} /** * Called when a slave is lost or a Mesos task finished. Updates local view on * what tasks are running. It also notifies the driver that an executor was removed. */ private def executorTerminated( - d: SchedulerDriver, + d: org.apache.mesos.SchedulerDriver, slaveId: String, taskId: String, reason: String): Unit = { @@ -555,11 +558,12 @@ private[spark] class CoarseMesosSchedulerBackend( } } - override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = { + override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID): Unit = { logInfo(s"Mesos slave lost: ${slaveId.getValue}") } - override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = { + override def executorLost( + d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = { logInfo("Mesos executor lost: %s".format(e.getValue)) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index 1a94aee2ca..e08dc3b595 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -23,7 +23,6 @@ import java.util.{ArrayList => JArrayList, Collections, List => JList} import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, HashSet} -import org.apache.mesos.{Scheduler => MScheduler, _} import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _} import org.apache.mesos.protobuf.ByteString @@ -38,12 +37,12 @@ import org.apache.spark.util.Utils * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks * from multiple apps can run on different cores) and in time (a core can switch ownership). */ -private[spark] class MesosSchedulerBackend( +private[spark] class MesosFineGrainedSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext, master: String) extends SchedulerBackend - with MScheduler + with org.apache.mesos.Scheduler with MesosSchedulerUtils { // Stores the slave ids that has launched a Mesos executor. @@ -74,7 +73,7 @@ private[spark] class MesosSchedulerBackend( classLoader = Thread.currentThread.getContextClassLoader val driver = createSchedulerDriver( master, - MesosSchedulerBackend.this, + MesosFineGrainedSchedulerBackend.this, sc.sparkUser, sc.appName, sc.conf, @@ -175,9 +174,10 @@ private[spark] class MesosSchedulerBackend( execArgs } - override def offerRescinded(d: SchedulerDriver, o: OfferID) {} + override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {} - override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { + override def registered( + d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { inClassLoader() { appId = frameworkId.getValue logInfo("Registered as framework ID " + appId) @@ -195,9 +195,9 @@ private[spark] class MesosSchedulerBackend( } } - override def disconnected(d: SchedulerDriver) {} + override def disconnected(d: org.apache.mesos.SchedulerDriver) {} - override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} + override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {} private def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = { val builder = new StringBuilder @@ -216,7 +216,7 @@ private[spark] class MesosSchedulerBackend( * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that * tasks are balanced across the cluster. */ - override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { + override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) { inClassLoader() { // Fail first on offers with unmet constraints val (offersMatchingConstraints, offersNotMatchingConstraints) = @@ -355,7 +355,7 @@ private[spark] class MesosSchedulerBackend( (taskInfo, finalResources.asJava) } - override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { + override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) { inClassLoader() { val tid = status.getTaskId.getValue.toLong val state = TaskState.fromMesos(status.getState) @@ -373,7 +373,7 @@ private[spark] class MesosSchedulerBackend( } } - override def error(d: SchedulerDriver, message: String) { + override def error(d: org.apache.mesos.SchedulerDriver, message: String) { inClassLoader() { logError("Mesos error: " + message) markErr() @@ -391,7 +391,8 @@ private[spark] class MesosSchedulerBackend( mesosDriver.reviveOffers() } - override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} + override def frameworkMessage( + d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} /** * Remove executor associated with slaveId in a thread safe manner. @@ -403,7 +404,8 @@ private[spark] class MesosSchedulerBackend( } } - private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { + private def recordSlaveLost( + d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { inClassLoader() { logInfo("Mesos slave lost: " + slaveId.getValue) removeExecutor(slaveId.getValue, reason.toString) @@ -411,12 +413,12 @@ private[spark] class MesosSchedulerBackend( } } - override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { + override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID) { recordSlaveLost(d, slaveId, SlaveLost()) } - override def executorLost(d: SchedulerDriver, executorId: ExecutorID, - slaveId: SlaveID, status: Int) { + override def executorLost( + d: org.apache.mesos.SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) { logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue, slaveId.getValue)) recordSlaveLost(d, slaveId, ExecutorExited(status, exitCausedByApp = true)) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala index 1b7ac172de..05b2b08944 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala @@ -25,7 +25,7 @@ import org.apache.spark.internal.Logging /** * A collection of utility functions which can be used by both the - * MesosSchedulerBackend and the CoarseMesosSchedulerBackend. + * MesosSchedulerBackend and the [[MesosFineGrainedSchedulerBackend]]. */ private[mesos] object MesosSchedulerBackendUtil extends Logging { /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackendEndpoint.scala index 3473ef21b3..ee06588379 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackendEndpoint.scala @@ -39,15 +39,15 @@ private case class KillTask(taskId: Long, interruptThread: Boolean) private case class StopExecutor() /** - * Calls to LocalBackend are all serialized through LocalEndpoint. Using an RpcEndpoint makes the - * calls on LocalBackend asynchronous, which is necessary to prevent deadlock between LocalBackend - * and the TaskSchedulerImpl. + * Calls to [[LocalSchedulerBackendEndpoint]] are all serialized through LocalEndpoint. Using an + * RpcEndpoint makes the calls on [[LocalSchedulerBackendEndpoint]] asynchronous, which is necessary + * to prevent deadlock between [[LocalSchedulerBackendEndpoint]] and the [[TaskSchedulerImpl]]. */ private[spark] class LocalEndpoint( override val rpcEnv: RpcEnv, userClassPath: Seq[URL], scheduler: TaskSchedulerImpl, - executorBackend: LocalBackend, + executorBackend: LocalSchedulerBackendEndpoint, private val totalCores: Int) extends ThreadSafeRpcEndpoint with Logging { @@ -91,11 +91,11 @@ private[spark] class LocalEndpoint( } /** - * LocalBackend is used when running a local version of Spark where the executor, backend, and - * master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks - * on a single Executor (created by the LocalBackend) running locally. + * Used when running a local version of Spark where the executor, backend, and master all run in + * the same JVM. It sits behind a [[TaskSchedulerImpl]] and handles launching tasks on a single + * Executor (created by the [[LocalSchedulerBackendEndpoint]]) running locally. */ -private[spark] class LocalBackend( +private[spark] class LocalSchedulerBackendEndpoint( conf: SparkConf, scheduler: TaskSchedulerImpl, val totalCores: Int) @@ -124,7 +124,7 @@ private[spark] class LocalBackend( override def start() { val rpcEnv = SparkEnv.get.rpcEnv val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores) - localEndpoint = rpcEnv.setupEndpoint("LocalBackendEndpoint", executorEndpoint) + localEndpoint = rpcEnv.setupEndpoint("LocalSchedulerBackendEndpoint", executorEndpoint) listenerBus.post(SparkListenerExecutorAdded( System.currentTimeMillis, executorEndpoint.localExecutorId, diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 1adc90ab1e..81b94b5721 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -242,8 +242,8 @@ class HeartbeatReceiverSuite } private def getTrackedExecutors: Map[String, Long] = { - // We may receive undesired SparkListenerExecutorAdded from LocalBackend, so exclude it from - // the map. See SPARK-10800. + // We may receive undesired SparkListenerExecutorAdded from LocalSchedulerBackend, + // so exclude it from the map. See SPARK-10800. heartbeatReceiver.invokePrivate(_executorLastSeen()). filterKeys(_ != SparkContext.DRIVER_IDENTIFIER) } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala index 213d70f4e5..11be40abca 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala @@ -21,10 +21,10 @@ import org.scalatest.PrivateMethodTester import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl} -import org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend -import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} -import org.apache.spark.scheduler.local.LocalBackend -import org.apache.spark.util.Utils +import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend +import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend} +import org.apache.spark.scheduler.local.LocalSchedulerBackendEndpoint + class SparkContextSchedulerCreationSuite extends SparkFunSuite with LocalSparkContext with PrivateMethodTester with Logging { @@ -58,7 +58,7 @@ class SparkContextSchedulerCreationSuite test("local") { val sched = createTaskScheduler("local") sched.backend match { - case s: LocalBackend => assert(s.totalCores === 1) + case s: LocalSchedulerBackendEndpoint => assert(s.totalCores === 1) case _ => fail() } } @@ -66,7 +66,8 @@ class SparkContextSchedulerCreationSuite test("local-*") { val sched = createTaskScheduler("local[*]") sched.backend match { - case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors()) + case s: LocalSchedulerBackendEndpoint => + assert(s.totalCores === Runtime.getRuntime.availableProcessors()) case _ => fail() } } @@ -75,7 +76,7 @@ class SparkContextSchedulerCreationSuite val sched = createTaskScheduler("local[5]") assert(sched.maxTaskFailures === 1) sched.backend match { - case s: LocalBackend => assert(s.totalCores === 5) + case s: LocalSchedulerBackendEndpoint => assert(s.totalCores === 5) case _ => fail() } } @@ -84,7 +85,8 @@ class SparkContextSchedulerCreationSuite val sched = createTaskScheduler("local[* ,2]") assert(sched.maxTaskFailures === 2) sched.backend match { - case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors()) + case s: LocalSchedulerBackendEndpoint => + assert(s.totalCores === Runtime.getRuntime.availableProcessors()) case _ => fail() } } @@ -93,7 +95,7 @@ class SparkContextSchedulerCreationSuite val sched = createTaskScheduler("local[4, 2]") assert(sched.maxTaskFailures === 2) sched.backend match { - case s: LocalBackend => assert(s.totalCores === 4) + case s: LocalSchedulerBackendEndpoint => assert(s.totalCores === 4) case _ => fail() } } @@ -117,14 +119,14 @@ class SparkContextSchedulerCreationSuite val sched = createTaskScheduler("local", "client", conf) sched.backend match { - case s: LocalBackend => assert(s.defaultParallelism() === 16) + case s: LocalSchedulerBackendEndpoint => assert(s.defaultParallelism() === 16) case _ => fail() } } test("local-cluster") { createTaskScheduler("local-cluster[3, 14, 1024]").backend match { - case s: SparkDeploySchedulerBackend => // OK + case s: StandaloneSchedulerBackend => // OK case _ => fail() } } @@ -143,19 +145,20 @@ class SparkContextSchedulerCreationSuite } test("mesos fine-grained") { - testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend], coarse = false) + testMesos("mesos://localhost:1234", classOf[MesosFineGrainedSchedulerBackend], coarse = false) } test("mesos coarse-grained") { - testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend], coarse = true) + testMesos("mesos://localhost:1234", classOf[MesosCoarseGrainedSchedulerBackend], coarse = true) } test("mesos with zookeeper") { testMesos("mesos://zk://localhost:1234,localhost:2345", - classOf[MesosSchedulerBackend], coarse = false) + classOf[MesosFineGrainedSchedulerBackend], coarse = false) } test("mesos with zookeeper and Master URL starting with zk://") { - testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend], coarse = false) + testMesos("zk://localhost:1234,localhost:2345", + classOf[MesosFineGrainedSchedulerBackend], coarse = false) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index 7b46f9101d..f6ef9d15dd 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -166,7 +166,7 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd } /** Application Listener to collect events */ - private class AppClientCollector extends AppClientListener with Logging { + private class AppClientCollector extends StandaloneAppClientListener with Logging { val connectedIdList = new ConcurrentLinkedQueue[String]() @volatile var disconnectedCount: Int = 0 val deadReasonList = new ConcurrentLinkedQueue[String]() @@ -208,7 +208,7 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd List(), Map(), Seq(), Seq(), Seq()) private val desc = new ApplicationDescription("AppClientSuite", Some(1), 512, cmd, "ignored") val listener = new AppClientCollector - val client = new AppClient(rpcEnv, Array(masterUrl), desc, listener, new SparkConf) + val client = new StandaloneAppClient(rpcEnv, Array(masterUrl), desc, listener, new SparkConf) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 15d59e7052..7f21d4c623 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler.TaskSchedulerImpl -class CoarseMesosSchedulerBackendSuite extends SparkFunSuite +class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar with BeforeAndAfter { @@ -44,7 +44,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite private var sparkConf: SparkConf = _ private var driver: SchedulerDriver = _ private var taskScheduler: TaskSchedulerImpl = _ - private var backend: CoarseMesosSchedulerBackend = _ + private var backend: MesosCoarseGrainedSchedulerBackend = _ private var externalShuffleClient: MesosExternalShuffleClient = _ private var driverEndpoint: RpcEndpointRef = _ @@ -230,7 +230,8 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) val securityManager = mock[SecurityManager] - val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) { + val backend = new MesosCoarseGrainedSchedulerBackend( + taskScheduler, sc, "master", securityManager) { override protected def createSchedulerDriver( masterUrl: String, scheduler: Scheduler, @@ -323,10 +324,11 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite taskScheduler: TaskSchedulerImpl, driver: SchedulerDriver, shuffleClient: MesosExternalShuffleClient, - endpoint: RpcEndpointRef): CoarseMesosSchedulerBackend = { + endpoint: RpcEndpointRef): MesosCoarseGrainedSchedulerBackend = { val securityManager = mock[SecurityManager] - val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) { + val backend = new MesosCoarseGrainedSchedulerBackend( + taskScheduler, sc, "master", securityManager) { override protected def createSchedulerDriver( masterUrl: String, scheduler: Scheduler, diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala index 7d6b7bde68..41693b1191 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -40,7 +40,8 @@ import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded, TaskDescription, TaskSchedulerImpl, WorkerOffer} import org.apache.spark.scheduler.cluster.ExecutorInfo -class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { +class MesosFineGrainedSchedulerBackendSuite + extends SparkFunSuite with LocalSparkContext with MockitoSugar { test("weburi is set in created scheduler driver") { val conf = new SparkConf @@ -56,7 +57,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val driver = mock[SchedulerDriver] when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) - val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") { + val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") { override protected def createSchedulerDriver( masterUrl: String, scheduler: Scheduler, @@ -96,7 +97,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val taskScheduler = mock[TaskSchedulerImpl] when(taskScheduler.CPUS_PER_TASK).thenReturn(2) - val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master") + val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") val resources = Arrays.asList( mesosSchedulerBackend.createResource("cpus", 4), @@ -127,7 +128,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val taskScheduler = mock[TaskSchedulerImpl] when(taskScheduler.CPUS_PER_TASK).thenReturn(2) - val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master") + val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") val resources = Arrays.asList( mesosSchedulerBackend.createResource("cpus", 4), @@ -163,7 +164,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi when(sc.conf).thenReturn(conf) when(sc.listenerBus).thenReturn(listenerBus) - val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") + val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") val (execInfo, _) = backend.createExecutorInfo( Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor") @@ -222,7 +223,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi when(sc.conf).thenReturn(new SparkConf) when(sc.listenerBus).thenReturn(listenerBus) - val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") + val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") val minMem = backend.executorMemory(sc) val minCpu = 4 @@ -333,7 +334,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val mesosOffers = new java.util.ArrayList[Offer] mesosOffers.add(offer) - val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") + val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1) expectedWorkerOffers.append(new WorkerOffer( |