 core/src/main/scala/org/apache/spark/SparkContext.scala                                        |  64
 core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala                         |   1
 core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala       |  14
 core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala     | 105
 core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala              | 142
 core/src/main/scala/org/apache/spark/util/AkkaUtils.scala                                      |  17
 core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala                  |   7
 project/MimaExcludes.scala                                                                     |   4
 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                |  34
 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala                    |  51
 yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala |  19
 yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala|  12
 12 files changed, 391 insertions(+), 79 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e8fdfff043..40ea369f9e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -294,7 +294,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
executorEnvs("SPARK_USER") = sparkUser
// Create and start the scheduler
- private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
+ private[spark] var (schedulerBackend, taskScheduler) =
+ SparkContext.createTaskScheduler(this, master)
private val heartbeatReceiver = env.actorSystem.actorOf(
Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
@volatile private[spark] var dagScheduler: DAGScheduler = _
@@ -856,6 +857,40 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
listenerBus.addListener(listener)
}
+ /**
+ * :: DeveloperApi ::
+ * Request an additional number of executors from the cluster manager.
+ * This is currently only supported in Yarn mode.
+ */
+ @DeveloperApi
+ def requestExecutors(numAdditionalExecutors: Int): Unit = {
+ schedulerBackend match {
+ case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors)
+ case _ => logWarning("Requesting executors is only supported in coarse-grained mode")
+ }
+ }
+
+ /**
+ * :: DeveloperApi ::
+ * Request that the cluster manager kill the specified executors.
+ * This is currently only supported in Yarn mode.
+ */
+ @DeveloperApi
+ def killExecutors(executorIds: Seq[String]): Unit = {
+ schedulerBackend match {
+ case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds)
+ case _ => logWarning("Killing executors is only supported in coarse-grained mode")
+ }
+ }
+
+ /**
+ * :: DeveloperApi ::
+ * Request that the cluster manager kill the specified executor.
+ * This is currently only supported in Yarn mode.
+ */
+ @DeveloperApi
+ def killExecutor(executorId: String): Unit = killExecutors(Seq(executorId))
+
/** The version of Spark on which this application is running. */
def version = SPARK_VERSION
@@ -1438,8 +1473,13 @@ object SparkContext extends Logging {
res
}
- /** Creates a task scheduler based on a given master URL. Extracted for testing. */
- private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
+ /**
+ * Create a task scheduler based on a given master URL.
+ * Return a 2-tuple of the scheduler backend and the task scheduler.
+ */
+ private def createTaskScheduler(
+ sc: SparkContext,
+ master: String): (SchedulerBackend, TaskScheduler) = {
// Regular expression used for local[N] and local[*] master formats
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
@@ -1461,7 +1501,7 @@ object SparkContext extends Logging {
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalBackend(scheduler, 1)
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case LOCAL_N_REGEX(threads) =>
def localCpuCount = Runtime.getRuntime.availableProcessors()
@@ -1470,7 +1510,7 @@ object SparkContext extends Logging {
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalBackend(scheduler, threadCount)
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
def localCpuCount = Runtime.getRuntime.availableProcessors()
@@ -1480,14 +1520,14 @@ object SparkContext extends Logging {
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
val backend = new LocalBackend(scheduler, threadCount)
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
@@ -1507,7 +1547,7 @@ object SparkContext extends Logging {
backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
localCluster.stop()
}
- scheduler
+ (backend, scheduler)
case "yarn-standalone" | "yarn-cluster" =>
if (master == "yarn-standalone") {
@@ -1536,7 +1576,7 @@ object SparkContext extends Logging {
}
}
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case "yarn-client" =>
val scheduler = try {
@@ -1563,7 +1603,7 @@ object SparkContext extends Logging {
}
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case mesosUrl @ MESOS_REGEX(_) =>
MesosNativeLibrary.load()
@@ -1576,13 +1616,13 @@ object SparkContext extends Logging {
new MesosSchedulerBackend(scheduler, sc, url)
}
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case SIMR_REGEX(simrUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case _ =>
throw new SparkException("Could not parse Master URL: '" + master + "'")
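
A minimal sketch of how the new developer API on SparkContext might be exercised from a driver program; the app name and executor ids below are illustrative, and the calls only take effect on coarse-grained backends (YARN, at this point):

    import org.apache.spark.{SparkConf, SparkContext}

    // Minimal sketch, assuming a YARN coarse-grained deployment; the app name
    // and executor ids are illustrative.
    val conf = new SparkConf().setAppName("allocation-sketch")
    val sc = new SparkContext(conf)

    // Ask the cluster manager for two additional executors. On a backend that
    // is not coarse-grained this only logs a warning.
    sc.requestExecutors(2)

    // Release a single executor by id, or several at once.
    sc.killExecutor("3")
    sc.killExecutors(Seq("4", "5"))
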
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 2b39c7fc87..cd3c015321 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -34,7 +34,6 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.Utils
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
-import akka.actor.Props
/**
* Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index fb8160abc5..1da6fe976d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -66,7 +66,19 @@ private[spark] object CoarseGrainedClusterMessages {
case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
- case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)
+ // Exchanged between the driver and the AM in Yarn client mode
+ case class AddWebUIFilter(filterName: String, filterParams: Map[String, String], proxyBase: String)
extends CoarseGrainedClusterMessage
+ // Messages exchanged between the driver and the cluster manager for executor allocation
+ // In Yarn mode, these are exchanged between the driver and the AM
+
+ case object RegisterClusterManager extends CoarseGrainedClusterMessage
+
+ // Request executors by specifying the new total number of executors desired
+ // This includes executors already pending or running
+ case class RequestExecutors(requestedTotal: Int) extends CoarseGrainedClusterMessage
+
+ case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 59aed6b72f..7a6ee56f81 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -31,7 +31,6 @@ import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
-import org.apache.spark.ui.JettyUtils
/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -42,7 +41,7 @@ import org.apache.spark.ui.JettyUtils
* (spark.deploy.*).
*/
private[spark]
-class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
+class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSystem: ActorSystem)
extends SchedulerBackend with Logging
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
@@ -61,10 +60,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
val createTime = System.currentTimeMillis()
+ private val executorDataMap = new HashMap[String, ExecutorData]
+
+ // Number of executors requested from the cluster manager that have not registered yet
+ private var numPendingExecutors = 0
+
+ // Executors we have requested the cluster manager to kill that have not died yet
+ private val executorsPendingToRemove = new HashSet[String]
+
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {
override protected def log = CoarseGrainedSchedulerBackend.this.log
private val addressToExecutorId = new HashMap[Address, String]
- private val executorDataMap = new HashMap[String, ExecutorData]
override def preStart() {
// Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -84,12 +90,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
} else {
logInfo("Registered executor: " + sender + " with ID " + executorId)
sender ! RegisteredExecutor
- executorDataMap.put(executorId, new ExecutorData(sender, sender.path.address,
- Utils.parseHostPort(hostPort)._1, cores, cores))
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
+ val (host, _) = Utils.parseHostPort(hostPort)
+ val data = new ExecutorData(sender, sender.path.address, host, cores, cores)
+ // This must be synchronized because variables mutated
+ // in this block are read when requesting executors
+ CoarseGrainedSchedulerBackend.this.synchronized {
+ executorDataMap.put(executorId, data)
+ if (numPendingExecutors > 0) {
+ numPendingExecutors -= 1
+ logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
+ }
+ }
makeOffers()
}
@@ -128,10 +143,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
removeExecutor(executorId, reason)
sender ! true
- case AddWebUIFilter(filterName, filterParams, proxyBase) =>
- addWebUIFilter(filterName, filterParams, proxyBase)
- sender ! true
-
case DisassociatedEvent(_, address, _) =>
addressToExecutorId.get(address).foreach(removeExecutor(_,
"remote Akka client disassociated"))
@@ -183,13 +194,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
}
// Remove a disconnected slave from the cluster
- def removeExecutor(executorId: String, reason: String) {
+ def removeExecutor(executorId: String, reason: String): Unit = {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
- executorDataMap -= executorId
+ // This must be synchronized because variables mutated
+ // in this block are read when requesting executors
+ CoarseGrainedSchedulerBackend.this.synchronized {
+ executorDataMap -= executorId
+ executorsPendingToRemove -= executorId
+ }
totalCoreCount.addAndGet(-executorInfo.totalCores)
scheduler.executorLost(executorId, SlaveLost(reason))
- case None => logError(s"Asked to remove non existant executor $executorId")
+ case None => logError(s"Asked to remove non-existent executor $executorId")
}
}
}
@@ -274,21 +290,62 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
false
}
- // Add filters to the SparkUI
- def addWebUIFilter(filterName: String, filterParams: Map[String, String], proxyBase: String) {
- if (proxyBase != null && proxyBase.nonEmpty) {
- System.setProperty("spark.ui.proxyBase", proxyBase)
- }
+ /**
+ * Return the number of executors currently registered with this backend.
+ */
+ def numExistingExecutors: Int = executorDataMap.size
+
+ /**
+ * Request an additional number of executors from the cluster manager.
+ * Return whether the request is acknowledged.
+ */
+ final def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
+ logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
+ logDebug(s"Number of pending executors is now $numPendingExecutors")
+ numPendingExecutors += numAdditionalExecutors
+ // Account for executors pending to be added or removed
+ val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
+ doRequestTotalExecutors(newTotal)
+ }
- val hasFilter = (filterName != null && filterName.nonEmpty &&
- filterParams != null && filterParams.nonEmpty)
- if (hasFilter) {
- logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
- conf.set("spark.ui.filters", filterName)
- filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
- scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
+ /**
+ * Request executors from the cluster manager by specifying the total number desired,
+ * including existing pending and running executors.
+ *
+ * The semantics here guarantee that we do not over-allocate executors for this application,
+ * since a later request overrides the value of any prior request. The alternative interface
+ * of requesting a delta of executors risks double counting new executors when there are
+ * insufficient resources to satisfy the first request. We make the assumption here that the
+ * cluster manager will eventually fulfill all requests when resources free up.
+ *
+ * Return whether the request is acknowledged.
+ */
+ protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false
+
+ /**
+ * Request that the cluster manager kill the specified executors.
+ * Return whether the kill request is acknowledged.
+ */
+ final def killExecutors(executorIds: Seq[String]): Boolean = {
+ logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
+ val filteredExecutorIds = new ArrayBuffer[String]
+ executorIds.foreach { id =>
+ if (executorDataMap.contains(id)) {
+ filteredExecutorIds += id
+ } else {
+ logWarning(s"Executor to kill $id does not exist!")
+ }
}
+ executorsPendingToRemove ++= filteredExecutorIds
+ doKillExecutors(filteredExecutorIds)
}
+
+ /**
+ * Kill the given list of executors through the cluster manager.
+ * Return whether the kill request is acknowledged.
+ */
+ protected def doKillExecutors(executorIds: Seq[String]): Boolean = false
+
}
private[spark] object CoarseGrainedSchedulerBackend {
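
The two protected hooks above (doRequestTotalExecutors and doKillExecutors) are the extension points a concrete backend overrides. A minimal sketch, assuming a hypothetical cluster-manager client; FakeClusterManagerClient and SketchSchedulerBackend are not real Spark types:

    package org.apache.spark.scheduler.cluster

    import akka.actor.ActorSystem
    import org.apache.spark.scheduler.TaskSchedulerImpl

    // Hypothetical stand-in for whatever actually talks to the cluster manager.
    trait FakeClusterManagerClient {
      def setTargetExecutorCount(total: Int): Boolean
      def releaseExecutors(ids: Seq[String]): Boolean
    }

    private[spark] class SketchSchedulerBackend(
        scheduler: TaskSchedulerImpl,
        actorSystem: ActorSystem,
        client: FakeClusterManagerClient)
      extends CoarseGrainedSchedulerBackend(scheduler, actorSystem) {

      // Invoked by the final requestExecutors() with the new desired total
      // (registered + pending - pending-to-remove).
      override def doRequestTotalExecutors(requestedTotal: Int): Boolean =
        client.setTargetExecutorCount(requestedTotal)

      // Invoked by the final killExecutors() with ids this backend knows about.
      override def doKillExecutors(executorIds: Seq[String]): Boolean =
        client.releaseExecutors(executorIds)
    }
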
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
new file mode 100644
index 0000000000..50721b9d6c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import akka.actor.{Actor, ActorRef, Props}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.ui.JettyUtils
+import org.apache.spark.util.AkkaUtils
+
+/**
+ * Abstract Yarn scheduler backend that contains common logic
+ * between the client and cluster Yarn scheduler backends.
+ */
+private[spark] abstract class YarnSchedulerBackend(
+ scheduler: TaskSchedulerImpl,
+ sc: SparkContext)
+ extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+ if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+ minRegisteredRatio = 0.8
+ }
+
+ protected var totalExpectedExecutors = 0
+
+ private val yarnSchedulerActor: ActorRef =
+ actorSystem.actorOf(
+ Props(new YarnSchedulerActor),
+ name = YarnSchedulerBackend.ACTOR_NAME)
+
+ private implicit val askTimeout = AkkaUtils.askTimeout(sc.conf)
+
+ /**
+ * Request executors from the ApplicationMaster by specifying the total number desired.
+ * This includes executors already pending or running.
+ */
+ override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+ AkkaUtils.askWithReply[Boolean](
+ RequestExecutors(requestedTotal), yarnSchedulerActor, askTimeout)
+ }
+
+ /**
+ * Request that the ApplicationMaster kill the specified executors.
+ */
+ override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+ AkkaUtils.askWithReply[Boolean](
+ KillExecutors(executorIds), yarnSchedulerActor, askTimeout)
+ }
+
+ override def sufficientResourcesRegistered(): Boolean = {
+ totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
+ }
+
+ /**
+ * Add filters to the SparkUI.
+ */
+ private def addWebUIFilter(
+ filterName: String,
+ filterParams: Map[String, String],
+ proxyBase: String): Unit = {
+ if (proxyBase != null && proxyBase.nonEmpty) {
+ System.setProperty("spark.ui.proxyBase", proxyBase)
+ }
+
+ val hasFilter =
+ filterName != null && filterName.nonEmpty &&
+ filterParams != null && filterParams.nonEmpty
+ if (hasFilter) {
+ logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
+ conf.set("spark.ui.filters", filterName)
+ filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
+ scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
+ }
+ }
+
+ /**
+ * An actor that communicates with the ApplicationMaster.
+ */
+ private class YarnSchedulerActor extends Actor {
+ private var amActor: Option[ActorRef] = None
+
+ override def preStart(): Unit = {
+ // Listen for disassociation events
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+ }
+
+ override def receive = {
+ case RegisterClusterManager =>
+ logInfo(s"ApplicationMaster registered as $sender")
+ amActor = Some(sender)
+
+ case r: RequestExecutors =>
+ amActor match {
+ case Some(actor) =>
+ sender ! AkkaUtils.askWithReply[Boolean](r, actor, askTimeout)
+ case None =>
+ logWarning("Attempted to request executors before the AM has registered!")
+ sender ! false
+ }
+
+ case k: KillExecutors =>
+ amActor match {
+ case Some(actor) =>
+ sender ! AkkaUtils.askWithReply[Boolean](k, actor, askTimeout)
+ case None =>
+ logWarning("Attempted to kill executors before the AM has registered!")
+ sender ! false
+ }
+
+ case AddWebUIFilter(filterName, filterParams, proxyBase) =>
+ addWebUIFilter(filterName, filterParams, proxyBase)
+ sender ! true
+
+ case d: DisassociatedEvent =>
+ if (amActor.isDefined && sender == amActor.get) {
+ logWarning(s"ApplicationMaster has disassociated: $d")
+ }
+ }
+ }
+}
+
+private[spark] object YarnSchedulerBackend {
+ val ACTOR_NAME = "YarnScheduler"
+}
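
A small worked sketch of the registration check this backend inherits through sufficientResourcesRegistered; the counts are illustrative:

    // Illustrative numbers only: with the 0.8 default applied above, scheduling
    // may begin once 8 of the 10 expected executors have registered.
    val totalExpectedExecutors = 10   // e.g. from spark.executor.instances
    val minRegisteredRatio = 0.8
    val totalRegisteredExecutors = 8
    val sufficient = totalRegisteredExecutors >= totalExpectedExecutors * minRegisteredRatio
    assert(sufficient)
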
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index f41c8d0315..79e398eb8c 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -159,17 +159,28 @@ private[spark] object AkkaUtils extends Logging {
def askWithReply[T](
message: Any,
actor: ActorRef,
- retryAttempts: Int,
+ timeout: FiniteDuration): T = {
+ askWithReply[T](message, actor, maxAttempts = 1, retryInterval = Int.MaxValue, timeout)
+ }
+
+ /**
+ * Send a message to the given actor and get its result within a default timeout, or
+ * throw a SparkException if this fails even after the specified number of retries.
+ */
+ def askWithReply[T](
+ message: Any,
+ actor: ActorRef,
+ maxAttempts: Int,
retryInterval: Int,
timeout: FiniteDuration): T = {
// TODO: Consider removing multiple attempts
if (actor == null) {
- throw new SparkException("Error sending message as driverActor is null " +
+ throw new SparkException("Error sending message as actor is null " +
"[message = " + message + "]")
}
var attempts = 0
var lastException: Exception = null
- while (attempts < retryAttempts) {
+ while (attempts < maxAttempts) {
attempts += 1
try {
val future = actor.ask(message)(timeout)
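
A rough usage comparison of the two askWithReply overloads after this refactor; the actor reference, message, and timeout/retry values are illustrative:

    import scala.concurrent.duration._
    import akka.actor.ActorRef
    import org.apache.spark.util.AkkaUtils

    // Sketch only: `someActor` stands in for any ActorRef the caller holds.
    def pingOnce(someActor: ActorRef): Boolean =
      // New overload: a single attempt that fails fast with a SparkException.
      AkkaUtils.askWithReply[Boolean]("Hello", someActor, 30.seconds)

    def pingWithRetries(someActor: ActorRef): Boolean =
      // Existing retrying behavior under the clearer `maxAttempts` name:
      // up to 3 attempts, with a 1000 ms pause after each failure.
      AkkaUtils.askWithReply[Boolean]("Hello", someActor,
        maxAttempts = 3, retryInterval = 1000, timeout = 30.seconds)
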
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 495a0d4863..df237ba796 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
import org.scalatest.{BeforeAndAfterEach, FunSuite, PrivateMethodTester}
-import org.apache.spark.scheduler.{TaskScheduler, TaskSchedulerImpl}
+import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
@@ -31,8 +31,9 @@ class SparkContextSchedulerCreationSuite
// Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
// real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
val sc = new SparkContext("local", "test")
- val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
- val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
+ val createTaskSchedulerMethod =
+ PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
+ val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
sched.asInstanceOf[TaskSchedulerImpl]
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index adbdc5d1da..6a0495f8fd 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -73,6 +73,10 @@ object MimaExcludes {
"org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaRDDLike.collectAsync")
+ ) ++ Seq(
+ // SPARK-3822
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
)
case v if v.startsWith("1.1") =>
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e6fe0265d8..6807379888 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -36,8 +36,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar
import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
+import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
/**
@@ -385,8 +385,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
SparkEnv.driverActorSystemName,
driverHost,
driverPort.toString,
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
- actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+ YarnSchedulerBackend.ACTOR_NAME)
+ actorSystem.actorOf(Props(new AMActor(driverUrl)), name = "YarnAM")
}
/** Add the Yarn IP filter that is required for properly securing the UI. */
@@ -479,9 +479,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
userThread
}
- // Actor used to monitor the driver when running in client deploy mode.
- private class MonitorActor(driverUrl: String) extends Actor {
-
+ /**
+ * Actor that communicates with the driver in client deploy mode.
+ */
+ private class AMActor(driverUrl: String) extends Actor {
var driver: ActorSelection = _
override def preStart() = {
@@ -490,6 +491,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// Send a hello message to establish the connection, after which
// we can monitor Lifecycle Events.
driver ! "Hello"
+ driver ! RegisterClusterManager
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
@@ -497,11 +499,27 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
case x: DisassociatedEvent =>
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+
case x: AddWebUIFilter =>
logInfo(s"Add WebUI Filter. $x")
driver ! x
- }
+ case RequestExecutors(requestedTotal) =>
+ logInfo(s"Driver requested a total number of executors of $requestedTotal.")
+ Option(allocator) match {
+ case Some(a) => a.requestTotalExecutors(requestedTotal)
+ case None => logWarning("Container allocator is not ready to request executors yet.")
+ }
+ sender ! true
+
+ case KillExecutors(executorIds) =>
+ logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
+ Option(allocator) match {
+ case Some(a) => executorIds.foreach(a.killExecutor)
+ case None => logWarning("Container allocator is not ready to kill executors yet.")
+ }
+ sender ! true
+ }
}
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index e1af8d5a74..7ae8ef237f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -88,7 +88,10 @@ private[yarn] abstract class YarnAllocator(
private val executorIdCounter = new AtomicInteger()
private val numExecutorsFailed = new AtomicInteger()
- private val maxExecutors = args.numExecutors
+ private var maxExecutors = args.numExecutors
+
+ // Keep track of which container is running which executor to remove the executors later
+ private val executorIdToContainer = new HashMap[String, Container]
protected val executorMemory = args.executorMemory
protected val executorCores = args.executorCores
@@ -111,7 +114,48 @@ private[yarn] abstract class YarnAllocator(
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
- def allocateResources() = {
+ /**
+ * Request as many executors from the ResourceManager as needed to reach the desired total.
+ * This takes into account executors already running or pending.
+ */
+ def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
+ val currentTotal = numPendingAllocate.get + numExecutorsRunning.get
+ if (requestedTotal > currentTotal) {
+ maxExecutors += (requestedTotal - currentTotal)
+ // We need to call `allocateResources` here to avoid the following race condition:
+ // If we request executors twice before `allocateResources` is called, then we will end up
+ // double counting the number requested because `numPendingAllocate` is not updated yet.
+ allocateResources()
+ } else {
+ logInfo(s"Not allocating more executors because there are already $currentTotal " +
+ s"(application requested $requestedTotal total)")
+ }
+ }
+
+ /**
+ * Request that the ResourceManager release the container running the specified executor.
+ */
+ def killExecutor(executorId: String): Unit = synchronized {
+ if (executorIdToContainer.contains(executorId)) {
+ val container = executorIdToContainer.remove(executorId).get
+ internalReleaseContainer(container)
+ numExecutorsRunning.decrementAndGet()
+ maxExecutors -= 1
+ assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!")
+ } else {
+ logWarning(s"Attempted to kill unknown executor $executorId!")
+ }
+ }
+
+ /**
+ * Allocate missing containers based on the number of executors currently pending and running.
+ *
+ * This method prioritizes the allocated container responses from the RM based on node and
+ * rack locality. Additionally, it releases any extra containers allocated for this application
+ * but are not needed. This must be synchronized because variables read in this block are
+ * mutated by other methods.
+ */
+ def allocateResources(): Unit = synchronized {
val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
// this is needed by alpha, do it here since we add numPending right after this
@@ -119,7 +163,7 @@ private[yarn] abstract class YarnAllocator(
if (missing > 0) {
val totalExecutorMemory = executorMemory + memoryOverhead
numPendingAllocate.addAndGet(missing)
- logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
+ logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
s"memory including $memoryOverhead MB overhead")
} else {
logDebug("Empty allocation request ...")
@@ -269,6 +313,7 @@ private[yarn] abstract class YarnAllocator(
CoarseGrainedSchedulerBackend.ACTOR_NAME)
logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+ executorIdToContainer(executorId) = container
// To be safe, remove the container from `releasedContainers`.
releasedContainers.remove(containerId)
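
To make the allocator-side accounting concrete, a small worked sketch of requestTotalExecutors with illustrative numbers:

    // Illustrative numbers only; mirrors the arithmetic in requestTotalExecutors.
    val numPendingAllocate = 3      // containers requested from the RM, not yet granted
    val numExecutorsRunning = 5
    var maxExecutors = 8            // current allocation target

    val requestedTotal = 12         // new total requested by the driver
    val currentTotal = numPendingAllocate + numExecutorsRunning   // 8

    if (requestedTotal > currentTotal) {
      // Raise the target by the shortfall and allocate right away, so that a
      // second request cannot double count executors that are not yet pending.
      maxExecutors += requestedTotal - currentTotal               // now 12
    }
    // Requests for a total at or below currentTotal are logged and ignored;
    // shrinking goes through killExecutor, which also decrements maxExecutors.
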
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 59b2b47aed..f6f6dc5243 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -17,27 +17,23 @@
package org.apache.spark.scheduler.cluster
+import scala.collection.mutable.ArrayBuffer
+
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
+
import org.apache.spark.{SparkException, Logging, SparkContext}
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.scheduler.TaskSchedulerImpl
-import scala.collection.mutable.ArrayBuffer
-
private[spark] class YarnClientSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+ extends YarnSchedulerBackend(scheduler, sc)
with Logging {
- if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
- minRegisteredRatio = 0.8
- }
-
private var client: Client = null
private var appId: ApplicationId = null
private var stopping: Boolean = false
- private var totalExpectedExecutors = 0
/**
* Create a Yarn client to submit an application to the ResourceManager.
@@ -151,14 +147,11 @@ private[spark] class YarnClientSchedulerBackend(
logInfo("Stopped")
}
- override def sufficientResourcesRegistered(): Boolean = {
- totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
- }
-
- override def applicationId(): String =
+ override def applicationId(): String = {
Option(appId).map(_.toString).getOrElse {
logWarning("Application ID is not initialized yet.")
super.applicationId
}
+ }
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 3a186cfeb4..a96a54f668 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -25,13 +25,7 @@ import org.apache.spark.util.IntParam
private[spark] class YarnClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
-
- var totalExpectedExecutors = 0
-
- if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
- minRegisteredRatio = 0.8
- }
+ extends YarnSchedulerBackend(scheduler, sc) {
override def start() {
super.start()
@@ -44,10 +38,6 @@ private[spark] class YarnClusterSchedulerBackend(
totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
}
- override def sufficientResourcesRegistered(): Boolean = {
- totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
- }
-
override def applicationId(): String =
// In YARN Cluster mode, spark.yarn.app.id is expected to be set
// before the user application is launched.