diff options
Diffstat (limited to 'yarn/common')
4 files changed, 81 insertions, 35 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index e6fe0265d8..6807379888 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -36,8 +36,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar import org.apache.spark.SparkException import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.HistoryServer -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter +import org.apache.spark.scheduler.cluster.YarnSchedulerBackend +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils} /** @@ -385,8 +385,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, SparkEnv.driverActorSystemName, driverHost, driverPort.toString, - CoarseGrainedSchedulerBackend.ACTOR_NAME) - actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") + YarnSchedulerBackend.ACTOR_NAME) + actorSystem.actorOf(Props(new AMActor(driverUrl)), name = "YarnAM") } /** Add the Yarn IP filter that is required for properly securing the UI. */ @@ -479,9 +479,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, userThread } - // Actor used to monitor the driver when running in client deploy mode. - private class MonitorActor(driverUrl: String) extends Actor { - + /** + * Actor that communicates with the driver in client deploy mode. + */ + private class AMActor(driverUrl: String) extends Actor { var driver: ActorSelection = _ override def preStart() = { @@ -490,6 +491,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, // Send a hello message to establish the connection, after which // we can monitor Lifecycle Events. driver ! "Hello" + driver ! RegisterClusterManager context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } @@ -497,11 +499,27 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, case x: DisassociatedEvent => logInfo(s"Driver terminated or disconnected! Shutting down. $x") finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) + case x: AddWebUIFilter => logInfo(s"Add WebUI Filter. $x") driver ! x - } + case RequestExecutors(requestedTotal) => + logInfo(s"Driver requested a total number of executors of $requestedTotal.") + Option(allocator) match { + case Some(a) => a.requestTotalExecutors(requestedTotal) + case None => logWarning("Container allocator is not ready to request executors yet.") + } + sender ! true + + case KillExecutors(executorIds) => + logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.") + Option(allocator) match { + case Some(a) => executorIds.foreach(a.killExecutor) + case None => logWarning("Container allocator is not ready to kill executors yet.") + } + sender ! true + } } } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index e1af8d5a74..7ae8ef237f 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -88,7 +88,10 @@ private[yarn] abstract class YarnAllocator( private val executorIdCounter = new AtomicInteger() private val numExecutorsFailed = new AtomicInteger() - private val maxExecutors = args.numExecutors + private var maxExecutors = args.numExecutors + + // Keep track of which container is running which executor to remove the executors later + private val executorIdToContainer = new HashMap[String, Container] protected val executorMemory = args.executorMemory protected val executorCores = args.executorCores @@ -111,7 +114,48 @@ private[yarn] abstract class YarnAllocator( def getNumExecutorsFailed: Int = numExecutorsFailed.intValue - def allocateResources() = { + /** + * Request as many executors from the ResourceManager as needed to reach the desired total. + * This takes into account executors already running or pending. + */ + def requestTotalExecutors(requestedTotal: Int): Unit = synchronized { + val currentTotal = numPendingAllocate.get + numExecutorsRunning.get + if (requestedTotal > currentTotal) { + maxExecutors += (requestedTotal - currentTotal) + // We need to call `allocateResources` here to avoid the following race condition: + // If we request executors twice before `allocateResources` is called, then we will end up + // double counting the number requested because `numPendingAllocate` is not updated yet. + allocateResources() + } else { + logInfo(s"Not allocating more executors because there are already $currentTotal " + + s"(application requested $requestedTotal total)") + } + } + + /** + * Request that the ResourceManager release the container running the specified executor. + */ + def killExecutor(executorId: String): Unit = synchronized { + if (executorIdToContainer.contains(executorId)) { + val container = executorIdToContainer.remove(executorId).get + internalReleaseContainer(container) + numExecutorsRunning.decrementAndGet() + maxExecutors -= 1 + assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!") + } else { + logWarning(s"Attempted to kill unknown executor $executorId!") + } + } + + /** + * Allocate missing containers based on the number of executors currently pending and running. + * + * This method prioritizes the allocated container responses from the RM based on node and + * rack locality. Additionally, it releases any extra containers allocated for this application + * but are not needed. This must be synchronized because variables read in this block are + * mutated by other methods. + */ + def allocateResources(): Unit = synchronized { val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get() // this is needed by alpha, do it here since we add numPending right after this @@ -119,7 +163,7 @@ private[yarn] abstract class YarnAllocator( if (missing > 0) { val totalExecutorMemory = executorMemory + memoryOverhead numPendingAllocate.addAndGet(missing) - logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " + + logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " + s"memory including $memoryOverhead MB overhead") } else { logDebug("Empty allocation request ...") @@ -269,6 +313,7 @@ private[yarn] abstract class YarnAllocator( CoarseGrainedSchedulerBackend.ACTOR_NAME) logInfo("Launching container %s for on host %s".format(containerId, executorHostname)) + executorIdToContainer(executorId) = container // To be safe, remove the container from `releasedContainers`. releasedContainers.remove(containerId) diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 59b2b47aed..f6f6dc5243 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -17,27 +17,23 @@ package org.apache.spark.scheduler.cluster +import scala.collection.mutable.ArrayBuffer + import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} + import org.apache.spark.{SparkException, Logging, SparkContext} import org.apache.spark.deploy.yarn.{Client, ClientArguments} import org.apache.spark.scheduler.TaskSchedulerImpl -import scala.collection.mutable.ArrayBuffer - private[spark] class YarnClientSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) + extends YarnSchedulerBackend(scheduler, sc) with Logging { - if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { - minRegisteredRatio = 0.8 - } - private var client: Client = null private var appId: ApplicationId = null private var stopping: Boolean = false - private var totalExpectedExecutors = 0 /** * Create a Yarn client to submit an application to the ResourceManager. @@ -151,14 +147,11 @@ private[spark] class YarnClientSchedulerBackend( logInfo("Stopped") } - override def sufficientResourcesRegistered(): Boolean = { - totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio - } - - override def applicationId(): String = + override def applicationId(): String = { Option(appId).map(_.toString).getOrElse { logWarning("Application ID is not initialized yet.") super.applicationId } + } } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index 3a186cfeb4..a96a54f668 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -25,13 +25,7 @@ import org.apache.spark.util.IntParam private[spark] class YarnClusterSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { - - var totalExpectedExecutors = 0 - - if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { - minRegisteredRatio = 0.8 - } + extends YarnSchedulerBackend(scheduler, sc) { override def start() { super.start() @@ -44,10 +38,6 @@ private[spark] class YarnClusterSchedulerBackend( totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors) } - override def sufficientResourcesRegistered(): Boolean = { - totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio - } - override def applicationId(): String = // In YARN Cluster mode, spark.yarn.app.id is expect to be set // before user application is launched. |