author    Andrew Or <andrew@databricks.com>  2014-10-29 14:01:00 -0700
committer Andrew Or <andrew@databricks.com>  2014-10-29 14:01:00 -0700
commit    1df05a40ebf3493b0aff46d18c0f30d2d5256c7b (patch)
tree      319086e2ffcb624d779a37a7cfb8482a894853ac /yarn
parent    353546766384b1e80fc8cc75c532d8d1821012b4 (diff)
[SPARK-3822] Executor scaling mechanism for Yarn
This is part of a broader effort to enable dynamic scaling of executors ([SPARK-3174](https://issues.apache.org/jira/browse/SPARK-3174)). This is intended to work alongside SPARK-3795 (#2746), SPARK-3796 and SPARK-3797, but is functionally independent of those other issues. The logic is built on top of PraveenSeluka's changes at #2798.

This differs from the changes there in a few major ways:
(1) The mechanism is implemented within the existing scheduler backend framework rather than in new `Actor` classes. This also introduces a parent abstract class `YarnSchedulerBackend` to encapsulate the common logic for communicating with the Yarn `ApplicationMaster`.
(2) The interface for requesting executors exposed to the `SparkContext` is the same, but the communication between the scheduler backend and the AM uses the total number of executors desired instead of an incremental number. This is discussed in #2746 and explained in the comments in the code.

I have tested this significantly on a stable Yarn cluster.

------------

A remaining task for this issue is to tone down the error messages emitted when an executor is removed. Currently, `SparkContext` and its components react as if the executor has failed, resulting in many scary error messages and eventual timeouts. While it's not strictly necessary to fix this for the first-cut implementation of this mechanism, it would be good to add logic to distinguish this case. I prefer to address this in a separate PR. I have filed a separate JIRA for this task at SPARK-4134.

Author: Andrew Or <andrew@databricks.com>
Author: Andrew Or <andrewor14@gmail.com>

Closes #2840 from andrewor14/yarn-scaling-mechanism and squashes the following commits:

485863e [Andrew Or] Minor log message changes
4920be8 [Andrew Or] Clarify that public API is only for Yarn mode for now
1c57804 [Andrew Or] Reword a few comments + other review comments
6321140 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism
02836c0 [Andrew Or] Limit scope of synchronization
4e2ed7f [Andrew Or] Fix bug: keep track of removed executors properly
73ade46 [Andrew Or] Wording changes (minor)
2a7a6da [Andrew Or] Add `sc.killExecutor` as a shorthand (minor)
665f229 [Andrew Or] Mima excludes
79aa2df [Andrew Or] Simplify the request interface by asking for a total
04f625b [Andrew Or] Fix race condition that causes over-allocation of executors
f4783f8 [Andrew Or] Change the semantics of requesting executors
005a124 [Andrew Or] Fix tests
4628b16 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism
db4a679 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism
572f5c5 [Andrew Or] Unused import (minor)
f30261c [Andrew Or] Kill multiple executors rather than one at a time
de260d9 [Andrew Or] Simplify by skipping useless null check
9c52542 [Andrew Or] Simplify by skipping the TaskSchedulerImpl
97dd1a8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism
d987b3e [Andrew Or] Move addWebUIFilters to Yarn scheduler backend
7b76d0a [Andrew Or] Expose mechanism in SparkContext as developer API
47466cd [Andrew Or] Refactor common Yarn scheduler backend logic
c4dfaac [Andrew Or] Avoid thrashing when removing executors
53e8145 [Andrew Or] Start yarn actor early to listen for AM registration message
bbee669 [Andrew Or] Add mechanism in yarn client mode
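The "total, not delta" semantics described in point (2) can be illustrated with a minimal sketch. Only the `RequestExecutors` message name mirrors the patch; `AmEndpoint` and the surrounding object are illustrative stand-ins, not Spark classes:

```scala
// Sketch of total-count request semantics: the driver always states the total it
// wants, so duplicate or resent messages cannot cause over-allocation.
object RequestSemanticsSketch {
  final case class RequestExecutors(requestedTotal: Int)

  class AmEndpoint {
    private var targetTotal = 0
    // Receiving the same total twice is harmless: the target simply stays put.
    def receive(msg: RequestExecutors): Unit = { targetTotal = msg.requestedTotal }
    def currentTarget: Int = targetTotal
  }

  def main(args: Array[String]): Unit = {
    val am = new AmEndpoint
    am.receive(RequestExecutors(5))  // driver wants 5 executors in total
    am.receive(RequestExecutors(5))  // duplicate message: still 5, no double counting
    am.receive(RequestExecutors(8))  // scale up to 8 in total
    println(s"AM target total = ${am.currentTarget}")  // prints 8
  }
}
```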
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                  | 34
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala                      | 51
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala   | 19
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala  | 12
4 files changed, 81 insertions(+), 35 deletions(-)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e6fe0265d8..6807379888 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -36,8 +36,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar
import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
+import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
/**
@@ -385,8 +385,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
SparkEnv.driverActorSystemName,
driverHost,
driverPort.toString,
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
- actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+ YarnSchedulerBackend.ACTOR_NAME)
+ actorSystem.actorOf(Props(new AMActor(driverUrl)), name = "YarnAM")
}
/** Add the Yarn IP filter that is required for properly securing the UI. */
@@ -479,9 +479,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
userThread
}
- // Actor used to monitor the driver when running in client deploy mode.
- private class MonitorActor(driverUrl: String) extends Actor {
-
+ /**
+ * Actor that communicates with the driver in client deploy mode.
+ */
+ private class AMActor(driverUrl: String) extends Actor {
var driver: ActorSelection = _
override def preStart() = {
@@ -490,6 +491,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// Send a hello message to establish the connection, after which
// we can monitor Lifecycle Events.
driver ! "Hello"
+ driver ! RegisterClusterManager
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
@@ -497,11 +499,27 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
case x: DisassociatedEvent =>
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+
case x: AddWebUIFilter =>
logInfo(s"Add WebUI Filter. $x")
driver ! x
- }
+ case RequestExecutors(requestedTotal) =>
+ logInfo(s"Driver requested a total number of executors of $requestedTotal.")
+ Option(allocator) match {
+ case Some(a) => a.requestTotalExecutors(requestedTotal)
+ case None => logWarning("Container allocator is not ready to request executors yet.")
+ }
+ sender ! true
+
+ case KillExecutors(executorIds) =>
+ logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
+ Option(allocator) match {
+ case Some(a) => executorIds.foreach(a.killExecutor)
+ case None => logWarning("Container allocator is not ready to kill executors yet.")
+ }
+ sender ! true
+ }
}
}
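The `AMActor` above replies with `true` once it has forwarded `RequestExecutors` or `KillExecutors` to the allocator. The driver-side counterpart (`YarnSchedulerBackend`) lives in core/ and is outside this yarn-limited diffstat, so the sketch below is only an assumption of how that side could ask the AM actor and wait briefly for the acknowledgement, using the Akka ask pattern already used elsewhere in Spark at this time:

```scala
// Illustrative driver-side sketch only; class and method names are assumptions.
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout

case class RequestExecutors(requestedTotal: Int)
case class KillExecutors(executorIds: Seq[String])

class YarnSchedulerClientSketch(amActor: ActorRef) {
  private implicit val timeout: Timeout = Timeout(30.seconds)

  // Ask the AM for a new total and wait for its Boolean acknowledgement.
  def doRequestTotalExecutors(requestedTotal: Int): Boolean =
    Await.result((amActor ? RequestExecutors(requestedTotal)).mapTo[Boolean], timeout.duration)

  // Ask the AM to release the containers running the given executors.
  def doKillExecutors(executorIds: Seq[String]): Boolean =
    Await.result((amActor ? KillExecutors(executorIds)).mapTo[Boolean], timeout.duration)
}
```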
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index e1af8d5a74..7ae8ef237f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -88,7 +88,10 @@ private[yarn] abstract class YarnAllocator(
private val executorIdCounter = new AtomicInteger()
private val numExecutorsFailed = new AtomicInteger()
- private val maxExecutors = args.numExecutors
+ private var maxExecutors = args.numExecutors
+
+ // Keep track of which container is running which executor to remove the executors later
+ private val executorIdToContainer = new HashMap[String, Container]
protected val executorMemory = args.executorMemory
protected val executorCores = args.executorCores
@@ -111,7 +114,48 @@ private[yarn] abstract class YarnAllocator(
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
- def allocateResources() = {
+ /**
+ * Request as many executors from the ResourceManager as needed to reach the desired total.
+ * This takes into account executors already running or pending.
+ */
+ def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
+ val currentTotal = numPendingAllocate.get + numExecutorsRunning.get
+ if (requestedTotal > currentTotal) {
+ maxExecutors += (requestedTotal - currentTotal)
+ // We need to call `allocateResources` here to avoid the following race condition:
+ // If we request executors twice before `allocateResources` is called, then we will end up
+ // double counting the number requested because `numPendingAllocate` is not updated yet.
+ allocateResources()
+ } else {
+ logInfo(s"Not allocating more executors because there are already $currentTotal " +
+ s"(application requested $requestedTotal total)")
+ }
+ }
+
+ /**
+ * Request that the ResourceManager release the container running the specified executor.
+ */
+ def killExecutor(executorId: String): Unit = synchronized {
+ if (executorIdToContainer.contains(executorId)) {
+ val container = executorIdToContainer.remove(executorId).get
+ internalReleaseContainer(container)
+ numExecutorsRunning.decrementAndGet()
+ maxExecutors -= 1
+ assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!")
+ } else {
+ logWarning(s"Attempted to kill unknown executor $executorId!")
+ }
+ }
+
+ /**
+ * Allocate missing containers based on the number of executors currently pending and running.
+ *
+ * This method prioritizes the allocated container responses from the RM based on node and
+ * rack locality. Additionally, it releases any extra containers allocated for this application
+ * but are not needed. This must be synchronized because variables read in this block are
+ * mutated by other methods.
+ */
+ def allocateResources(): Unit = synchronized {
val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
// this is needed by alpha, do it here since we add numPending right after this
@@ -119,7 +163,7 @@ private[yarn] abstract class YarnAllocator(
if (missing > 0) {
val totalExecutorMemory = executorMemory + memoryOverhead
numPendingAllocate.addAndGet(missing)
- logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
+ logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
s"memory including $memoryOverhead MB overhead")
} else {
logDebug("Empty allocation request ...")
@@ -269,6 +313,7 @@ private[yarn] abstract class YarnAllocator(
CoarseGrainedSchedulerBackend.ACTOR_NAME)
logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+ executorIdToContainer(executorId) = container
// To be safe, remove the container from `releasedContainers`.
releasedContainers.remove(containerId)
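A small worked example of the bookkeeping introduced above, using toy counters in place of `YarnAllocator`'s fields: `requestTotalExecutors` only raises `maxExecutors` by the gap between the requested total and what is already running or pending, and `allocateResources` then requests `maxExecutors - pending - running` containers. Calling `allocateResources` inside the request is what prevents the double-counting race mentioned in the patch comment.

```scala
// Toy model of the allocator's arithmetic only, not the allocator itself.
object AllocatorMathSketch {
  var maxExecutors = 2   // e.g. the initial --num-executors
  var numPending = 0
  var numRunning = 2

  def requestTotalExecutors(requestedTotal: Int): Unit = {
    val currentTotal = numPending + numRunning
    if (requestedTotal > currentTotal) {
      maxExecutors += (requestedTotal - currentTotal)
      // Allocating immediately bumps numPending, so a repeated request for the
      // same total sees currentTotal == requestedTotal and becomes a no-op.
      allocateResources()
    }
  }

  def allocateResources(): Unit = {
    val missing = maxExecutors - numPending - numRunning
    if (missing > 0) numPending += missing
  }

  def main(args: Array[String]): Unit = {
    requestTotalExecutors(5)  // 2 running, 0 pending -> max becomes 5, 3 containers pending
    requestTotalExecutors(5)  // duplicate request -> no double counting
    println(s"max=$maxExecutors pending=$numPending running=$numRunning")  // max=5 pending=3 running=2
  }
}
```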
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 59b2b47aed..f6f6dc5243 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -17,27 +17,23 @@
package org.apache.spark.scheduler.cluster
+import scala.collection.mutable.ArrayBuffer
+
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
+
import org.apache.spark.{SparkException, Logging, SparkContext}
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.scheduler.TaskSchedulerImpl
-import scala.collection.mutable.ArrayBuffer
-
private[spark] class YarnClientSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+ extends YarnSchedulerBackend(scheduler, sc)
with Logging {
- if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
- minRegisteredRatio = 0.8
- }
-
private var client: Client = null
private var appId: ApplicationId = null
private var stopping: Boolean = false
- private var totalExpectedExecutors = 0
/**
* Create a Yarn client to submit an application to the ResourceManager.
@@ -151,14 +147,11 @@ private[spark] class YarnClientSchedulerBackend(
logInfo("Stopped")
}
- override def sufficientResourcesRegistered(): Boolean = {
- totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
- }
-
- override def applicationId(): String =
+ override def applicationId(): String = {
Option(appId).map(_.toString).getOrElse {
logWarning("Application ID is not initialized yet.")
super.applicationId
}
+ }
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 3a186cfeb4..a96a54f668 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -25,13 +25,7 @@ import org.apache.spark.util.IntParam
private[spark] class YarnClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
-
- var totalExpectedExecutors = 0
-
- if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
- minRegisteredRatio = 0.8
- }
+ extends YarnSchedulerBackend(scheduler, sc) {
override def start() {
super.start()
@@ -44,10 +38,6 @@ private[spark] class YarnClusterSchedulerBackend(
totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
}
- override def sufficientResourcesRegistered(): Boolean = {
- totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
- }
-
override def applicationId(): String =
// In YARN Cluster mode, spark.yarn.app.id is expect to be set
// before user application is launched.
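Both scheduler backends drop the same registration-ratio code; per the commit message it moves into the new parent class `YarnSchedulerBackend`, which sits in core/ and is therefore not part of this yarn-limited diffstat. Below is a rough reconstruction of that shared piece from the removed lines; the class name suffix and constructor parameter are assumptions, and the real parent class also owns the actor plumbing that talks to the `ApplicationMaster`:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Approximate reconstruction of the duplicated logic removed from both subclasses above.
abstract class YarnSchedulerBackendSketch(minRegisteredRatioConf: Option[Double]) {
  // Yarn backends default to waiting for 80% of executors unless the ratio is set explicitly
  // (spark.scheduler.minRegisteredResourcesRatio in the real code).
  protected var minRegisteredRatio: Double = minRegisteredRatioConf.getOrElse(0.8)

  protected var totalExpectedExecutors = 0
  protected val totalRegisteredExecutors = new AtomicInteger(0)

  // Task scheduling may begin once this fraction of the expected executors has registered.
  def sufficientResourcesRegistered(): Boolean =
    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
```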