author     Reynold Xin <rxin@databricks.com>  2016-05-24 20:55:47 -0700
committer  Reynold Xin <rxin@databricks.com>  2016-05-24 20:55:47 -0700
commit     14494da87bdf057d2d2f796b962a4d8bc4747d31 (patch)
tree       0dac9131cf4a7f32365cf3164e57dc9bd9c2a471 /core/src
parent     f08bf587b1913c6cc8ecb34c45331cf4750961c9 (diff)
download   spark-14494da87bdf057d2d2f796b962a4d8bc4747d31.tar.gz
           spark-14494da87bdf057d2d2f796b962a4d8bc4747d31.tar.bz2
           spark-14494da87bdf057d2d2f796b962a4d8bc4747d31.zip
[SPARK-15518] Rename various scheduler backends for consistency
## What changes were proposed in this pull request?

This patch renames various scheduler backends to make them consistent:

- LocalBackend -> LocalSchedulerBackendEndpoint
- AppClient -> StandaloneAppClient
- AppClientListener -> StandaloneAppClientListener
- SparkDeploySchedulerBackend -> StandaloneSchedulerBackend
- CoarseMesosSchedulerBackend -> MesosCoarseGrainedSchedulerBackend
- MesosSchedulerBackend -> MesosFineGrainedSchedulerBackend

## How was this patch tested?

Updated test cases to reflect the name changes.

Author: Reynold Xin <rxin@databricks.com>

Closes #13288 from rxin/SPARK-15518.
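For quick reference, here is a minimal, self-contained sketch of which renamed backend each master URL now resolves to, mirroring the `SparkContext.createTaskScheduler` dispatch in the diff below. The helper name, the simplified URL checks, and the string return type are illustrative only and are not part of the patch; the real method builds a `TaskSchedulerImpl` and calls `initialize()` on the chosen backend.

```scala
// Sketch only: maps a master URL to the (renamed) backend class that the
// SparkContext.scala hunks below would construct. Returns class names as
// strings so it runs without access to Spark's private[spark] internals.
object BackendNameForMaster {
  def apply(master: String, mesosCoarse: Boolean = true): String =
    if (master == "local" || master.startsWith("local[")) {
      // was org.apache.spark.scheduler.local.LocalBackend
      "org.apache.spark.scheduler.local.LocalSchedulerBackendEndpoint"
    } else if (master.startsWith("spark://") || master.startsWith("local-cluster[")) {
      // was org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend
      "org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend"
    } else if (master.startsWith("mesos://") || master.startsWith("zk://")) {
      // spark.mesos.coarse (default true) still chooses between the two Mesos backends
      if (mesosCoarse) {
        // was CoarseMesosSchedulerBackend
        "org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackend"
      } else {
        // was MesosSchedulerBackend
        "org.apache.spark.scheduler.cluster.mesos.MesosFineGrainedSchedulerBackend"
      }
    } else {
      sys.error(s"master URL not covered by this sketch: $master")
    }
}

// Example:
//   BackendNameForMaster("spark://host:7077")
//   => "org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend"
```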
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 23
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala (renamed from core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala) | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala (renamed from core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala) | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala) | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala) | 46
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala) | 34
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackendEndpoint.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala) | 18
-rw-r--r--  core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala | 33
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala (renamed from core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala) | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala (renamed from core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala) | 15
14 files changed, 123 insertions, 107 deletions
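One mechanical pattern worth noting before the Mesos hunks further down: the `Scheduler => MScheduler` rename-on-import is removed, so references to the Mesos `Scheduler` and `SchedulerDriver` types are written out fully qualified at each use site. A toy illustration of the two equivalent styles follows; the class names here are hypothetical and only the import technique is the point.

```scala
// Before this patch: alias the Mesos interface at import time to avoid
// clashing with Spark's own scheduler names.
import org.apache.mesos.{Scheduler => MScheduler}

abstract class OldStyleBackend extends MScheduler

// After this patch: no alias; the Mesos type is fully qualified where used.
abstract class NewStyleBackend extends org.apache.mesos.Scheduler
```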
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 351024bea4..36aa3becb4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -56,10 +56,9 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
- SparkDeploySchedulerBackend}
-import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import org.apache.spark.scheduler.local.LocalBackend
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
+import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
+import org.apache.spark.scheduler.local.LocalSchedulerBackendEndpoint
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
@@ -2426,7 +2425,7 @@ object SparkContext extends Logging {
master match {
case "local" =>
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
- val backend = new LocalBackend(sc.getConf, scheduler, 1)
+ val backend = new LocalSchedulerBackendEndpoint(sc.getConf, scheduler, 1)
scheduler.initialize(backend)
(backend, scheduler)
@@ -2438,7 +2437,7 @@ object SparkContext extends Logging {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
- val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
+ val backend = new LocalSchedulerBackendEndpoint(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
@@ -2448,14 +2447,14 @@ object SparkContext extends Logging {
// local[N, M] means exactly N threads with M failures
val threadCount = if (threads == "*") localCpuCount else threads.toInt
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
- val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
+ val backend = new LocalSchedulerBackendEndpoint(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
- val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
+ val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
@@ -2472,9 +2471,9 @@ object SparkContext extends Logging {
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
val masterUrls = localCluster.start()
- val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
+ val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
- backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
+ backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
localCluster.stop()
}
(backend, scheduler)
@@ -2484,9 +2483,9 @@ object SparkContext extends Logging {
val scheduler = new TaskSchedulerImpl(sc)
val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
val backend = if (coarseGrained) {
- new CoarseMesosSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)
+ new MesosCoarseGrainedSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)
} else {
- new MesosSchedulerBackend(scheduler, sc, mesosUrl)
+ new MesosFineGrainedSchedulerBackend(scheduler, sc, mesosUrl)
}
scheduler.initialize(backend)
(backend, scheduler)
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index 43b17e5d49..a9df732df9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -32,17 +32,18 @@ import org.apache.spark.rpc._
import org.apache.spark.util.{RpcUtils, ThreadUtils}
/**
- * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
- * an app description, and a listener for cluster events, and calls back the listener when various
- * events occur.
+ * Interface allowing applications to speak with a Spark standalone cluster manager.
+ *
+ * Takes a master URL, an app description, and a listener for cluster events, and calls
+ * back the listener when various events occur.
*
* @param masterUrls Each url should look like spark://host:port.
*/
-private[spark] class AppClient(
+private[spark] class StandaloneAppClient(
rpcEnv: RpcEnv,
masterUrls: Array[String],
appDescription: ApplicationDescription,
- listener: AppClientListener,
+ listener: StandaloneAppClientListener,
conf: SparkConf)
extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
index 94506a0cbb..370b16ce42 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -24,7 +24,7 @@ package org.apache.spark.deploy.client
*
* Users of this API should *not* block inside the callback methods.
*/
-private[spark] trait AppClientListener {
+private[spark] trait StandaloneAppClientListener {
def connected(appId: String): Unit
/** Disconnection may be a temporary state, as we fail over to a new Master. */
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 371fb8602f..01e85ca405 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -33,21 +33,22 @@ import org.apache.spark.TaskState.TaskState
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
+import org.apache.spark.scheduler.local.LocalSchedulerBackendEndpoint
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
/**
* Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
- * It can also work with a local setup by using a LocalBackend and setting isLocal to true.
- * It handles common logic, like determining a scheduling order across jobs, waking up to launch
- * speculative tasks, etc.
+ * It can also work with a local setup by using a [[LocalSchedulerBackendEndpoint]] and setting
+ * isLocal to true. It handles common logic, like determining a scheduling order across jobs, waking
+ * up to launch speculative tasks, etc.
*
* Clients should first call initialize() and start(), then submit task sets through the
* runTasks method.
*
- * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
+ * THREADING: [[SchedulerBackend]]s and task-submitting clients can call this class from multiple
* threads, so it needs locks in public API methods to maintain its state. In addition, some
- * SchedulerBackends synchronize on themselves when they want to send events here, and then
+ * [[SchedulerBackend]]s synchronize on themselves when they want to send events here, and then
* acquire a lock on us, so we need to make sure that we don't try to lock the backend while
* we are holding a lock on ourselves.
*/
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 85d002011d..8382fbe9dd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -21,28 +21,31 @@ import java.util.concurrent.Semaphore
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.{ApplicationDescription, Command}
-import org.apache.spark.deploy.client.{AppClient, AppClientListener}
+import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener}
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
-private[spark] class SparkDeploySchedulerBackend(
+/**
+ * A [[SchedulerBackend]] implementation for Spark's standalone cluster manager.
+ */
+private[spark] class StandaloneSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext,
masters: Array[String])
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
- with AppClientListener
+ with StandaloneAppClientListener
with Logging {
- private var client: AppClient = null
+ private var client: StandaloneAppClient = null
private var stopping = false
private val launcherBackend = new LauncherBackend() {
override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
}
- @volatile var shutdownCallback: SparkDeploySchedulerBackend => Unit = _
+ @volatile var shutdownCallback: StandaloneSchedulerBackend => Unit = _
@volatile private var appId: String = _
private val registrationBarrier = new Semaphore(0)
@@ -100,7 +103,7 @@ private[spark] class SparkDeploySchedulerBackend(
}
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
- client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
+ client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 2c5be1f528..e88e4ad475 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -25,13 +25,12 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{Buffer, HashMap, HashSet}
-import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
-import org.apache.spark.rpc.{RpcEndpointAddress}
+import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
@@ -43,16 +42,16 @@ import org.apache.spark.util.Utils
* CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
* latency.
*
- * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
- * remove this.
+ * Unfortunately this has a bit of duplication from [[MesosFineGrainedSchedulerBackend]],
+ * but it seems hard to remove this.
*/
-private[spark] class CoarseMesosSchedulerBackend(
+private[spark] class MesosCoarseGrainedSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext,
master: String,
securityManager: SecurityManager)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
- with MScheduler
+ with org.apache.mesos.Scheduler
with MesosSchedulerUtils {
val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
@@ -149,7 +148,7 @@ private[spark] class CoarseMesosSchedulerBackend(
super.start()
val driver = createSchedulerDriver(
master,
- CoarseMesosSchedulerBackend.this,
+ MesosCoarseGrainedSchedulerBackend.this,
sc.sparkUser,
sc.appName,
sc.conf,
@@ -239,9 +238,10 @@ private[spark] class CoarseMesosSchedulerBackend(
}
}
- override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+ override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}
- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+ override def registered(
+ d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
appId = frameworkId.getValue
mesosExternalShuffleClient.foreach(_.init(appId))
logInfo("Registered as framework ID " + appId)
@@ -252,15 +252,15 @@ private[spark] class CoarseMesosSchedulerBackend(
totalCoresAcquired >= maxCores * minRegisteredRatio
}
- override def disconnected(d: SchedulerDriver) {}
+ override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+ override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {}
/**
* Method called by Mesos to offer resources on slaves. We respond by launching an executor,
* unless we've already launched more than we wanted to.
*/
- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
+ override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) {
stateLock.synchronized {
if (stopCalled) {
logDebug("Ignoring offers during shutdown")
@@ -282,7 +282,8 @@ private[spark] class CoarseMesosSchedulerBackend(
}
}
- private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+ private def declineUnmatchedOffers(
+ d: org.apache.mesos.SchedulerDriver, offers: Buffer[Offer]): Unit = {
offers.foreach { offer =>
declineOffer(d, offer, Some("unmet constraints"),
Some(rejectOfferDurationForUnmetConstraints))
@@ -290,7 +291,7 @@ private[spark] class CoarseMesosSchedulerBackend(
}
private def declineOffer(
- d: SchedulerDriver,
+ d: org.apache.mesos.SchedulerDriver,
offer: Offer,
reason: Option[String] = None,
refuseSeconds: Option[Long] = None): Unit = {
@@ -319,7 +320,8 @@ private[spark] class CoarseMesosSchedulerBackend(
* @param d SchedulerDriver
* @param offers Mesos offers that match attribute constraints
*/
- private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
+ private def handleMatchedOffers(
+ d: org.apache.mesos.SchedulerDriver, offers: Buffer[Offer]): Unit = {
val tasks = buildMesosTasks(offers)
for (offer <- offers) {
val offerAttributes = toAttributeMap(offer.getAttributesList)
@@ -440,7 +442,7 @@ private[spark] class CoarseMesosSchedulerBackend(
math.min(offerCPUs, maxCores - totalCoresAcquired))
}
- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+ override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue
val slaveId = status.getSlaveId.getValue
val state = TaskState.fromMesos(status.getState)
@@ -498,7 +500,7 @@ private[spark] class CoarseMesosSchedulerBackend(
}
}
- override def error(d: SchedulerDriver, message: String) {
+ override def error(d: org.apache.mesos.SchedulerDriver, message: String) {
logError(s"Mesos error: $message")
scheduler.error(message)
}
@@ -538,14 +540,15 @@ private[spark] class CoarseMesosSchedulerBackend(
}
}
- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+ override def frameworkMessage(
+ d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
/**
* Called when a slave is lost or a Mesos task finished. Updates local view on
* what tasks are running. It also notifies the driver that an executor was removed.
*/
private def executorTerminated(
- d: SchedulerDriver,
+ d: org.apache.mesos.SchedulerDriver,
slaveId: String,
taskId: String,
reason: String): Unit = {
@@ -555,11 +558,12 @@ private[spark] class CoarseMesosSchedulerBackend(
}
}
- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
+ override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID): Unit = {
logInfo(s"Mesos slave lost: ${slaveId.getValue}")
}
- override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {
+ override def executorLost(
+ d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {
logInfo("Mesos executor lost: %s".format(e.getValue))
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 1a94aee2ca..e08dc3b595 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -23,7 +23,6 @@ import java.util.{ArrayList => JArrayList, Collections, List => JList}
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, HashSet}
-import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.protobuf.ByteString
@@ -38,12 +37,12 @@ import org.apache.spark.util.Utils
* separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
* from multiple apps can run on different cores) and in time (a core can switch ownership).
*/
-private[spark] class MesosSchedulerBackend(
+private[spark] class MesosFineGrainedSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext,
master: String)
extends SchedulerBackend
- with MScheduler
+ with org.apache.mesos.Scheduler
with MesosSchedulerUtils {
// Stores the slave ids that has launched a Mesos executor.
@@ -74,7 +73,7 @@ private[spark] class MesosSchedulerBackend(
classLoader = Thread.currentThread.getContextClassLoader
val driver = createSchedulerDriver(
master,
- MesosSchedulerBackend.this,
+ MesosFineGrainedSchedulerBackend.this,
sc.sparkUser,
sc.appName,
sc.conf,
@@ -175,9 +174,10 @@ private[spark] class MesosSchedulerBackend(
execArgs
}
- override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+ override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}
- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+ override def registered(
+ d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
inClassLoader() {
appId = frameworkId.getValue
logInfo("Registered as framework ID " + appId)
@@ -195,9 +195,9 @@ private[spark] class MesosSchedulerBackend(
}
}
- override def disconnected(d: SchedulerDriver) {}
+ override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+ override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {}
private def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = {
val builder = new StringBuilder
@@ -216,7 +216,7 @@ private[spark] class MesosSchedulerBackend(
* for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
* tasks are balanced across the cluster.
*/
- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
+ override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) {
inClassLoader() {
// Fail first on offers with unmet constraints
val (offersMatchingConstraints, offersNotMatchingConstraints) =
@@ -355,7 +355,7 @@ private[spark] class MesosSchedulerBackend(
(taskInfo, finalResources.asJava)
}
- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+ override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) {
inClassLoader() {
val tid = status.getTaskId.getValue.toLong
val state = TaskState.fromMesos(status.getState)
@@ -373,7 +373,7 @@ private[spark] class MesosSchedulerBackend(
}
}
- override def error(d: SchedulerDriver, message: String) {
+ override def error(d: org.apache.mesos.SchedulerDriver, message: String) {
inClassLoader() {
logError("Mesos error: " + message)
markErr()
@@ -391,7 +391,8 @@ private[spark] class MesosSchedulerBackend(
mesosDriver.reviveOffers()
}
- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+ override def frameworkMessage(
+ d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
/**
* Remove executor associated with slaveId in a thread safe manner.
@@ -403,7 +404,8 @@ private[spark] class MesosSchedulerBackend(
}
}
- private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
+ private def recordSlaveLost(
+ d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
inClassLoader() {
logInfo("Mesos slave lost: " + slaveId.getValue)
removeExecutor(slaveId.getValue, reason.toString)
@@ -411,12 +413,12 @@ private[spark] class MesosSchedulerBackend(
}
}
- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+ override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID) {
recordSlaveLost(d, slaveId, SlaveLost())
}
- override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
- slaveId: SlaveID, status: Int) {
+ override def executorLost(
+ d: org.apache.mesos.SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) {
logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
slaveId.getValue))
recordSlaveLost(d, slaveId, ExecutorExited(status, exitCausedByApp = true))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index 1b7ac172de..05b2b08944 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -25,7 +25,7 @@ import org.apache.spark.internal.Logging
/**
* A collection of utility functions which can be used by both the
- * MesosSchedulerBackend and the CoarseMesosSchedulerBackend.
+ * MesosSchedulerBackend and the [[MesosFineGrainedSchedulerBackend]].
*/
private[mesos] object MesosSchedulerBackendUtil extends Logging {
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackendEndpoint.scala
index 3473ef21b3..ee06588379 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackendEndpoint.scala
@@ -39,15 +39,15 @@ private case class KillTask(taskId: Long, interruptThread: Boolean)
private case class StopExecutor()
/**
- * Calls to LocalBackend are all serialized through LocalEndpoint. Using an RpcEndpoint makes the
- * calls on LocalBackend asynchronous, which is necessary to prevent deadlock between LocalBackend
- * and the TaskSchedulerImpl.
+ * Calls to [[LocalSchedulerBackendEndpoint]] are all serialized through LocalEndpoint. Using an
+ * RpcEndpoint makes the calls on [[LocalSchedulerBackendEndpoint]] asynchronous, which is necessary
+ * to prevent deadlock between [[LocalSchedulerBackendEndpoint]] and the [[TaskSchedulerImpl]].
*/
private[spark] class LocalEndpoint(
override val rpcEnv: RpcEnv,
userClassPath: Seq[URL],
scheduler: TaskSchedulerImpl,
- executorBackend: LocalBackend,
+ executorBackend: LocalSchedulerBackendEndpoint,
private val totalCores: Int)
extends ThreadSafeRpcEndpoint with Logging {
@@ -91,11 +91,11 @@ private[spark] class LocalEndpoint(
}
/**
- * LocalBackend is used when running a local version of Spark where the executor, backend, and
- * master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks
- * on a single Executor (created by the LocalBackend) running locally.
+ * Used when running a local version of Spark where the executor, backend, and master all run in
+ * the same JVM. It sits behind a [[TaskSchedulerImpl]] and handles launching tasks on a single
+ * Executor (created by the [[LocalSchedulerBackendEndpoint]]) running locally.
*/
-private[spark] class LocalBackend(
+private[spark] class LocalSchedulerBackendEndpoint(
conf: SparkConf,
scheduler: TaskSchedulerImpl,
val totalCores: Int)
@@ -124,7 +124,7 @@ private[spark] class LocalBackend(
override def start() {
val rpcEnv = SparkEnv.get.rpcEnv
val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
- localEndpoint = rpcEnv.setupEndpoint("LocalBackendEndpoint", executorEndpoint)
+ localEndpoint = rpcEnv.setupEndpoint("LocalSchedulerBackendEndpoint", executorEndpoint)
listenerBus.post(SparkListenerExecutorAdded(
System.currentTimeMillis,
executorEndpoint.localExecutorId,
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 1adc90ab1e..81b94b5721 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -242,8 +242,8 @@ class HeartbeatReceiverSuite
}
private def getTrackedExecutors: Map[String, Long] = {
- // We may receive undesired SparkListenerExecutorAdded from LocalBackend, so exclude it from
- // the map. See SPARK-10800.
+ // We may receive undesired SparkListenerExecutorAdded from LocalSchedulerBackend,
+ // so exclude it from the map. See SPARK-10800.
heartbeatReceiver.invokePrivate(_executorLastSeen()).
filterKeys(_ != SparkContext.DRIVER_IDENTIFIER)
}
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 213d70f4e5..11be40abca 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -21,10 +21,10 @@ import org.scalatest.PrivateMethodTester
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend
-import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import org.apache.spark.scheduler.local.LocalBackend
-import org.apache.spark.util.Utils
+import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
+import org.apache.spark.scheduler.local.LocalSchedulerBackendEndpoint
+
class SparkContextSchedulerCreationSuite
extends SparkFunSuite with LocalSparkContext with PrivateMethodTester with Logging {
@@ -58,7 +58,7 @@ class SparkContextSchedulerCreationSuite
test("local") {
val sched = createTaskScheduler("local")
sched.backend match {
- case s: LocalBackend => assert(s.totalCores === 1)
+ case s: LocalSchedulerBackendEndpoint => assert(s.totalCores === 1)
case _ => fail()
}
}
@@ -66,7 +66,8 @@ class SparkContextSchedulerCreationSuite
test("local-*") {
val sched = createTaskScheduler("local[*]")
sched.backend match {
- case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors())
+ case s: LocalSchedulerBackendEndpoint =>
+ assert(s.totalCores === Runtime.getRuntime.availableProcessors())
case _ => fail()
}
}
@@ -75,7 +76,7 @@ class SparkContextSchedulerCreationSuite
val sched = createTaskScheduler("local[5]")
assert(sched.maxTaskFailures === 1)
sched.backend match {
- case s: LocalBackend => assert(s.totalCores === 5)
+ case s: LocalSchedulerBackendEndpoint => assert(s.totalCores === 5)
case _ => fail()
}
}
@@ -84,7 +85,8 @@ class SparkContextSchedulerCreationSuite
val sched = createTaskScheduler("local[* ,2]")
assert(sched.maxTaskFailures === 2)
sched.backend match {
- case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors())
+ case s: LocalSchedulerBackendEndpoint =>
+ assert(s.totalCores === Runtime.getRuntime.availableProcessors())
case _ => fail()
}
}
@@ -93,7 +95,7 @@ class SparkContextSchedulerCreationSuite
val sched = createTaskScheduler("local[4, 2]")
assert(sched.maxTaskFailures === 2)
sched.backend match {
- case s: LocalBackend => assert(s.totalCores === 4)
+ case s: LocalSchedulerBackendEndpoint => assert(s.totalCores === 4)
case _ => fail()
}
}
@@ -117,14 +119,14 @@ class SparkContextSchedulerCreationSuite
val sched = createTaskScheduler("local", "client", conf)
sched.backend match {
- case s: LocalBackend => assert(s.defaultParallelism() === 16)
+ case s: LocalSchedulerBackendEndpoint => assert(s.defaultParallelism() === 16)
case _ => fail()
}
}
test("local-cluster") {
createTaskScheduler("local-cluster[3, 14, 1024]").backend match {
- case s: SparkDeploySchedulerBackend => // OK
+ case s: StandaloneSchedulerBackend => // OK
case _ => fail()
}
}
@@ -143,19 +145,20 @@ class SparkContextSchedulerCreationSuite
}
test("mesos fine-grained") {
- testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend], coarse = false)
+ testMesos("mesos://localhost:1234", classOf[MesosFineGrainedSchedulerBackend], coarse = false)
}
test("mesos coarse-grained") {
- testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend], coarse = true)
+ testMesos("mesos://localhost:1234", classOf[MesosCoarseGrainedSchedulerBackend], coarse = true)
}
test("mesos with zookeeper") {
testMesos("mesos://zk://localhost:1234,localhost:2345",
- classOf[MesosSchedulerBackend], coarse = false)
+ classOf[MesosFineGrainedSchedulerBackend], coarse = false)
}
test("mesos with zookeeper and Master URL starting with zk://") {
- testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend], coarse = false)
+ testMesos("zk://localhost:1234,localhost:2345",
+ classOf[MesosFineGrainedSchedulerBackend], coarse = false)
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index 7b46f9101d..f6ef9d15dd 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -166,7 +166,7 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
}
/** Application Listener to collect events */
- private class AppClientCollector extends AppClientListener with Logging {
+ private class AppClientCollector extends StandaloneAppClientListener with Logging {
val connectedIdList = new ConcurrentLinkedQueue[String]()
@volatile var disconnectedCount: Int = 0
val deadReasonList = new ConcurrentLinkedQueue[String]()
@@ -208,7 +208,7 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
List(), Map(), Seq(), Seq(), Seq())
private val desc = new ApplicationDescription("AppClientSuite", Some(1), 512, cmd, "ignored")
val listener = new AppClientCollector
- val client = new AppClient(rpcEnv, Array(masterUrl), desc, listener, new SparkConf)
+ val client = new StandaloneAppClient(rpcEnv, Array(masterUrl), desc, listener, new SparkConf)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 15d59e7052..7f21d4c623 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.TaskSchedulerImpl
-class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
+class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
with LocalSparkContext
with MockitoSugar
with BeforeAndAfter {
@@ -44,7 +44,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
private var sparkConf: SparkConf = _
private var driver: SchedulerDriver = _
private var taskScheduler: TaskSchedulerImpl = _
- private var backend: CoarseMesosSchedulerBackend = _
+ private var backend: MesosCoarseGrainedSchedulerBackend = _
private var externalShuffleClient: MesosExternalShuffleClient = _
private var driverEndpoint: RpcEndpointRef = _
@@ -230,7 +230,8 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
val securityManager = mock[SecurityManager]
- val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) {
+ val backend = new MesosCoarseGrainedSchedulerBackend(
+ taskScheduler, sc, "master", securityManager) {
override protected def createSchedulerDriver(
masterUrl: String,
scheduler: Scheduler,
@@ -323,10 +324,11 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
taskScheduler: TaskSchedulerImpl,
driver: SchedulerDriver,
shuffleClient: MesosExternalShuffleClient,
- endpoint: RpcEndpointRef): CoarseMesosSchedulerBackend = {
+ endpoint: RpcEndpointRef): MesosCoarseGrainedSchedulerBackend = {
val securityManager = mock[SecurityManager]
- val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) {
+ val backend = new MesosCoarseGrainedSchedulerBackend(
+ taskScheduler, sc, "master", securityManager) {
override protected def createSchedulerDriver(
masterUrl: String,
scheduler: Scheduler,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 7d6b7bde68..41693b1191 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -40,7 +40,8 @@ import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded,
TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.cluster.ExecutorInfo
-class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+class MesosFineGrainedSchedulerBackendSuite
+ extends SparkFunSuite with LocalSparkContext with MockitoSugar {
test("weburi is set in created scheduler driver") {
val conf = new SparkConf
@@ -56,7 +57,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
val driver = mock[SchedulerDriver]
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
- val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") {
+ val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") {
override protected def createSchedulerDriver(
masterUrl: String,
scheduler: Scheduler,
@@ -96,7 +97,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
- val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+ val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
val resources = Arrays.asList(
mesosSchedulerBackend.createResource("cpus", 4),
@@ -127,7 +128,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
- val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+ val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
val resources = Arrays.asList(
mesosSchedulerBackend.createResource("cpus", 4),
@@ -163,7 +164,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
when(sc.conf).thenReturn(conf)
when(sc.listenerBus).thenReturn(listenerBus)
- val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+ val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
val (execInfo, _) = backend.createExecutorInfo(
Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
@@ -222,7 +223,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
when(sc.conf).thenReturn(new SparkConf)
when(sc.listenerBus).thenReturn(listenerBus)
- val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+ val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
val minMem = backend.executorMemory(sc)
val minCpu = 4
@@ -333,7 +334,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
val mesosOffers = new java.util.ArrayList[Offer]
mesosOffers.add(offer)
- val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+ val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
expectedWorkerOffers.append(new WorkerOffer(