Diffstat (limited to 'yarn/common')
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                 | 34
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala                     | 51
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala  | 19
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala | 12
4 files changed, 81 insertions(+), 35 deletions(-)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e6fe0265d8..6807379888 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -36,8 +36,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar
import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
+import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
/**
@@ -385,8 +385,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
SparkEnv.driverActorSystemName,
driverHost,
driverPort.toString,
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
- actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+ YarnSchedulerBackend.ACTOR_NAME)
+ actorSystem.actorOf(Props(new AMActor(driverUrl)), name = "YarnAM")
}
/** Add the Yarn IP filter that is required for properly securing the UI. */
@@ -479,9 +479,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
userThread
}
- // Actor used to monitor the driver when running in client deploy mode.
- private class MonitorActor(driverUrl: String) extends Actor {
-
+ /**
+ * Actor that communicates with the driver in client deploy mode.
+ */
+ private class AMActor(driverUrl: String) extends Actor {
var driver: ActorSelection = _
override def preStart() = {
@@ -490,6 +491,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// Send a hello message to establish the connection, after which
// we can monitor Lifecycle Events.
driver ! "Hello"
+ driver ! RegisterClusterManager
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
@@ -497,11 +499,27 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
case x: DisassociatedEvent =>
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+
case x: AddWebUIFilter =>
logInfo(s"Add WebUI Filter. $x")
driver ! x
- }
+ case RequestExecutors(requestedTotal) =>
+ logInfo(s"Driver requested a total number of executors of $requestedTotal.")
+ Option(allocator) match {
+ case Some(a) => a.requestTotalExecutors(requestedTotal)
+ case None => logWarning("Container allocator is not ready to request executors yet.")
+ }
+ sender ! true
+
+ case KillExecutors(executorIds) =>
+ logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
+ Option(allocator) match {
+ case Some(a) => executorIds.foreach(a.killExecutor)
+ case None => logWarning("Container allocator is not ready to kill executors yet.")
+ }
+ sender ! true
+ }
}
}
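
The AMActor above acknowledges RequestExecutors and KillExecutors by replying true to the sender, which suggests the driver side sends these messages with Akka's ask pattern and waits for the boolean reply. A minimal sketch of such a driver-side caller under that assumption; the AMRequests object, its method names, and the Seq[String] parameter type are hypothetical, only the two message types come from the patch.

    import scala.concurrent.Await
    import scala.concurrent.duration._

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout

    import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._

    // Hypothetical driver-side helper: asks the AM actor to adjust executors
    // and blocks on the boolean acknowledgement that AMActor sends back.
    object AMRequests {
      private implicit val timeout = Timeout(30.seconds)

      def requestTotalExecutors(amActor: ActorRef, total: Int): Boolean =
        Await.result((amActor ? RequestExecutors(total)).mapTo[Boolean], timeout.duration)

      def killExecutors(amActor: ActorRef, executorIds: Seq[String]): Boolean =
        Await.result((amActor ? KillExecutors(executorIds)).mapTo[Boolean], timeout.duration)
    }
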
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index e1af8d5a74..7ae8ef237f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -88,7 +88,10 @@ private[yarn] abstract class YarnAllocator(
private val executorIdCounter = new AtomicInteger()
private val numExecutorsFailed = new AtomicInteger()
- private val maxExecutors = args.numExecutors
+ private var maxExecutors = args.numExecutors
+
+ // Keep track of which container is running which executor to remove the executors later
+ private val executorIdToContainer = new HashMap[String, Container]
protected val executorMemory = args.executorMemory
protected val executorCores = args.executorCores
@@ -111,7 +114,48 @@ private[yarn] abstract class YarnAllocator(
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
- def allocateResources() = {
+ /**
+ * Request as many executors from the ResourceManager as needed to reach the desired total.
+ * This takes into account executors already running or pending.
+ */
+ def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
+ val currentTotal = numPendingAllocate.get + numExecutorsRunning.get
+ if (requestedTotal > currentTotal) {
+ maxExecutors += (requestedTotal - currentTotal)
+ // We need to call `allocateResources` here to avoid the following race condition:
+ // If we request executors twice before `allocateResources` is called, then we will end up
+ // double counting the number requested because `numPendingAllocate` is not updated yet.
+ allocateResources()
+ } else {
+ logInfo(s"Not allocating more executors because there are already $currentTotal " +
+ s"(application requested $requestedTotal total)")
+ }
+ }
+
+ /**
+ * Request that the ResourceManager release the container running the specified executor.
+ */
+ def killExecutor(executorId: String): Unit = synchronized {
+ if (executorIdToContainer.contains(executorId)) {
+ val container = executorIdToContainer.remove(executorId).get
+ internalReleaseContainer(container)
+ numExecutorsRunning.decrementAndGet()
+ maxExecutors -= 1
+ assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!")
+ } else {
+ logWarning(s"Attempted to kill unknown executor $executorId!")
+ }
+ }
+
+ /**
+ * Allocate missing containers based on the number of executors currently pending and running.
+ *
+ * This method prioritizes the allocated container responses from the RM based on node and
+ * rack locality. Additionally, it releases any extra containers allocated for this application
+ * but are not needed. This must be synchronized because variables read in this block are
+ * mutated by other methods.
+ */
+ def allocateResources(): Unit = synchronized {
val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
// this is needed by alpha, do it here since we add numPending right after this
@@ -119,7 +163,7 @@ private[yarn] abstract class YarnAllocator(
if (missing > 0) {
val totalExecutorMemory = executorMemory + memoryOverhead
numPendingAllocate.addAndGet(missing)
- logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
+ logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
s"memory including $memoryOverhead MB overhead")
} else {
logDebug("Empty allocation request ...")
@@ -269,6 +313,7 @@ private[yarn] abstract class YarnAllocator(
CoarseGrainedSchedulerBackend.ACTOR_NAME)
logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+ executorIdToContainer(executorId) = container
// To be safe, remove the container from `releasedContainers`.
releasedContainers.remove(containerId)
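
In the allocator changes above, requestTotalExecutors raises maxExecutors only by the gap between the requested total and the executors already pending or running, and it calls allocateResources immediately so that numPendingAllocate reflects the new request before a second request can double count it. A stripped-down sketch of that bookkeeping; the class name and constructor are hypothetical scaffolding, while the arithmetic mirrors the patch.

    import java.util.concurrent.atomic.AtomicInteger

    // Simplified model of the allocator bookkeeping introduced in this patch.
    class AllocatorBookkeeping(initialMaxExecutors: Int) {
      private var maxExecutors = initialMaxExecutors
      private val numPendingAllocate = new AtomicInteger(0)
      private val numExecutorsRunning = new AtomicInteger(0)

      def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
        val currentTotal = numPendingAllocate.get + numExecutorsRunning.get
        if (requestedTotal > currentTotal) {
          maxExecutors += (requestedTotal - currentTotal)
          // Allocate right away so numPendingAllocate is updated before
          // another request can observe stale counts and double count.
          allocateResources()
        }
      }

      def allocateResources(): Unit = synchronized {
        val missing = maxExecutors - numPendingAllocate.get - numExecutorsRunning.get
        if (missing > 0) {
          numPendingAllocate.addAndGet(missing)
          // The real allocator would now ask the YARN ResourceManager for
          // `missing` containers, with node and rack locality preferences.
        }
      }
    }
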
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 59b2b47aed..f6f6dc5243 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -17,27 +17,23 @@
package org.apache.spark.scheduler.cluster
+import scala.collection.mutable.ArrayBuffer
+
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
+
import org.apache.spark.{SparkException, Logging, SparkContext}
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.scheduler.TaskSchedulerImpl
-import scala.collection.mutable.ArrayBuffer
-
private[spark] class YarnClientSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+ extends YarnSchedulerBackend(scheduler, sc)
with Logging {
- if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
- minRegisteredRatio = 0.8
- }
-
private var client: Client = null
private var appId: ApplicationId = null
private var stopping: Boolean = false
- private var totalExpectedExecutors = 0
/**
* Create a Yarn client to submit an application to the ResourceManager.
@@ -151,14 +147,11 @@ private[spark] class YarnClientSchedulerBackend(
logInfo("Stopped")
}
- override def sufficientResourcesRegistered(): Boolean = {
- totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
- }
-
- override def applicationId(): String =
+ override def applicationId(): String = {
Option(appId).map(_.toString).getOrElse {
logWarning("Application ID is not initialized yet.")
super.applicationId
}
+ }
}
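
YarnClientSchedulerBackend now extends YarnSchedulerBackend instead of CoarseGrainedSchedulerBackend, and the same change is applied to YarnClusterSchedulerBackend in the next file. YarnSchedulerBackend itself is defined outside yarn/common and is not shown in this diff; judging from the duplicated code deleted in both backends, it presumably hoists the totalExpectedExecutors counter, the 0.8 default for spark.scheduler.minRegisteredResourcesRatio, and the sufficientResourcesRegistered check. A speculative reconstruction from the deleted lines only:

    package org.apache.spark.scheduler.cluster

    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.TaskSchedulerImpl

    // Speculative sketch of the shared parent; the real YarnSchedulerBackend
    // lives outside this diff and may differ.
    private[spark] abstract class YarnSchedulerBackend(
        scheduler: TaskSchedulerImpl,
        sc: SparkContext)
      extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {

      // Default the registered-resources ratio to 0.8 unless the user set it,
      // as both YARN backends did before this refactor.
      if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
        minRegisteredRatio = 0.8
      }

      protected var totalExpectedExecutors = 0

      override def sufficientResourcesRegistered(): Boolean = {
        totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
      }
    }
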
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 3a186cfeb4..a96a54f668 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -25,13 +25,7 @@ import org.apache.spark.util.IntParam
private[spark] class YarnClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
-
- var totalExpectedExecutors = 0
-
- if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
- minRegisteredRatio = 0.8
- }
+ extends YarnSchedulerBackend(scheduler, sc) {
override def start() {
super.start()
@@ -44,10 +38,6 @@ private[spark] class YarnClusterSchedulerBackend(
totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
}
- override def sufficientResourcesRegistered(): Boolean = {
- totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
- }
-
override def applicationId(): String =
// In YARN Cluster mode, spark.yarn.app.id is expect to be set
// before user application is launched.
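
In cluster mode the expected executor count is still taken from spark.executor.instances, as read in the start() override above. A small, hypothetical submission-side example of setting that property on a SparkConf:

    import org.apache.spark.SparkConf

    // Hypothetical configuration; spark.executor.instances is the value that
    // YarnClusterSchedulerBackend.start() reads into totalExpectedExecutors.
    val conf = new SparkConf()
      .setAppName("example-app")
      .set("spark.executor.instances", "4")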