aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2014-10-29 14:01:00 -0700
committerAndrew Or <andrew@databricks.com>2014-10-29 14:01:00 -0700
commit1df05a40ebf3493b0aff46d18c0f30d2d5256c7b (patch)
tree319086e2ffcb624d779a37a7cfb8482a894853ac
parent353546766384b1e80fc8cc75c532d8d1821012b4 (diff)
downloadspark-1df05a40ebf3493b0aff46d18c0f30d2d5256c7b.tar.gz
spark-1df05a40ebf3493b0aff46d18c0f30d2d5256c7b.tar.bz2
spark-1df05a40ebf3493b0aff46d18c0f30d2d5256c7b.zip
[SPARK-3822] Executor scaling mechanism for Yarn
This is part of a broader effort to enable dynamic scaling of executors ([SPARK-3174](https://issues.apache.org/jira/browse/SPARK-3174)). This is intended to work alongside SPARK-3795 (#2746), SPARK-3796 and SPARK-3797, but is functionally independently of these other issues. The logic is built on top of PraveenSeluka's changes at #2798. This is different from the changes there in a few major ways: (1) the mechanism is implemented within the existing scheduler backend framework rather than in new `Actor` classes. This also introduces a parent abstract class `YarnSchedulerBackend` to encapsulate common logic to communicate with the Yarn `ApplicationMaster`. (2) The interface of requesting executors exposed to the `SparkContext` is the same, but the communication between the scheduler backend and the AM uses total number executors desired instead of an incremental number. This is discussed in #2746 and explained in the comments in the code. I have tested this significantly on a stable Yarn cluster. ------------ A remaining task for this issue is to tone down the error messages emitted when an executor is removed. Currently, `SparkContext` and its components react as if the executor has failed, resulting in many scary error messages and eventual timeouts. While it's not strictly necessary to fix this as of the first-cut implementation of this mechanism, it would be good to add logic to distinguish this case. I prefer to address this in a separate PR. I have filed a separate JIRA for this task at SPARK-4134. Author: Andrew Or <andrew@databricks.com> Author: Andrew Or <andrewor14@gmail.com> Closes #2840 from andrewor14/yarn-scaling-mechanism and squashes the following commits: 485863e [Andrew Or] Minor log message changes 4920be8 [Andrew Or] Clarify that public API is only for Yarn mode for now 1c57804 [Andrew Or] Reword a few comments + other review comments 6321140 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism 02836c0 [Andrew Or] Limit scope of synchronization 4e2ed7f [Andrew Or] Fix bug: keep track of removed executors properly 73ade46 [Andrew Or] Wording changes (minor) 2a7a6da [Andrew Or] Add `sc.killExecutor` as a shorthand (minor) 665f229 [Andrew Or] Mima excludes 79aa2df [Andrew Or] Simplify the request interface by asking for a total 04f625b [Andrew Or] Fix race condition that causes over-allocation of executors f4783f8 [Andrew Or] Change the semantics of requesting executors 005a124 [Andrew Or] Fix tests 4628b16 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism db4a679 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism 572f5c5 [Andrew Or] Unused import (minor) f30261c [Andrew Or] Kill multiple executors rather than one at a time de260d9 [Andrew Or] Simplify by skipping useless null check 9c52542 [Andrew Or] Simplify by skipping the TaskSchedulerImpl 97dd1a8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism d987b3e [Andrew Or] Move addWebUIFilters to Yarn scheduler backend 7b76d0a [Andrew Or] Expose mechanism in SparkContext as developer API 47466cd [Andrew Or] Refactor common Yarn scheduler backend logic c4dfaac [Andrew Or] Avoid thrashing when removing executors 53e8145 [Andrew Or] Start yarn actor early to listen for AM registration message bbee669 [Andrew Or] Add mechanism in yarn client mode
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala64
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala105
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala142
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala17
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala7
-rw-r--r--project/MimaExcludes.scala4
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala34
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala51
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala19
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala12
12 files changed, 391 insertions, 79 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e8fdfff043..40ea369f9e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -294,7 +294,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
executorEnvs("SPARK_USER") = sparkUser
// Create and start the scheduler
- private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
+ private[spark] var (schedulerBackend, taskScheduler) =
+ SparkContext.createTaskScheduler(this, master)
private val heartbeatReceiver = env.actorSystem.actorOf(
Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
@volatile private[spark] var dagScheduler: DAGScheduler = _
@@ -856,6 +857,40 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
listenerBus.addListener(listener)
}
+ /**
+ * :: DeveloperApi ::
+ * Request an additional number of executors from the cluster manager.
+ * This is currently only supported in Yarn mode.
+ */
+ @DeveloperApi
+ def requestExecutors(numAdditionalExecutors: Int): Unit = {
+ schedulerBackend match {
+ case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors)
+ case _ => logWarning("Requesting executors is only supported in coarse-grained mode")
+ }
+ }
+
+ /**
+ * :: DeveloperApi ::
+ * Request that the cluster manager kill the specified executors.
+ * This is currently only supported in Yarn mode.
+ */
+ @DeveloperApi
+ def killExecutors(executorIds: Seq[String]): Unit = {
+ schedulerBackend match {
+ case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds)
+ case _ => logWarning("Killing executors is only supported in coarse-grained mode")
+ }
+ }
+
+ /**
+ * :: DeveloperApi ::
+ * Request that cluster manager the kill the specified executor.
+ * This is currently only supported in Yarn mode.
+ */
+ @DeveloperApi
+ def killExecutor(executorId: String): Unit = killExecutors(Seq(executorId))
+
/** The version of Spark on which this application is running. */
def version = SPARK_VERSION
@@ -1438,8 +1473,13 @@ object SparkContext extends Logging {
res
}
- /** Creates a task scheduler based on a given master URL. Extracted for testing. */
- private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
+ /**
+ * Create a task scheduler based on a given master URL.
+ * Return a 2-tuple of the scheduler backend and the task scheduler.
+ */
+ private def createTaskScheduler(
+ sc: SparkContext,
+ master: String): (SchedulerBackend, TaskScheduler) = {
// Regular expression used for local[N] and local[*] master formats
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
@@ -1461,7 +1501,7 @@ object SparkContext extends Logging {
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalBackend(scheduler, 1)
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case LOCAL_N_REGEX(threads) =>
def localCpuCount = Runtime.getRuntime.availableProcessors()
@@ -1470,7 +1510,7 @@ object SparkContext extends Logging {
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalBackend(scheduler, threadCount)
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
def localCpuCount = Runtime.getRuntime.availableProcessors()
@@ -1480,14 +1520,14 @@ object SparkContext extends Logging {
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
val backend = new LocalBackend(scheduler, threadCount)
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
@@ -1507,7 +1547,7 @@ object SparkContext extends Logging {
backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
localCluster.stop()
}
- scheduler
+ (backend, scheduler)
case "yarn-standalone" | "yarn-cluster" =>
if (master == "yarn-standalone") {
@@ -1536,7 +1576,7 @@ object SparkContext extends Logging {
}
}
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case "yarn-client" =>
val scheduler = try {
@@ -1563,7 +1603,7 @@ object SparkContext extends Logging {
}
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case mesosUrl @ MESOS_REGEX(_) =>
MesosNativeLibrary.load()
@@ -1576,13 +1616,13 @@ object SparkContext extends Logging {
new MesosSchedulerBackend(scheduler, sc, url)
}
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case SIMR_REGEX(simrUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
scheduler.initialize(backend)
- scheduler
+ (backend, scheduler)
case _ =>
throw new SparkException("Could not parse Master URL: '" + master + "'")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 2b39c7fc87..cd3c015321 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -34,7 +34,6 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.Utils
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
-import akka.actor.Props
/**
* Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index fb8160abc5..1da6fe976d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -66,7 +66,19 @@ private[spark] object CoarseGrainedClusterMessages {
case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
- case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)
+ // Exchanged between the driver and the AM in Yarn client mode
+ case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase: String)
extends CoarseGrainedClusterMessage
+ // Messages exchanged between the driver and the cluster manager for executor allocation
+ // In Yarn mode, these are exchanged between the driver and the AM
+
+ case object RegisterClusterManager extends CoarseGrainedClusterMessage
+
+ // Request executors by specifying the new total number of executors desired
+ // This includes executors already pending or running
+ case class RequestExecutors(requestedTotal: Int) extends CoarseGrainedClusterMessage
+
+ case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 59aed6b72f..7a6ee56f81 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -31,7 +31,6 @@ import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
-import org.apache.spark.ui.JettyUtils
/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -42,7 +41,7 @@ import org.apache.spark.ui.JettyUtils
* (spark.deploy.*).
*/
private[spark]
-class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
+class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSystem: ActorSystem)
extends SchedulerBackend with Logging
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
@@ -61,10 +60,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
val createTime = System.currentTimeMillis()
+ private val executorDataMap = new HashMap[String, ExecutorData]
+
+ // Number of executors requested from the cluster manager that have not registered yet
+ private var numPendingExecutors = 0
+
+ // Executors we have requested the cluster manager to kill that have not died yet
+ private val executorsPendingToRemove = new HashSet[String]
+
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {
override protected def log = CoarseGrainedSchedulerBackend.this.log
private val addressToExecutorId = new HashMap[Address, String]
- private val executorDataMap = new HashMap[String, ExecutorData]
override def preStart() {
// Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -84,12 +90,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
} else {
logInfo("Registered executor: " + sender + " with ID " + executorId)
sender ! RegisteredExecutor
- executorDataMap.put(executorId, new ExecutorData(sender, sender.path.address,
- Utils.parseHostPort(hostPort)._1, cores, cores))
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
+ val (host, _) = Utils.parseHostPort(hostPort)
+ val data = new ExecutorData(sender, sender.path.address, host, cores, cores)
+ // This must be synchronized because variables mutated
+ // in this block are read when requesting executors
+ CoarseGrainedSchedulerBackend.this.synchronized {
+ executorDataMap.put(executorId, data)
+ if (numPendingExecutors > 0) {
+ numPendingExecutors -= 1
+ logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
+ }
+ }
makeOffers()
}
@@ -128,10 +143,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
removeExecutor(executorId, reason)
sender ! true
- case AddWebUIFilter(filterName, filterParams, proxyBase) =>
- addWebUIFilter(filterName, filterParams, proxyBase)
- sender ! true
-
case DisassociatedEvent(_, address, _) =>
addressToExecutorId.get(address).foreach(removeExecutor(_,
"remote Akka client disassociated"))
@@ -183,13 +194,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
}
// Remove a disconnected slave from the cluster
- def removeExecutor(executorId: String, reason: String) {
+ def removeExecutor(executorId: String, reason: String): Unit = {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
- executorDataMap -= executorId
+ // This must be synchronized because variables mutated
+ // in this block are read when requesting executors
+ CoarseGrainedSchedulerBackend.this.synchronized {
+ executorDataMap -= executorId
+ executorsPendingToRemove -= executorId
+ }
totalCoreCount.addAndGet(-executorInfo.totalCores)
scheduler.executorLost(executorId, SlaveLost(reason))
- case None => logError(s"Asked to remove non existant executor $executorId")
+ case None => logError(s"Asked to remove non-existent executor $executorId")
}
}
}
@@ -274,21 +290,62 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
false
}
- // Add filters to the SparkUI
- def addWebUIFilter(filterName: String, filterParams: Map[String, String], proxyBase: String) {
- if (proxyBase != null && proxyBase.nonEmpty) {
- System.setProperty("spark.ui.proxyBase", proxyBase)
- }
+ /**
+ * Return the number of executors currently registered with this backend.
+ */
+ def numExistingExecutors: Int = executorDataMap.size
+
+ /**
+ * Request an additional number of executors from the cluster manager.
+ * Return whether the request is acknowledged.
+ */
+ final def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
+ logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
+ logDebug(s"Number of pending executors is now $numPendingExecutors")
+ numPendingExecutors += numAdditionalExecutors
+ // Account for executors pending to be added or removed
+ val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
+ doRequestTotalExecutors(newTotal)
+ }
- val hasFilter = (filterName != null && filterName.nonEmpty &&
- filterParams != null && filterParams.nonEmpty)
- if (hasFilter) {
- logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
- conf.set("spark.ui.filters", filterName)
- filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
- scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
+ /**
+ * Request executors from the cluster manager by specifying the total number desired,
+ * including existing pending and running executors.
+ *
+ * The semantics here guarantee that we do not over-allocate executors for this application,
+ * since a later request overrides the value of any prior request. The alternative interface
+ * of requesting a delta of executors risks double counting new executors when there are
+ * insufficient resources to satisfy the first request. We make the assumption here that the
+ * cluster manager will eventually fulfill all requests when resources free up.
+ *
+ * Return whether the request is acknowledged.
+ */
+ protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false
+
+ /**
+ * Request that the cluster manager kill the specified executors.
+ * Return whether the kill request is acknowledged.
+ */
+ final def killExecutors(executorIds: Seq[String]): Boolean = {
+ logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
+ val filteredExecutorIds = new ArrayBuffer[String]
+ executorIds.foreach { id =>
+ if (executorDataMap.contains(id)) {
+ filteredExecutorIds += id
+ } else {
+ logWarning(s"Executor to kill $id does not exist!")
+ }
}
+ executorsPendingToRemove ++= filteredExecutorIds
+ doKillExecutors(filteredExecutorIds)
}
+
+ /**
+ * Kill the given list of executors through the cluster manager.
+ * Return whether the kill request is acknowledged.
+ */
+ protected def doKillExecutors(executorIds: Seq[String]): Boolean = false
+
}
private[spark] object CoarseGrainedSchedulerBackend {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
new file mode 100644
index 0000000000..50721b9d6c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import akka.actor.{Actor, ActorRef, Props}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.ui.JettyUtils
+import org.apache.spark.util.AkkaUtils
+
+/**
+ * Abstract Yarn scheduler backend that contains common logic
+ * between the client and cluster Yarn scheduler backends.
+ */
+private[spark] abstract class YarnSchedulerBackend(
+ scheduler: TaskSchedulerImpl,
+ sc: SparkContext)
+ extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+ if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+ minRegisteredRatio = 0.8
+ }
+
+ protected var totalExpectedExecutors = 0
+
+ private val yarnSchedulerActor: ActorRef =
+ actorSystem.actorOf(
+ Props(new YarnSchedulerActor),
+ name = YarnSchedulerBackend.ACTOR_NAME)
+
+ private implicit val askTimeout = AkkaUtils.askTimeout(sc.conf)
+
+ /**
+ * Request executors from the ApplicationMaster by specifying the total number desired.
+ * This includes executors already pending or running.
+ */
+ override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+ AkkaUtils.askWithReply[Boolean](
+ RequestExecutors(requestedTotal), yarnSchedulerActor, askTimeout)
+ }
+
+ /**
+ * Request that the ApplicationMaster kill the specified executors.
+ */
+ override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+ AkkaUtils.askWithReply[Boolean](
+ KillExecutors(executorIds), yarnSchedulerActor, askTimeout)
+ }
+
+ override def sufficientResourcesRegistered(): Boolean = {
+ totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
+ }
+
+ /**
+ * Add filters to the SparkUI.
+ */
+ private def addWebUIFilter(
+ filterName: String,
+ filterParams: Map[String, String],
+ proxyBase: String): Unit = {
+ if (proxyBase != null && proxyBase.nonEmpty) {
+ System.setProperty("spark.ui.proxyBase", proxyBase)
+ }
+
+ val hasFilter =
+ filterName != null && filterName.nonEmpty &&
+ filterParams != null && filterParams.nonEmpty
+ if (hasFilter) {
+ logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
+ conf.set("spark.ui.filters", filterName)
+ filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
+ scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
+ }
+ }
+
+ /**
+ * An actor that communicates with the ApplicationMaster.
+ */
+ private class YarnSchedulerActor extends Actor {
+ private var amActor: Option[ActorRef] = None
+
+ override def preStart(): Unit = {
+ // Listen for disassociation events
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+ }
+
+ override def receive = {
+ case RegisterClusterManager =>
+ logInfo(s"ApplicationMaster registered as $sender")
+ amActor = Some(sender)
+
+ case r: RequestExecutors =>
+ amActor match {
+ case Some(actor) =>
+ sender ! AkkaUtils.askWithReply[Boolean](r, actor, askTimeout)
+ case None =>
+ logWarning("Attempted to request executors before the AM has registered!")
+ sender ! false
+ }
+
+ case k: KillExecutors =>
+ amActor match {
+ case Some(actor) =>
+ sender ! AkkaUtils.askWithReply[Boolean](k, actor, askTimeout)
+ case None =>
+ logWarning("Attempted to kill executors before the AM has registered!")
+ sender ! false
+ }
+
+ case AddWebUIFilter(filterName, filterParams, proxyBase) =>
+ addWebUIFilter(filterName, filterParams, proxyBase)
+ sender ! true
+
+ case d: DisassociatedEvent =>
+ if (amActor.isDefined && sender == amActor.get) {
+ logWarning(s"ApplicationMaster has disassociated: $d")
+ }
+ }
+ }
+}
+
+private[spark] object YarnSchedulerBackend {
+ val ACTOR_NAME = "YarnScheduler"
+}
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index f41c8d0315..79e398eb8c 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -159,17 +159,28 @@ private[spark] object AkkaUtils extends Logging {
def askWithReply[T](
message: Any,
actor: ActorRef,
- retryAttempts: Int,
+ timeout: FiniteDuration): T = {
+ askWithReply[T](message, actor, maxAttempts = 1, retryInterval = Int.MaxValue, timeout)
+ }
+
+ /**
+ * Send a message to the given actor and get its result within a default timeout, or
+ * throw a SparkException if this fails even after the specified number of retries.
+ */
+ def askWithReply[T](
+ message: Any,
+ actor: ActorRef,
+ maxAttempts: Int,
retryInterval: Int,
timeout: FiniteDuration): T = {
// TODO: Consider removing multiple attempts
if (actor == null) {
- throw new SparkException("Error sending message as driverActor is null " +
+ throw new SparkException("Error sending message as actor is null " +
"[message = " + message + "]")
}
var attempts = 0
var lastException: Exception = null
- while (attempts < retryAttempts) {
+ while (attempts < maxAttempts) {
attempts += 1
try {
val future = actor.ask(message)(timeout)
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 495a0d4863..df237ba796 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
import org.scalatest.{BeforeAndAfterEach, FunSuite, PrivateMethodTester}
-import org.apache.spark.scheduler.{TaskScheduler, TaskSchedulerImpl}
+import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
@@ -31,8 +31,9 @@ class SparkContextSchedulerCreationSuite
// Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
// real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
val sc = new SparkContext("local", "test")
- val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
- val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
+ val createTaskSchedulerMethod =
+ PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
+ val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
sched.asInstanceOf[TaskSchedulerImpl]
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index adbdc5d1da..6a0495f8fd 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -73,6 +73,10 @@ object MimaExcludes {
"org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaRDDLike.collectAsync")
+ ) ++ Seq(
+ // SPARK-3822
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
)
case v if v.startsWith("1.1") =>
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e6fe0265d8..6807379888 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -36,8 +36,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar
import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
+import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
/**
@@ -385,8 +385,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
SparkEnv.driverActorSystemName,
driverHost,
driverPort.toString,
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
- actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+ YarnSchedulerBackend.ACTOR_NAME)
+ actorSystem.actorOf(Props(new AMActor(driverUrl)), name = "YarnAM")
}
/** Add the Yarn IP filter that is required for properly securing the UI. */
@@ -479,9 +479,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
userThread
}
- // Actor used to monitor the driver when running in client deploy mode.
- private class MonitorActor(driverUrl: String) extends Actor {
-
+ /**
+ * Actor that communicates with the driver in client deploy mode.
+ */
+ private class AMActor(driverUrl: String) extends Actor {
var driver: ActorSelection = _
override def preStart() = {
@@ -490,6 +491,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// Send a hello message to establish the connection, after which
// we can monitor Lifecycle Events.
driver ! "Hello"
+ driver ! RegisterClusterManager
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
@@ -497,11 +499,27 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
case x: DisassociatedEvent =>
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+
case x: AddWebUIFilter =>
logInfo(s"Add WebUI Filter. $x")
driver ! x
- }
+ case RequestExecutors(requestedTotal) =>
+ logInfo(s"Driver requested a total number of executors of $requestedTotal.")
+ Option(allocator) match {
+ case Some(a) => a.requestTotalExecutors(requestedTotal)
+ case None => logWarning("Container allocator is not ready to request executors yet.")
+ }
+ sender ! true
+
+ case KillExecutors(executorIds) =>
+ logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
+ Option(allocator) match {
+ case Some(a) => executorIds.foreach(a.killExecutor)
+ case None => logWarning("Container allocator is not ready to kill executors yet.")
+ }
+ sender ! true
+ }
}
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index e1af8d5a74..7ae8ef237f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -88,7 +88,10 @@ private[yarn] abstract class YarnAllocator(
private val executorIdCounter = new AtomicInteger()
private val numExecutorsFailed = new AtomicInteger()
- private val maxExecutors = args.numExecutors
+ private var maxExecutors = args.numExecutors
+
+ // Keep track of which container is running which executor to remove the executors later
+ private val executorIdToContainer = new HashMap[String, Container]
protected val executorMemory = args.executorMemory
protected val executorCores = args.executorCores
@@ -111,7 +114,48 @@ private[yarn] abstract class YarnAllocator(
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
- def allocateResources() = {
+ /**
+ * Request as many executors from the ResourceManager as needed to reach the desired total.
+ * This takes into account executors already running or pending.
+ */
+ def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
+ val currentTotal = numPendingAllocate.get + numExecutorsRunning.get
+ if (requestedTotal > currentTotal) {
+ maxExecutors += (requestedTotal - currentTotal)
+ // We need to call `allocateResources` here to avoid the following race condition:
+ // If we request executors twice before `allocateResources` is called, then we will end up
+ // double counting the number requested because `numPendingAllocate` is not updated yet.
+ allocateResources()
+ } else {
+ logInfo(s"Not allocating more executors because there are already $currentTotal " +
+ s"(application requested $requestedTotal total)")
+ }
+ }
+
+ /**
+ * Request that the ResourceManager release the container running the specified executor.
+ */
+ def killExecutor(executorId: String): Unit = synchronized {
+ if (executorIdToContainer.contains(executorId)) {
+ val container = executorIdToContainer.remove(executorId).get
+ internalReleaseContainer(container)
+ numExecutorsRunning.decrementAndGet()
+ maxExecutors -= 1
+ assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!")
+ } else {
+ logWarning(s"Attempted to kill unknown executor $executorId!")
+ }
+ }
+
+ /**
+ * Allocate missing containers based on the number of executors currently pending and running.
+ *
+ * This method prioritizes the allocated container responses from the RM based on node and
+ * rack locality. Additionally, it releases any extra containers allocated for this application
+ * but are not needed. This must be synchronized because variables read in this block are
+ * mutated by other methods.
+ */
+ def allocateResources(): Unit = synchronized {
val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
// this is needed by alpha, do it here since we add numPending right after this
@@ -119,7 +163,7 @@ private[yarn] abstract class YarnAllocator(
if (missing > 0) {
val totalExecutorMemory = executorMemory + memoryOverhead
numPendingAllocate.addAndGet(missing)
- logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
+ logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
s"memory including $memoryOverhead MB overhead")
} else {
logDebug("Empty allocation request ...")
@@ -269,6 +313,7 @@ private[yarn] abstract class YarnAllocator(
CoarseGrainedSchedulerBackend.ACTOR_NAME)
logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+ executorIdToContainer(executorId) = container
// To be safe, remove the container from `releasedContainers`.
releasedContainers.remove(containerId)
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 59b2b47aed..f6f6dc5243 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -17,27 +17,23 @@
package org.apache.spark.scheduler.cluster
+import scala.collection.mutable.ArrayBuffer
+
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
+
import org.apache.spark.{SparkException, Logging, SparkContext}
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.scheduler.TaskSchedulerImpl
-import scala.collection.mutable.ArrayBuffer
-
private[spark] class YarnClientSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+ extends YarnSchedulerBackend(scheduler, sc)
with Logging {
- if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
- minRegisteredRatio = 0.8
- }
-
private var client: Client = null
private var appId: ApplicationId = null
private var stopping: Boolean = false
- private var totalExpectedExecutors = 0
/**
* Create a Yarn client to submit an application to the ResourceManager.
@@ -151,14 +147,11 @@ private[spark] class YarnClientSchedulerBackend(
logInfo("Stopped")
}
- override def sufficientResourcesRegistered(): Boolean = {
- totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
- }
-
- override def applicationId(): String =
+ override def applicationId(): String = {
Option(appId).map(_.toString).getOrElse {
logWarning("Application ID is not initialized yet.")
super.applicationId
}
+ }
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 3a186cfeb4..a96a54f668 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -25,13 +25,7 @@ import org.apache.spark.util.IntParam
private[spark] class YarnClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
-
- var totalExpectedExecutors = 0
-
- if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
- minRegisteredRatio = 0.8
- }
+ extends YarnSchedulerBackend(scheduler, sc) {
override def start() {
super.start()
@@ -44,10 +38,6 @@ private[spark] class YarnClusterSchedulerBackend(
totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
}
- override def sufficientResourcesRegistered(): Boolean = {
- totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
- }
-
override def applicationId(): String =
// In YARN Cluster mode, spark.yarn.app.id is expect to be set
// before user application is launched.