Diffstat (limited to 'yarn')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 453
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 315
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 192
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala | 103
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 430
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala | 26
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 9
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 54
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 34
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala | 67
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 51
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala | 11
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 7
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala | 17
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 413
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 276
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 196
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala | 76
18 files changed, 892 insertions(+), 1838 deletions(-)
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
deleted file mode 100644
index 4d4848b1bd..0000000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.IOException
-import java.net.Socket
-import java.util.concurrent.CopyOnWriteArrayList
-import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.{SignalLogger, Utils}
-
-/**
- * An application master that runs the users driver program and allocates executors.
- */
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
- sparkConf: SparkConf) extends Logging {
-
- def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
- this(args, new Configuration(), sparkConf)
-
- def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
-
- private val rpc: YarnRPC = YarnRPC.create(conf)
- private var resourceManager: AMRMProtocol = _
- private var appAttemptId: ApplicationAttemptId = _
- private var userThread: Thread = _
- private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
- private val fs = FileSystem.get(yarnConf)
-
- private var yarnAllocator: YarnAllocationHandler = _
- private var isFinished: Boolean = false
- private var uiAddress: String = _
- private var uiHistoryAddress: String = _
- private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
- YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
- private var isLastAMRetry: Boolean = true
-
- // Default to numExecutors * 2, with minimum of 3
- private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
- sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
-
- private var registered = false
-
- def run() {
- // set the web ui port to be ephemeral for yarn so we don't conflict with
- // other spark processes running on the same box
- System.setProperty("spark.ui.port", "0")
-
- // when running the AM, the Spark master is always "yarn-cluster"
- System.setProperty("spark.master", "yarn-cluster")
-
- // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
- ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
-
- appAttemptId = getApplicationAttemptId()
- isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
- resourceManager = registerWithResourceManager()
-
- // setup AmIpFilter for the SparkUI - do this before we start the UI
- addAmIpFilter()
-
- ApplicationMaster.register(this)
-
- // Call this to force generation of secret so it gets populated into the
- // hadoop UGI. This has to happen before the startUserClass which does a
- // doAs in order for the credentials to be passed on to the executor containers.
- val securityMgr = new SecurityManager(sparkConf)
-
- // Start the user's JAR
- userThread = startUserClass()
-
- // This a bit hacky, but we need to wait until the spark.driver.port property has
- // been set by the Thread executing the user class.
- waitForSparkContextInitialized()
-
- // Do this after spark master is up and SparkContext is created so that we can register UI Url
- synchronized {
- if (!isFinished) {
- registerApplicationMaster()
- registered = true
- }
- }
-
- // Allocate all containers
- allocateExecutors()
-
- // Wait for the user class to Finish
- userThread.join()
-
- System.exit(0)
- }
-
- // add the yarn amIpFilter that Yarn requires for properly securing the UI
- private def addAmIpFilter() {
- val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
- System.setProperty("spark.ui.filters", amFilter)
- val proxy = YarnConfiguration.getProxyHostAndPort(conf)
- val parts : Array[String] = proxy.split(":")
- val uriBase = "http://" + proxy +
- System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-
- val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
- System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params",
- params)
- }
-
- private def getApplicationAttemptId(): ApplicationAttemptId = {
- val envs = System.getenv()
- val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
- val containerId = ConverterUtils.toContainerId(containerIdString)
- val appAttemptId = containerId.getApplicationAttemptId()
- logInfo("ApplicationAttemptId: " + appAttemptId)
- appAttemptId
- }
-
- private def registerWithResourceManager(): AMRMProtocol = {
- val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
- logInfo("Connecting to ResourceManager at " + rmAddress)
- rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
- }
-
- private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
- logInfo("Registering the ApplicationMaster")
- val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
- .asInstanceOf[RegisterApplicationMasterRequest]
- appMasterRequest.setApplicationAttemptId(appAttemptId)
- // Setting this to master host,port - so that the ApplicationReport at client has some
- // sensible info.
- // Users can then monitor stderr/stdout on that node if required.
- appMasterRequest.setHost(Utils.localHostName())
- appMasterRequest.setRpcPort(0)
- appMasterRequest.setTrackingUrl(uiAddress)
- resourceManager.registerApplicationMaster(appMasterRequest)
- }
-
- private def startUserClass(): Thread = {
- logInfo("Starting the user JAR in a separate Thread")
- System.setProperty("spark.executor.instances", args.numExecutors.toString)
- val mainMethod = Class.forName(
- args.userClass,
- false /* initialize */ ,
- Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
- val t = new Thread {
- override def run() {
-
- var successed = false
- try {
- // Copy
- var mainArgs: Array[String] = new Array[String](args.userArgs.size)
- args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
- mainMethod.invoke(null, mainArgs)
- // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
- // userThread will stop here unless it has uncaught exception thrown out
- // It need shutdown hook to set SUCCEEDED
- successed = true
- } finally {
- logDebug("finishing main")
- isLastAMRetry = true
- if (successed) {
- ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- } else {
- ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
- }
- }
- }
- }
- t.start()
- t
- }
-
- // this need to happen before allocateExecutors
- private def waitForSparkContextInitialized() {
- logInfo("Waiting for spark context initialization")
- try {
- var sparkContext: SparkContext = null
- ApplicationMaster.sparkContextRef.synchronized {
- var count = 0
- val waitTime = 10000L
- val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
- while (ApplicationMaster.sparkContextRef.get() == null && count < numTries
- && !isFinished) {
- logInfo("Waiting for spark context initialization ... " + count)
- count = count + 1
- ApplicationMaster.sparkContextRef.wait(waitTime)
- }
- sparkContext = ApplicationMaster.sparkContextRef.get()
- assert(sparkContext != null || count >= numTries)
-
- if (null != sparkContext) {
- uiAddress = sparkContext.ui.appUIHostPort
- uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
- this.yarnAllocator = YarnAllocationHandler.newAllocator(
- yarnConf,
- resourceManager,
- appAttemptId,
- args,
- sparkContext.preferredNodeLocationData,
- sparkContext.getConf)
- } else {
- logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
- format(count * waitTime, numTries))
- this.yarnAllocator = YarnAllocationHandler.newAllocator(
- yarnConf,
- resourceManager,
- appAttemptId,
- args,
- sparkContext.getConf)
- }
- }
- }
- }
-
- private def allocateExecutors() {
- try {
- logInfo("Allocating " + args.numExecutors + " executors.")
- // Wait until all containers have finished
- // TODO: This is a bit ugly. Can we make it nicer?
- // TODO: Handle container failure
-
- // Exits the loop if the user thread exits.
- while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
- && !isFinished) {
- checkNumExecutorsFailed()
- yarnAllocator.allocateContainers(
- math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
- Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
- }
- }
- logInfo("All executors have launched.")
-
- // Launch a progress reporter thread, else the app will get killed after expiration
- // (def: 10mins) timeout.
- // TODO(harvey): Verify the timeout
- if (userThread.isAlive) {
- // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
- val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
- // we want to be reasonably responsive without causing too many requests to RM.
- val schedulerInterval =
- sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
- // must be <= timeoutInterval / 2.
- val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
- launchReporterThread(interval)
- }
- }
-
- private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
-
- val t = new Thread {
- override def run() {
- while (userThread.isAlive && !isFinished) {
- checkNumExecutorsFailed()
- val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
- if (missingExecutorCount > 0) {
- logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingExecutorCount))
- yarnAllocator.allocateContainers(missingExecutorCount)
- } else {
- sendProgress()
- }
- Thread.sleep(sleepTime)
- }
- }
- }
- // Setting to daemon status, though this is usually not a good idea.
- t.setDaemon(true)
- t.start()
- logInfo("Started progress reporter thread - sleep time : " + sleepTime)
- t
- }
-
- private def checkNumExecutorsFailed() {
- if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
- logInfo("max number of executor failures reached")
- finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of executor failures reached")
- // make sure to stop the user thread
- val sparkContext = ApplicationMaster.sparkContextRef.get()
- if (sparkContext != null) {
- logInfo("Invoking sc stop from checkNumExecutorsFailed")
- sparkContext.stop()
- } else {
- logError("sparkContext is null when should shutdown")
- }
- }
- }
-
- private def sendProgress() {
- logDebug("Sending progress")
- // Simulated with an allocate request with no nodes requested ...
- yarnAllocator.allocateContainers(0)
- }
-
- /*
- def printContainers(containers: List[Container]) = {
- for (container <- containers) {
- logInfo("Launching shell command on a new container."
- + ", containerId=" + container.getId()
- + ", containerNode=" + container.getNodeId().getHost()
- + ":" + container.getNodeId().getPort()
- + ", containerNodeURI=" + container.getNodeHttpAddress()
- + ", containerState" + container.getState()
- + ", containerResourceMemory"
- + container.getResource().getMemory())
- }
- }
- */
-
- def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
- synchronized {
- if (isFinished) {
- return
- }
- isFinished = true
-
- logInfo("finishApplicationMaster with " + status)
- if (registered) {
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(appAttemptId)
- finishReq.setFinishApplicationStatus(status)
- finishReq.setDiagnostics(diagnostics)
- finishReq.setTrackingUrl(uiHistoryAddress)
- resourceManager.finishApplicationMaster(finishReq)
- }
- }
- }
-
- /**
- * Clean up the staging directory.
- */
- private def cleanupStagingDir() {
- var stagingDirPath: Path = null
- try {
- val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
- if (!preserveFiles) {
- stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
- if (stagingDirPath == null) {
- logError("Staging directory is null")
- return
- }
- logInfo("Deleting staging directory " + stagingDirPath)
- fs.delete(stagingDirPath, true)
- }
- } catch {
- case ioe: IOException =>
- logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
- }
- }
-
- // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
- class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
-
- def run() {
- logInfo("AppMaster received a signal.")
- // we need to clean up staging dir before HDFS is shut down
- // make sure we don't delete it until this is the last AM
- if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
- }
- }
-
-}
-
-object ApplicationMaster extends Logging {
- // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
- // optimal as more containers are available. Might need to handle this better.
- private val ALLOCATE_HEARTBEAT_INTERVAL = 100
-
- private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
-
- def register(master: ApplicationMaster) {
- applicationMasters.add(master)
- }
-
- val sparkContextRef: AtomicReference[SparkContext] =
- new AtomicReference[SparkContext](null /* initialValue */)
-
- def sparkContextInitialized(sc: SparkContext): Boolean = {
- var modified = false
- sparkContextRef.synchronized {
- modified = sparkContextRef.compareAndSet(null, sc)
- sparkContextRef.notifyAll()
- }
-
- // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
- // System.exit.
- // Should not really have to do this, but it helps YARN to evict resources earlier.
- // Not to mention, prevent the Client from declaring failure even though we exited properly.
- // Note that this will unfortunately not properly clean up the staging files because it gets
- // called too late, after the filesystem is already shutdown.
- if (modified) {
- Runtime.getRuntime().addShutdownHook(new Thread with Logging {
- // This is not only logs, but also ensures that log system is initialized for this instance
- // when we are actually 'run'-ing.
- logInfo("Adding shutdown hook for context " + sc)
-
- override def run() {
- logInfo("Invoking sc stop from shutdown hook")
- sc.stop()
- // Best case ...
- for (master <- applicationMasters) {
- master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- }
- }
- })
- }
-
- modified
- }
-
- def main(argStrings: Array[String]) {
- SignalLogger.register(log)
- val args = new ApplicationMasterArguments(argStrings)
- SparkHadoopUtil.get.runAsSparkUser { () =>
- new ApplicationMaster(args).run()
- }
- }
-}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
deleted file mode 100644
index 155dd88aa2..0000000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.Socket
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import akka.actor._
-import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
-import org.apache.spark.scheduler.SplitInfo
-import org.apache.spark.deploy.SparkHadoopUtil
-
-/**
- * An application master that allocates executors on behalf of a driver that is running outside
- * the cluster.
- *
- * This is used only in yarn-client mode.
- */
-class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
- extends Logging {
-
- def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
- this(args, new Configuration(), sparkConf)
-
- def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
-
- private val rpc: YarnRPC = YarnRPC.create(conf)
- private var resourceManager: AMRMProtocol = _
- private var appAttemptId: ApplicationAttemptId = _
- private var reporterThread: Thread = _
- private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
- private var yarnAllocator: YarnAllocationHandler = _
-
- private var driverClosed: Boolean = false
- private var isFinished: Boolean = false
- private var registered: Boolean = false
-
- // Default to numExecutors * 2, with minimum of 3
- private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
- sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
-
- val securityManager = new SecurityManager(sparkConf)
- val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
- conf = sparkConf, securityManager = securityManager)._1
- var actor: ActorRef = _
-
- // This actor just working as a monitor to watch on Driver Actor.
- class MonitorActor(driverUrl: String) extends Actor {
-
- var driver: ActorSelection = _
-
- override def preStart() {
- logInfo("Listen to driver: " + driverUrl)
- driver = context.actorSelection(driverUrl)
- // Send a hello message thus the connection is actually established, thus we can
- // monitor Lifecycle Events.
- driver ! "Hello"
- context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
- }
-
- override def receive = {
- case x: DisassociatedEvent =>
- logInfo(s"Driver terminated or disconnected! Shutting down. $x")
- driverClosed = true
- case x: AddWebUIFilter =>
- logInfo(s"Add WebUI Filter. $x")
- driver ! x
- }
- }
-
- def run() {
- appAttemptId = getApplicationAttemptId()
- resourceManager = registerWithResourceManager()
-
- synchronized {
- if (!isFinished) {
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
- // Compute number of threads for akka
- val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
- if (minimumMemory > 0) {
- val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- YarnAllocationHandler.MEMORY_OVERHEAD)
- val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
- if (numCore > 0) {
- // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
- // TODO: Uncomment when hadoop is on a version which has this fixed.
- // args.workerCores = numCore
- }
- }
- registered = true
- }
- }
- waitForSparkMaster()
- addAmIpFilter()
- // Allocate all containers
- allocateExecutors()
-
- // Launch a progress reporter thread, else app will get killed after expiration
- // (def: 10mins) timeout ensure that progress is sent before
- // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
- val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
- // we want to be reasonably responsive without causing too many requests to RM.
- val schedulerInterval =
- System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
-
- // must be <= timeoutInterval / 2.
- val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
- reporterThread = launchReporterThread(interval)
-
- // Wait for the reporter thread to Finish.
- reporterThread.join()
-
- finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- actorSystem.shutdown()
-
- logInfo("Exited")
- System.exit(0)
- }
-
- private def getApplicationAttemptId(): ApplicationAttemptId = {
- val envs = System.getenv()
- val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
- val containerId = ConverterUtils.toContainerId(containerIdString)
- val appAttemptId = containerId.getApplicationAttemptId()
- logInfo("ApplicationAttemptId: " + appAttemptId)
- appAttemptId
- }
-
- private def registerWithResourceManager(): AMRMProtocol = {
- val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
- logInfo("Connecting to ResourceManager at " + rmAddress)
- rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
- }
-
- private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
- val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
- logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
- val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
- .asInstanceOf[RegisterApplicationMasterRequest]
- appMasterRequest.setApplicationAttemptId(appAttemptId)
- // Setting this to master host,port - so that the ApplicationReport at client has
- // some sensible info. Users can then monitor stderr/stdout on that node if required.
- appMasterRequest.setHost(Utils.localHostName())
- appMasterRequest.setRpcPort(0)
- // What do we provide here ? Might make sense to expose something sensible later ?
- appMasterRequest.setTrackingUrl(appUIAddress)
- resourceManager.registerApplicationMaster(appMasterRequest)
- }
-
- // add the yarn amIpFilter that Yarn requires for properly securing the UI
- private def addAmIpFilter() {
- val proxy = YarnConfiguration.getProxyHostAndPort(conf)
- val parts = proxy.split(":")
- val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
- val uriBase = "http://" + proxy + proxyBase
- val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
- val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
- actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
- }
-
- private def waitForSparkMaster() {
- logInfo("Waiting for spark driver to be reachable.")
- var driverUp = false
- val hostport = args.userArgs(0)
- val (driverHost, driverPort) = Utils.parseHostPort(hostport)
- while(!driverUp) {
- try {
- val socket = new Socket(driverHost, driverPort)
- socket.close()
- logInfo("Master now available: " + driverHost + ":" + driverPort)
- driverUp = true
- } catch {
- case e: Exception =>
- logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
- Thread.sleep(100)
- }
- }
- sparkConf.set("spark.driver.host", driverHost)
- sparkConf.set("spark.driver.port", driverPort.toString)
-
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
- SparkEnv.driverActorSystemName,
- driverHost,
- driverPort.toString,
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
- actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
- }
-
-
- private def allocateExecutors() {
-
- // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
- val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
- scala.collection.immutable.Map()
-
- yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
- args, preferredNodeLocationData, sparkConf)
-
- logInfo("Allocating " + args.numExecutors + " executors.")
- // Wait until all containers have finished
- // TODO: This is a bit ugly. Can we make it nicer?
- // TODO: Handle container failure
- while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
- !isFinished) {
- yarnAllocator.allocateContainers(
- math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
- checkNumExecutorsFailed()
- Thread.sleep(100)
- }
-
- logInfo("All executors have launched.")
- }
- private def checkNumExecutorsFailed() {
- if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
- finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of executor failures reached")
- }
- }
-
- // TODO: We might want to extend this to allocate more containers in case they die !
- private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
-
- val t = new Thread {
- override def run() {
- while (!driverClosed && !isFinished) {
- checkNumExecutorsFailed()
- val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
- if (missingExecutorCount > 0) {
- logInfo("Allocating " + missingExecutorCount +
- " containers to make up for (potentially ?) lost containers")
- yarnAllocator.allocateContainers(missingExecutorCount)
- } else {
- sendProgress()
- }
- Thread.sleep(sleepTime)
- }
- }
- }
- // setting to daemon status, though this is usually not a good idea.
- t.setDaemon(true)
- t.start()
- logInfo("Started progress reporter thread - sleep time : " + sleepTime)
- t
- }
-
- private def sendProgress() {
- logDebug("Sending progress")
- // simulated with an allocate request with no nodes requested ...
- yarnAllocator.allocateContainers(0)
- }
-
- def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
- synchronized {
- if (isFinished) {
- return
- }
- logInfo("Unregistering ApplicationMaster with " + status)
- if (registered) {
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(appAttemptId)
- finishReq.setFinishApplicationStatus(status)
- finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
- finishReq.setDiagnostics(appMessage)
- resourceManager.finishApplicationMaster(finishReq)
- }
- isFinished = true
- }
- }
-
-}
-
-
-object ExecutorLauncher {
- def main(argStrings: Array[String]) {
- val args = new ApplicationMasterArguments(argStrings)
- SparkHadoopUtil.get.runAsSparkUser { () =>
- new ExecutorLauncher(args).run()
- }
- }
-}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 568a6ef932..629cd13f67 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,33 +17,24 @@
package org.apache.spark.deploy.yarn
-import java.lang.{Boolean => JBoolean}
-import java.util.{Collections, Set => JSet}
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import org.apache.spark.{Logging, SparkConf, SparkEnv}
-import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
+import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.AMRMProtocol
import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
-import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
+import org.apache.hadoop.yarn.api.records.{Container, ContainerId}
import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
-import org.apache.hadoop.yarn.util.{RackResolver, Records}
-
-
-object AllocationType extends Enumeration {
- type AllocationType = Value
- val HOST, RACK, ANY = Value
-}
+import org.apache.hadoop.yarn.util.Records
// TODO:
// Too many params.
@@ -59,16 +50,14 @@ object AllocationType extends Enumeration {
* Acquires resources for executors from a ResourceManager and launches executors in new containers.
*/
private[yarn] class YarnAllocationHandler(
- val conf: Configuration,
- val resourceManager: AMRMProtocol,
- val appAttemptId: ApplicationAttemptId,
- val maxExecutors: Int,
- val executorMemory: Int,
- val executorCores: Int,
- val preferredHostToCount: Map[String, Int],
- val preferredRackToCount: Map[String, Int],
- val sparkConf: SparkConf)
- extends Logging {
+ conf: Configuration,
+ sparkConf: SparkConf,
+ resourceManager: AMRMProtocol,
+ appAttemptId: ApplicationAttemptId,
+ args: ApplicationMasterArguments,
+ preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
+ extends YarnAllocator with Logging {
+
// These three are locked on allocatedHostToContainersMap. Complementary data structures
// allocatedHostToContainersMap : containers which are running : host, Set<containerid>
// allocatedContainerToHostMap: container to host mapping.
@@ -90,7 +79,7 @@ private[yarn] class YarnAllocationHandler(
// Additional memory overhead - in mb.
private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- YarnAllocationHandler.MEMORY_OVERHEAD)
+ YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
private val numExecutorsRunning = new AtomicInteger()
// Used to generate a unique id per executor
@@ -98,6 +87,12 @@ private[yarn] class YarnAllocationHandler(
private val lastResponseId = new AtomicInteger()
private val numExecutorsFailed = new AtomicInteger()
+ private val maxExecutors = args.numExecutors
+ private val executorMemory = args.executorMemory
+ private val executorCores = args.executorCores
+ private val (preferredHostToCount, preferredRackToCount) =
+ generateNodeToWeight(conf, preferredNodes)
+
def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
@@ -106,9 +101,10 @@ private[yarn] class YarnAllocationHandler(
container.getResource.getMemory >= (executorMemory + memoryOverhead)
}
- def allocateContainers(executorsToRequest: Int) {
+ override def allocateResources() = {
// We need to send the request only once from what I understand ... but for now, not modifying
// this much.
+ val executorsToRequest = Math.max(maxExecutors - numExecutorsRunning.get(), 0)
// Keep polling the Resource Manager for containers
val amResp = allocateExecutorResources(executorsToRequest).getAMResponse
@@ -182,7 +178,7 @@ private[yarn] class YarnAllocationHandler(
// Now rack local
if (remainingContainers != null){
- val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
if (rack != null){
val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
@@ -256,7 +252,7 @@ private[yarn] class YarnAllocationHandler(
// Should not be there, but ..
pendingReleaseContainers.remove(containerId)
- val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
allocatedHostToContainersMap.synchronized {
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId]())
@@ -331,7 +327,7 @@ private[yarn] class YarnAllocationHandler(
allocatedContainerToHostMap -= containerId
// Doing this within locked context, sigh ... move to outside ?
- val rack = YarnAllocationHandler.lookupRack(conf, host)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
if (rack != null) {
val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
if (rackCount > 0) {
@@ -364,9 +360,9 @@ private[yarn] class YarnAllocationHandler(
for (container <- hostContainers) {
val candidateHost = container.getHostName
val candidateNumContainers = container.getNumContainers
- assert(YarnAllocationHandler.ANY_HOST != candidateHost)
+ assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
- val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
if (rack != null) {
var count = rackToCounts.getOrElse(rack, 0)
count += candidateNumContainers
@@ -378,7 +374,8 @@ private[yarn] class YarnAllocationHandler(
new ArrayBuffer[ResourceRequest](rackToCounts.size)
for ((rack, count) <- rackToCounts){
requestedContainers +=
- createResourceRequest(AllocationType.RACK, rack, count, YarnAllocationHandler.PRIORITY)
+ createResourceRequest(AllocationType.RACK, rack, count,
+ YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
}
requestedContainers.toList
@@ -409,7 +406,7 @@ private[yarn] class YarnAllocationHandler(
logDebug("numExecutors: " + numExecutors + ", host preferences: " +
preferredHostToCount.isEmpty)
resourceRequests = List(createResourceRequest(
- AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
+ AllocationType.ANY, null, numExecutors, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
} else {
// request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
@@ -423,7 +420,7 @@ private[yarn] class YarnAllocationHandler(
AllocationType.HOST,
candidateHost,
requiredCount,
- YarnAllocationHandler.PRIORITY)
+ YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
}
}
val rackContainerRequests: List[ResourceRequest] = createRackResourceRequests(
@@ -433,7 +430,7 @@ private[yarn] class YarnAllocationHandler(
AllocationType.ANY,
resource = null,
numExecutors,
- YarnAllocationHandler.PRIORITY)
+ YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
hostContainerRequests.size + rackContainerRequests.size + 1)
@@ -483,12 +480,12 @@ private[yarn] class YarnAllocationHandler(
// There must be a third request - which is ANY : that will be specially handled.
requestType match {
case AllocationType.HOST => {
- assert(YarnAllocationHandler.ANY_HOST != resource)
+ assert(YarnSparkHadoopUtil.ANY_HOST != resource)
val hostname = resource
val nodeLocal = createResourceRequestImpl(hostname, numExecutors, priority)
// Add to host->rack mapping
- YarnAllocationHandler.populateRackInfo(conf, hostname)
+ YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
nodeLocal
}
@@ -497,7 +494,7 @@ private[yarn] class YarnAllocationHandler(
createResourceRequestImpl(rack, numExecutors, priority)
}
case AllocationType.ANY => createResourceRequestImpl(
- YarnAllocationHandler.ANY_HOST, numExecutors, priority)
+ YarnSparkHadoopUtil.ANY_HOST, numExecutors, priority)
case _ => throw new IllegalArgumentException(
"Unexpected/unsupported request type: " + requestType)
}
@@ -541,90 +538,6 @@ private[yarn] class YarnAllocationHandler(
retval
}
-}
-
-object YarnAllocationHandler {
-
- val ANY_HOST = "*"
- // All requests are issued with same priority : we do not (yet) have any distinction between
- // request types (like map/reduce in hadoop for example)
- val PRIORITY = 1
-
- // Additional memory overhead - in mb
- val MEMORY_OVERHEAD = 384
-
- // Host to rack map - saved from allocation requests
- // We are expecting this not to change.
- // Note that it is possible for this to change : and RM will indicate that to us via update
- // response to allocate. But we are punting on handling that for now.
- private val hostToRack = new ConcurrentHashMap[String, String]()
- private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
-
-
- def newAllocator(
- conf: Configuration,
- resourceManager: AMRMProtocol,
- appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- sparkConf: SparkConf): YarnAllocationHandler = {
-
- new YarnAllocationHandler(
- conf,
- resourceManager,
- appAttemptId,
- args.numExecutors,
- args.executorMemory,
- args.executorCores,
- Map[String, Int](),
- Map[String, Int](),
- sparkConf)
- }
-
- def newAllocator(
- conf: Configuration,
- resourceManager: AMRMProtocol,
- appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- map: collection.Map[String,
- collection.Set[SplitInfo]],
- sparkConf: SparkConf): YarnAllocationHandler = {
-
- val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
- new YarnAllocationHandler(
- conf,
- resourceManager,
- appAttemptId,
- args.numExecutors,
- args.executorMemory,
- args.executorCores,
- hostToCount,
- rackToCount,
- sparkConf)
- }
-
- def newAllocator(
- conf: Configuration,
- resourceManager: AMRMProtocol,
- appAttemptId: ApplicationAttemptId,
- maxExecutors: Int,
- executorMemory: Int,
- executorCores: Int,
- map: collection.Map[String, collection.Set[SplitInfo]],
- sparkConf: SparkConf): YarnAllocationHandler = {
-
- val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
-
- new YarnAllocationHandler(
- conf,
- resourceManager,
- appAttemptId,
- maxExecutors,
- executorMemory,
- executorCores,
- hostToCount,
- rackToCount,
- sparkConf)
- }
// A simple method to copy the split info map.
private def generateNodeToWeight(
@@ -642,7 +555,7 @@ object YarnAllocationHandler {
val hostCount = hostToCount.getOrElse(host, 0)
hostToCount.put(host, hostCount + splits.size)
- val rack = lookupRack(conf, host)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
if (rack != null){
val rackCount = rackToCount.getOrElse(host, 0)
rackToCount.put(host, rackCount + splits.size)
@@ -652,41 +565,4 @@ object YarnAllocationHandler {
(hostToCount.toMap, rackToCount.toMap)
}
- def lookupRack(conf: Configuration, host: String): String = {
- if (!hostToRack.contains(host)) populateRackInfo(conf, host)
- hostToRack.get(host)
- }
-
- def fetchCachedHostsForRack(rack: String): Option[Set[String]] = {
- val set = rackToHostSet.get(rack)
- if (set == null) return None
-
- // No better way to get a Set[String] from JSet ?
- val convertedSet: collection.mutable.Set[String] = set
- Some(convertedSet.toSet)
- }
-
- def populateRackInfo(conf: Configuration, hostname: String) {
- Utils.checkHost(hostname)
-
- if (!hostToRack.containsKey(hostname)) {
- // If there are repeated failures to resolve, all to an ignore list ?
- val rackInfo = RackResolver.resolve(conf, hostname)
- if (rackInfo != null && rackInfo.getNetworkLocation != null) {
- val rack = rackInfo.getNetworkLocation
- hostToRack.put(hostname, rack)
- if (! rackToHostSet.containsKey(rack)) {
- rackToHostSet.putIfAbsent(rack,
- Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
- }
- rackToHostSet.get(rack).add(hostname)
-
- // TODO(harvey): Figure out this comment...
- // Since RackResolver caches, we are disabling this for now ...
- } /* else {
- // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
- hostToRack.put(hostname, null)
- } */
- }
- }
}
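
Note: the companion-object members deleted above are referenced in the new code as YarnSparkHadoopUtil.ANY_HOST, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY, YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD, YarnSparkHadoopUtil.lookupRack and YarnSparkHadoopUtil.populateRackInfo. The YarnSparkHadoopUtil.scala hunk is not included in this section, so the following is only a rough sketch of where those members end up, reconstructed from the deleted companion object and the call sites above rather than from the actual file:

// Sketch only: reconstructed from the deleted YarnAllocationHandler companion object and
// the YarnSparkHadoopUtil call sites in this diff, not from the real file.
package org.apache.spark.deploy.yarn

import java.util.concurrent.ConcurrentHashMap

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.util.RackResolver

object YarnSparkHadoopUtil {
  // "Any host" wildcard used when building ResourceRequests.
  val ANY_HOST = "*"
  // All container requests are issued with the same priority.
  val RM_REQUEST_PRIORITY = 1
  // Additional memory overhead, in MB, on top of the requested executor memory.
  val DEFAULT_MEMORY_OVERHEAD = 384

  // Host-to-rack cache, filled in as allocation requests are built.
  private val hostToRack = new ConcurrentHashMap[String, String]()

  def lookupRack(conf: Configuration, host: String): String = {
    if (!hostToRack.containsKey(host)) {
      populateRackInfo(conf, host)
    }
    hostToRack.get(host)
  }

  def populateRackInfo(conf: Configuration, hostname: String): Unit = {
    if (!hostToRack.containsKey(hostname)) {
      val rackInfo = RackResolver.resolve(conf, hostname)
      if (rackInfo != null && rackInfo.getNetworkLocation != null) {
        hostToRack.put(hostname, rackInfo.getNetworkLocation)
      }
    }
  }
}
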
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
new file mode 100644
index 0000000000..cc5392192e
--- /dev/null
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.{Map, Set}
+
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.Utils
+
+/**
+ * YarnRMClient implementation for the Yarn alpha API.
+ */
+private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging {
+
+ private var rpc: YarnRPC = null
+ private var resourceManager: AMRMProtocol = _
+ private var uiHistoryAddress: String = _
+
+ override def register(
+ conf: YarnConfiguration,
+ sparkConf: SparkConf,
+ preferredNodeLocations: Map[String, Set[SplitInfo]],
+ uiAddress: String,
+ uiHistoryAddress: String) = {
+ this.rpc = YarnRPC.create(conf)
+ this.uiHistoryAddress = uiHistoryAddress
+
+ resourceManager = registerWithResourceManager(conf)
+ registerApplicationMaster(uiAddress)
+
+ new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
+ preferredNodeLocations)
+ }
+
+ override def getAttemptId() = {
+ val envs = System.getenv()
+ val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
+ val containerId = ConverterUtils.toContainerId(containerIdString)
+ val appAttemptId = containerId.getApplicationAttemptId()
+ appAttemptId
+ }
+
+ override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = {
+ val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+ .asInstanceOf[FinishApplicationMasterRequest]
+ finishReq.setAppAttemptId(getAttemptId())
+ finishReq.setFinishApplicationStatus(status)
+ finishReq.setDiagnostics(diagnostics)
+ finishReq.setTrackingUrl(uiHistoryAddress)
+ resourceManager.finishApplicationMaster(finishReq)
+ }
+
+ override def getProxyHostAndPort(conf: YarnConfiguration) =
+ YarnConfiguration.getProxyHostAndPort(conf)
+
+ override def getMaxRegAttempts(conf: YarnConfiguration) =
+ conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
+
+ private def registerWithResourceManager(conf: YarnConfiguration): AMRMProtocol = {
+ val rmAddress = NetUtils.createSocketAddr(conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
+ logInfo("Connecting to ResourceManager at " + rmAddress)
+ rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+ }
+
+ private def registerApplicationMaster(uiAddress: String): RegisterApplicationMasterResponse = {
+ val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
+ .asInstanceOf[RegisterApplicationMasterRequest]
+ appMasterRequest.setApplicationAttemptId(getAttemptId())
+ // Setting this to master host,port - so that the ApplicationReport at client has some
+ // sensible info.
+ // Users can then monitor stderr/stdout on that node if required.
+ appMasterRequest.setHost(Utils.localHostName())
+ appMasterRequest.setRpcPort(0)
+ appMasterRequest.setTrackingUrl(uiAddress)
+ resourceManager.registerApplicationMaster(appMasterRequest)
+ }
+
+}
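
Note: this alpha YarnRMClientImpl (and the stable one later in the diff) implements a new common YarnRMClient trait added under yarn/common, whose hunk is not shown in this section. Inferred purely from the overrides above and from how the common ApplicationMaster uses the client, the trait plausibly has a shape along these lines:

// Assumed shape only, derived from the overrides in YarnRMClientImpl; the real trait is in
// yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala.
package org.apache.spark.deploy.yarn

import scala.collection.{Map, Set}

import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, FinalApplicationStatus}
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SplitInfo

trait YarnRMClient {

  /** Registers the AM with the RM and returns an allocator used to request executors. */
  def register(
      conf: YarnConfiguration,
      sparkConf: SparkConf,
      preferredNodeLocations: Map[String, Set[SplitInfo]],
      uiAddress: String,
      uiHistoryAddress: String): YarnAllocator

  /** Returns the attempt id of the running AM container. */
  def getAttemptId(): ApplicationAttemptId

  /** Unregisters the AM from the RM with the given final status. */
  def shutdown(status: FinalApplicationStatus, diagnostics: String = ""): Unit

  /** Returns the host:port of the web proxy fronting the AM UI. */
  def getProxyHostAndPort(conf: YarnConfiguration): String

  /** Returns the maximum number of AM registration attempts. */
  def getMaxRegAttempts(conf: YarnConfiguration): Int
}
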
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
new file mode 100644
index 0000000000..f15c813b83
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.Socket
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.collection.JavaConversions._
+import scala.util.Try
+
+import akka.actor._
+import akka.remote._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.util.ShutdownHookManager
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
+import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+
+/**
+ * Common application master functionality for Spark on Yarn.
+ */
+private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
+ client: YarnRMClient) extends Logging {
+ // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
+ // optimal as more containers are available. Might need to handle this better.
+ private val ALLOCATE_HEARTBEAT_INTERVAL = 100
+
+ private val sparkConf = new SparkConf()
+ private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
+ private val isDriver = args.userClass != null
+
+ // Default to numExecutors * 2, with minimum of 3
+ private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+ sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
+
+ @volatile private var finished = false
+ @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
+
+ private var reporterThread: Thread = _
+ private var allocator: YarnAllocator = _
+
+ // Fields used in client mode.
+ private var actorSystem: ActorSystem = null
+ private var actor: ActorRef = _
+
+ // Fields used in cluster mode.
+ private val sparkContextRef = new AtomicReference[SparkContext](null)
+
+ final def run(): Int = {
+ if (isDriver) {
+ // Set the web ui port to be ephemeral for yarn so we don't conflict with
+ // other spark processes running on the same box
+ System.setProperty("spark.ui.port", "0")
+
+ // Set the master property to match the requested mode.
+ System.setProperty("spark.master", "yarn-cluster")
+ }
+
+ logInfo("ApplicationAttemptId: " + client.getAttemptId())
+
+ val cleanupHook = new Runnable {
+ override def run() {
+ // If the SparkContext is still registered, shut it down as a best case effort in case
+ // users do not call sc.stop or do System.exit().
+ val sc = sparkContextRef.get()
+ if (sc != null) {
+ logInfo("Invoking sc stop from shutdown hook")
+ sc.stop()
+ finish(FinalApplicationStatus.SUCCEEDED)
+ }
+
+ // Cleanup the staging dir after the app is finished, or if it's the last attempt at
+ // running the AM.
+ val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+ val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
+ if (finished || isLastAttempt) {
+ cleanupStagingDir()
+ }
+ }
+ }
+ // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
+ ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
+
+ // Call this to force generation of secret so it gets populated into the
+ // Hadoop UGI. This has to happen before the startUserClass which does a
+ // doAs in order for the credentials to be passed on to the executor containers.
+ val securityMgr = new SecurityManager(sparkConf)
+
+ if (isDriver) {
+ runDriver()
+ } else {
+ runExecutorLauncher(securityMgr)
+ }
+
+ if (finalStatus != FinalApplicationStatus.UNDEFINED) {
+ finish(finalStatus)
+ 0
+ } else {
+ 1
+ }
+ }
+
+ final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
+ if (!finished) {
+ logInfo(s"Finishing ApplicationMaster with $status" +
+ Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
+ finished = true
+ finalStatus = status
+ try {
+ if (Thread.currentThread() != reporterThread) {
+ reporterThread.interrupt()
+ reporterThread.join()
+ }
+ } finally {
+ client.shutdown(status, Option(diagnostics).getOrElse(""))
+ }
+ }
+ }
+
+ private def sparkContextInitialized(sc: SparkContext) = {
+ sparkContextRef.synchronized {
+ sparkContextRef.compareAndSet(null, sc)
+ sparkContextRef.notifyAll()
+ }
+ }
+
+ private def sparkContextStopped(sc: SparkContext) = {
+ sparkContextRef.compareAndSet(sc, null)
+ }
+
+ private def registerAM(uiAddress: String, uiHistoryAddress: String) = {
+ val sc = sparkContextRef.get()
+ allocator = client.register(yarnConf,
+ if (sc != null) sc.getConf else sparkConf,
+ if (sc != null) sc.preferredNodeLocationData else Map(),
+ uiAddress,
+ uiHistoryAddress)
+
+ allocator.allocateResources()
+ reporterThread = launchReporterThread()
+ }
+
+ private def runDriver(): Unit = {
+ addAmIpFilter()
+ val userThread = startUserClass()
+
+ // This a bit hacky, but we need to wait until the spark.driver.port property has
+ // been set by the Thread executing the user class.
+ val sc = waitForSparkContextInitialized()
+
+ // If there is no SparkContext at this point, just fail the app.
+ if (sc == null) {
+ finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
+ } else {
+ registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
+ try {
+ userThread.join()
+ } finally {
+ // In cluster mode, ask the reporter thread to stop since the user app is finished.
+ reporterThread.interrupt()
+ }
+ }
+ }
+
+ private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
+ actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+ conf = sparkConf, securityManager = securityMgr)._1
+ actor = waitForSparkDriver()
+ addAmIpFilter()
+ registerAM(sparkConf.get("spark.driver.appUIAddress", ""),
+ sparkConf.get("spark.driver.appUIHistoryAddress", ""))
+
+ // In client mode the actor will stop the reporter thread.
+ reporterThread.join()
+ finalStatus = FinalApplicationStatus.SUCCEEDED
+ }
+
+ private def launchReporterThread(): Thread = {
+ // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+ val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+
+ // we want to be reasonably responsive without causing too many requests to RM.
+ val schedulerInterval =
+ sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
+
+ // must be <= expiryInterval / 2.
+ val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
+
+ val t = new Thread {
+ override def run() {
+ while (!finished) {
+ checkNumExecutorsFailed()
+ if (!finished) {
+ logDebug("Sending progress")
+ allocator.allocateResources()
+ try {
+ Thread.sleep(interval)
+ } catch {
+ case e: InterruptedException =>
+ }
+ }
+ }
+ }
+ }
+ // setting to daemon status, though this is usually not a good idea.
+ t.setDaemon(true)
+ t.setName("Reporter")
+ t.start()
+ logInfo("Started progress reporter thread - sleep time : " + interval)
+ t
+ }
+
+ /**
+ * Clean up the staging directory.
+ */
+ private def cleanupStagingDir() {
+ val fs = FileSystem.get(yarnConf)
+ var stagingDirPath: Path = null
+ try {
+ val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
+ if (!preserveFiles) {
+ stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+ if (stagingDirPath == null) {
+ logError("Staging directory is null")
+ return
+ }
+ logInfo("Deleting staging directory " + stagingDirPath)
+ fs.delete(stagingDirPath, true)
+ }
+ } catch {
+ case ioe: IOException =>
+ logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+ }
+ }
+
+ private def waitForSparkContextInitialized(): SparkContext = {
+ logInfo("Waiting for spark context initialization")
+ try {
+ sparkContextRef.synchronized {
+ var count = 0
+ val waitTime = 10000L
+ val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
+ while (sparkContextRef.get() == null && count < numTries && !finished) {
+ logInfo("Waiting for spark context initialization ... " + count)
+ count = count + 1
+ sparkContextRef.wait(waitTime)
+ }
+
+ val sparkContext = sparkContextRef.get()
+ assert(sparkContext != null || count >= numTries)
+ if (sparkContext == null) {
+ logError(
+ "Unable to retrieve SparkContext in spite of waiting for %d ms, numTries = %d".format(
+ count * waitTime, numTries))
+ }
+ sparkContext
+ }
+ }
+ }
+
+ private def waitForSparkDriver(): ActorRef = {
+ logInfo("Waiting for Spark driver to be reachable.")
+ var driverUp = false
+ val hostport = args.userArgs(0)
+ val (driverHost, driverPort) = Utils.parseHostPort(hostport)
+ while (!driverUp) {
+ try {
+ val socket = new Socket(driverHost, driverPort)
+ socket.close()
+ logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
+ driverUp = true
+ } catch {
+ case e: Exception =>
+ logError("Failed to connect to driver at %s:%s, retrying ...".
+ format(driverHost, driverPort))
+ Thread.sleep(100)
+ }
+ }
+ sparkConf.set("spark.driver.host", driverHost)
+ sparkConf.set("spark.driver.port", driverPort.toString)
+
+ val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ SparkEnv.driverActorSystemName,
+ driverHost,
+ driverPort.toString,
+ CoarseGrainedSchedulerBackend.ACTOR_NAME)
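+ // This yields a URL of the form
+ // akka.tcp://<driverActorSystemName>@<driverHost>:<driverPort>/user/<scheduler actor name>.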
+ actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+ }
+
+ private def checkNumExecutorsFailed() = {
+ if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+ finish(FinalApplicationStatus.FAILED, "Max number of executor failures reached.")
+
+ val sc = sparkContextRef.get()
+ if (sc != null) {
+ logInfo("Invoking sc stop from checkNumExecutorsFailed")
+ sc.stop()
+ }
+ }
+ }
+
+ /** Add the Yarn IP filter that is required for properly securing the UI. */
+ private def addAmIpFilter() = {
+ val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+ val proxy = client.getProxyHostAndPort(yarnConf)
+ val parts = proxy.split(":")
+ val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+ val uriBase = "http://" + proxy + proxyBase
+ val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
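+ // For example (hostname, port and proxy base purely illustrative):
+ // "PROXY_HOST=rm-host,PROXY_URI_BASE=http://rm-host:8088/proxy/<appId>"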
+
+ if (isDriver) {
+ System.setProperty("spark.ui.filters", amFilter)
+ System.setProperty(s"spark.$amFilter.params", params)
+ } else {
+ actor ! AddWebUIFilter(amFilter, params, proxyBase)
+ }
+ }
+
+ private def startUserClass(): Thread = {
+ logInfo("Starting the user JAR in a separate Thread")
+ System.setProperty("spark.executor.instances", args.numExecutors.toString)
+ val mainMethod = Class.forName(args.userClass, false,
+ Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
+
+ val t = new Thread {
+ override def run() {
+ var status = FinalApplicationStatus.FAILED
+ try {
+ // Copy
+ val mainArgs = new Array[String](args.userArgs.size)
+ args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
+ mainMethod.invoke(null, mainArgs)
+ // Some apps have "System.exit(0)" at the end. The user thread will stop here unless
+ // it throws an uncaught exception, so such apps need a shutdown hook to set SUCCEEDED.
+ status = FinalApplicationStatus.SUCCEEDED
+ } finally {
+ logDebug("Finishing main")
+ }
+ finalStatus = status
+ }
+ }
+ t.setName("Driver")
+ t.start()
+ t
+ }
+
+ // Actor used to monitor the driver when running in client deploy mode.
+ private class MonitorActor(driverUrl: String) extends Actor {
+
+ var driver: ActorSelection = _
+
+ override def preStart() = {
+ logInfo("Listening to driver: " + driverUrl)
+ driver = context.actorSelection(driverUrl)
+ // Send a hello message to establish the connection, after which
+ // we can monitor Lifecycle Events.
+ driver ! "Hello"
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+ }
+
+ override def receive = {
+ case x: DisassociatedEvent =>
+ logInfo(s"Driver terminated or disconnected! Shutting down. $x")
+ finish(FinalApplicationStatus.SUCCEEDED)
+ case x: AddWebUIFilter =>
+ logInfo(s"Add WebUI Filter. $x")
+ driver ! x
+ }
+
+ }
+
+}
+
+object ApplicationMaster extends Logging {
+
+ private var master: ApplicationMaster = _
+
+ def main(args: Array[String]) = {
+ SignalLogger.register(log)
+ val amArgs = new ApplicationMasterArguments(args)
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs))
+ System.exit(master.run())
+ }
+ }
+
+ private[spark] def sparkContextInitialized(sc: SparkContext) = {
+ master.sparkContextInitialized(sc)
+ }
+
+ private[spark] def sparkContextStopped(sc: SparkContext) = {
+ master.sparkContextStopped(sc)
+ }
+
+}
+
+/**
+ * This object does not provide any special functionality. It exists so that it's easy to tell
+ * the client-mode AM apart from the cluster-mode AM when using tools such as ps or jps.
+ */
+object ExecutorLauncher {
+
+ def main(args: Array[String]) = {
+ ApplicationMaster.main(args)
+ }
+
+}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 424b0fb093..3e6b96fb63 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -63,11 +63,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
executorCores = value
args = tail
- case Nil =>
- if (userJar == null || userClass == null) {
- printUsageAndExit(1)
- }
-
case _ =>
printUsageAndExit(1, args)
}
@@ -80,16 +75,17 @@ class ApplicationMasterArguments(val args: Array[String]) {
if (unknownParam != null) {
System.err.println("Unknown/unsupported param " + unknownParam)
}
- System.err.println(
- "Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] \n" +
- "Options:\n" +
- " --jar JAR_PATH Path to your application's JAR file (required)\n" +
- " --class CLASS_NAME Name of your application's main class (required)\n" +
- " --args ARGS Arguments to be passed to your application's main class.\n" +
- " Mutliple invocations are possible, each will be passed in order.\n" +
- " --num-executors NUM Number of executors to start (Default: 2)\n" +
- " --executor-cores NUM Number of cores for the executors (Default: 1)\n" +
- " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n")
+ System.err.println("""
+ |Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
+ |Options:
+ | --jar JAR_PATH Path to your application's JAR file
+ | --class CLASS_NAME Name of your application's main class
+ | --args ARGS Arguments to be passed to your application's main class.
+ | Multiple invocations are possible, each will be passed in order.
+ | --num-executors NUM Number of executors to start (Default: 2)
+ | --executor-cores NUM Number of cores for the executors (Default: 1)
+ | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)
+ """.stripMargin)
System.exit(exitCode)
}
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index afa4fd4c69..40d8d6d6e6 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -37,7 +37,6 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
var numExecutors = 2
var amQueue = sparkConf.get("QUEUE", "default")
var amMemory: Int = 512 // MB
- var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
var appName: String = "Spark"
var priority = 0
@@ -78,10 +77,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
args = tail
case ("--master-class" | "--am-class") :: value :: tail =>
- if (args(0) == "--master-class") {
- println("--master-class is deprecated. Use --am-class instead.")
- }
- amClass = value
+ println(s"${args(0)} is deprecated and is not used anymore.")
args = tail
case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
@@ -133,9 +129,6 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
args = tail
case Nil =>
- if (userClass == null) {
- throw new IllegalArgumentException(getUsageMessage())
- }
case _ =>
throw new IllegalArgumentException(getUsageMessage(args))
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 3897b3a373..6cf300c398 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -42,12 +42,6 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar
/**
* The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
* Client submits an application to the YARN ResourceManager.
- *
- * Depending on the deployment mode this will launch one of two application master classes:
- * 1. In cluster mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
- * which launches a driver program inside of the cluster.
- * 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]] to
- * request executors on behalf of a driver running outside of the cluster.
*/
trait ClientBase extends Logging {
val args: ClientArguments
@@ -67,14 +61,11 @@ trait ClientBase extends Logging {
// Additional memory overhead - in mb.
protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
- YarnAllocationHandler.MEMORY_OVERHEAD)
+ YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
// TODO(harvey): This could just go in ClientArguments.
def validateArgs() = {
Map(
- ((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
- "Error: You must specify a user jar when running in standalone mode!"),
- (args.userClass == null) -> "Error: You must specify a user class!",
(args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
(args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" +
"greater than: " + memoryOverhead),
@@ -321,6 +312,8 @@ trait ClientBase extends Logging {
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources)
+ val isLaunchingDriver = args.userClass != null
+
// In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
// executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
// SparkContext will not let that set spark* system properties, which is expected behavior for
@@ -329,7 +322,7 @@ trait ClientBase extends Logging {
// Note that to warn the user about the deprecation in cluster mode, some code from
// SparkConf#validateSettings() is duplicated here (to avoid triggering the condition
// described above).
- if (args.amClass == classOf[ApplicationMaster].getName) {
+ if (isLaunchingDriver) {
sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
val warning =
s"""
@@ -389,7 +382,7 @@ trait ClientBase extends Logging {
javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
}
- if (args.amClass == classOf[ApplicationMaster].getName) {
+ if (isLaunchingDriver) {
sparkConf.getOption("spark.driver.extraJavaOptions")
.orElse(sys.env.get("SPARK_JAVA_OPTS"))
.foreach(opts => javaOpts += opts)
@@ -397,22 +390,37 @@ trait ClientBase extends Logging {
.foreach(p => javaOpts += s"-Djava.library.path=$p")
}
- // Command for the ApplicationMaster
- val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
- javaOpts ++
- Seq(args.amClass, "--class", YarnSparkHadoopUtil.escapeForShell(args.userClass),
- "--jar ", YarnSparkHadoopUtil.escapeForShell(args.userJar),
- userArgsToString(args),
- "--executor-memory", args.executorMemory.toString,
+ val userClass =
+ if (args.userClass != null) {
+ Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
+ } else {
+ Nil
+ }
+ val amClass =
+ if (isLaunchingDriver) {
+ classOf[ApplicationMaster].getName()
+ } else {
+ classOf[ApplicationMaster].getName().replace("ApplicationMaster", "ExecutorLauncher")
+ }
+ val amArgs =
+ Seq(amClass) ++ userClass ++
+ (if (args.userJar != null) Seq("--jar", args.userJar) else Nil) ++
+ Seq("--executor-memory", args.executorMemory.toString,
"--executor-cores", args.executorCores.toString,
"--num-executors ", args.numExecutors.toString,
+ userArgsToString(args))
+
+ // Command for the ApplicationMaster
+ val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
+ javaOpts ++ amArgs ++
+ Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
logInfo("Yarn AM launch context:")
- logInfo(s" class: ${args.amClass}")
- logInfo(s" env: $env")
- logInfo(s" command: ${commands.mkString(" ")}")
+ logInfo(s" user class: ${args.userClass}")
+ logInfo(s" env: $env")
+ logInfo(s" command: ${commands.mkString(" ")}")
// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
@@ -623,7 +631,7 @@ object ClientBase extends Logging {
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path,
File.pathSeparator)
- /**
+ /**
* Get the list of namenodes the user may access.
*/
private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
new file mode 100644
index 0000000000..cad94e5e19
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+object AllocationType extends Enumeration {
+ type AllocationType = Value
+ val HOST, RACK, ANY = Value
+}
+
+/**
+ * Interface that defines a Yarn allocator.
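+ *
+ * A minimal usage sketch (illustrative only; `allocator` comes from YarnRMClient#register):
+ * {{{
+ *   allocator.allocateResources()              // request containers / heartbeat to the RM
+ *   val running = allocator.getNumExecutorsRunning
+ *   val failed = allocator.getNumExecutorsFailed
+ * }}}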
+ */
+trait YarnAllocator {
+
+ def allocateResources(): Unit
+ def getNumExecutorsFailed: Int
+ def getNumExecutorsRunning: Int
+
+}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
new file mode 100644
index 0000000000..922d7d1a85
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.{Map, Set}
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.api.records._
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.scheduler.SplitInfo
+
+/**
+ * Interface that defines a Yarn RM client. Abstracts away Yarn version-specific functionality that
+ * is used by Spark's AM.
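+ *
+ * A rough sketch of how the AM drives this interface (illustrative only):
+ * {{{
+ *   val allocator = client.register(yarnConf, sparkConf, preferredNodes, uiAddress, uiHistoryAddress)
+ *   // ... heartbeat by calling allocator.allocateResources() until the app finishes ...
+ *   client.shutdown(FinalApplicationStatus.SUCCEEDED)
+ * }}}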
+ */
+trait YarnRMClient {
+
+ /**
+ * Registers the application master with the RM.
+ *
+ * @param conf The Yarn configuration.
+ * @param sparkConf The Spark configuration.
+ * @param preferredNodeLocations Map with hints about where to allocate containers.
+ * @param uiAddress Address of the SparkUI.
+ * @param uiHistoryAddress Address of the application on the History Server.
+ */
+ def register(
+ conf: YarnConfiguration,
+ sparkConf: SparkConf,
+ preferredNodeLocations: Map[String, Set[SplitInfo]],
+ uiAddress: String,
+ uiHistoryAddress: String): YarnAllocator
+
+ /**
+ * Shuts down the AM. Guaranteed to only be called once.
+ *
+ * @param status The final status of the AM.
+ * @param diagnostics Diagnostics message to include in the final status.
+ */
+ def shutdown(status: FinalApplicationStatus, diagnostics: String = ""): Unit
+
+ /** Returns the attempt ID. */
+ def getAttemptId(): ApplicationAttemptId
+
+ /** Returns the RM's proxy host and port. */
+ def getProxyHostAndPort(conf: YarnConfiguration): String
+
+ /** Returns the maximum number of attempts to register the AM. */
+ def getMaxRegAttempts(conf: YarnConfiguration): Int
+
+}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 10aef5eb24..2aa27a1908 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,8 +17,11 @@
package org.apache.spark.deploy.yarn
+import java.lang.{Boolean => JBoolean}
+import java.util.{Collections, Set => JSet}
import java.util.regex.Matcher
import java.util.regex.Pattern
+import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.HashMap
@@ -29,11 +32,13 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.StringInterner
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.util.RackResolver
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.Utils
/**
* Contains util methods to interact with Hadoop from spark.
@@ -79,6 +84,21 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}
object YarnSparkHadoopUtil {
+ // Additional memory overhead - in mb.
+ val DEFAULT_MEMORY_OVERHEAD = 384
+
+ val ANY_HOST = "*"
+
+ // All RM requests are issued with the same priority: we do not (yet) distinguish between
+ // request types (like map/reduce in Hadoop, for example).
+ val RM_REQUEST_PRIORITY = 1
+
+ // Host-to-rack map, saved from allocation requests. We expect this not to change.
+ // Note that it is possible for it to change: the ResourceManager will indicate that to us in
+ // the response to an allocate request. But we are punting on handling that for now.
+ private val hostToRack = new ConcurrentHashMap[String, String]()
+ private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+
def addToEnvironment(
env: HashMap[String, String],
variable: String,
@@ -173,4 +193,35 @@ object YarnSparkHadoopUtil {
}
}
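+ // Returns the rack for the given host, resolving and caching it on first use. May return null
+ // when the rack cannot be determined, so callers typically wrap the result in Option(...).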
+ private[spark] def lookupRack(conf: Configuration, host: String): String = {
+ if (!hostToRack.containsKey(host)) {
+ populateRackInfo(conf, host)
+ }
+ hostToRack.get(host)
+ }
+
+ private[spark] def populateRackInfo(conf: Configuration, hostname: String) {
+ Utils.checkHost(hostname)
+
+ if (!hostToRack.containsKey(hostname)) {
+ // If there are repeated failures to resolve, add the host to an ignore list.
+ val rackInfo = RackResolver.resolve(conf, hostname)
+ if (rackInfo != null && rackInfo.getNetworkLocation != null) {
+ val rack = rackInfo.getNetworkLocation
+ hostToRack.put(hostname, rack)
+ if (!rackToHostSet.containsKey(rack)) {
+ rackToHostSet.putIfAbsent(rack,
+ Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
+ }
+ rackToHostSet.get(rack).add(hostname)
+
+ // TODO(harvey): Figure out what this comment means...
+ // Since RackResolver caches, we are disabling this for now ...
+ } /* else {
+ // right? Otherwise we will keep calling the rack resolver whenever we can't resolve rack info ...
+ hostToRack.put(hostname, null)
+ } */
+ }
+ }
+
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 3474112ded..d162b4c433 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -19,22 +19,21 @@ package org.apache.spark.scheduler.cluster
import org.apache.spark._
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.deploy.yarn.YarnAllocationHandler
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
/**
- *
- * This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM.
+ * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM.
*/
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
+private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration)
+ extends TaskSchedulerImpl(sc) {
def this(sc: SparkContext) = this(sc, new Configuration())
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
- val retval = YarnAllocationHandler.lookupRack(conf, host)
- if (retval != null) Some(retval) else None
+ Option(YarnSparkHadoopUtil.lookupRack(conf, host))
}
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 833e249f9f..a5f537dd9d 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
import org.apache.spark.scheduler.TaskSchedulerImpl
import scala.collection.mutable.ArrayBuffer
@@ -60,10 +60,7 @@ private[spark] class YarnClientSchedulerBackend(
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
- "--class", "notused",
- "--jar", null, // The primary jar will be added dynamically in SparkContext.
- "--args", hostport,
- "--am-class", classOf[ExecutorLauncher].getName
+ "--args", hostport
)
// process any optional arguments, given either as environment variables
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 9aeca4a637..69f40225a2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -18,16 +18,17 @@
package org.apache.spark.scheduler.cluster
import org.apache.spark._
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
+import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
import org.apache.hadoop.conf.Configuration
/**
- *
- * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of ApplicationMaster, etc is done
+ * This is a simple extension to ClusterScheduler - to ensure that the appropriate
+ * initialization of ApplicationMaster, etc. is done.
*/
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
+private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
+ extends TaskSchedulerImpl(sc) {
logInfo("Created YarnClusterScheduler")
@@ -42,7 +43,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
- val retval = YarnAllocationHandler.lookupRack(conf, host)
+ val retval = YarnSparkHadoopUtil.lookupRack(conf, host)
if (retval != null) Some(retval) else None
}
@@ -51,4 +52,10 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
super.postStartHook()
logInfo("YarnClusterScheduler.postStartHook done")
}
+
+ override def stop() {
+ super.stop()
+ ApplicationMaster.sparkContextStopped(sc)
+ }
+
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
deleted file mode 100644
index 1c4005fd8e..0000000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.IOException
-import java.util.concurrent.CopyOnWriteArrayList
-import java.util.concurrent.atomic.AtomicReference
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.{SignalLogger, Utils}
-
-
-/**
- * An application master that runs the user's driver program and allocates executors.
- */
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
- sparkConf: SparkConf) extends Logging {
-
- def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
- this(args, new Configuration(), sparkConf)
-
- def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
-
- private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
- private var appAttemptId: ApplicationAttemptId = _
- private var userThread: Thread = _
- private val fs = FileSystem.get(yarnConf)
-
- private var yarnAllocator: YarnAllocationHandler = _
- private var isFinished: Boolean = false
- private var uiAddress: String = _
- private var uiHistoryAddress: String = _
- private val maxAppAttempts: Int = conf.getInt(
- YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
- private var isLastAMRetry: Boolean = true
- private var amClient: AMRMClient[ContainerRequest] = _
-
- // Default to numExecutors * 2, with minimum of 3
- private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
- sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
-
- private var registered = false
-
- def run() {
- // Set the web ui port to be ephemeral for yarn so we don't conflict with
- // other spark processes running on the same box
- System.setProperty("spark.ui.port", "0")
-
- // When running the AM, the Spark master is always "yarn-cluster"
- System.setProperty("spark.master", "yarn-cluster")
-
- // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
- ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
-
- appAttemptId = ApplicationMaster.getApplicationAttemptId()
- logInfo("ApplicationAttemptId: " + appAttemptId)
- isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
- amClient = AMRMClient.createAMRMClient()
- amClient.init(yarnConf)
- amClient.start()
-
- // setup AmIpFilter for the SparkUI - do this before we start the UI
- addAmIpFilter()
-
- ApplicationMaster.register(this)
-
- // Call this to force generation of secret so it gets populated into the
- // Hadoop UGI. This has to happen before the startUserClass which does a
- // doAs in order for the credentials to be passed on to the executor containers.
- val securityMgr = new SecurityManager(sparkConf)
-
- // Start the user's JAR
- userThread = startUserClass()
-
- // This a bit hacky, but we need to wait until the spark.driver.port property has
- // been set by the Thread executing the user class.
- waitForSparkContextInitialized()
-
- // Do this after Spark master is up and SparkContext is created so that we can register UI Url.
- synchronized {
- if (!isFinished) {
- registerApplicationMaster()
- registered = true
- }
- }
-
- // Allocate all containers
- allocateExecutors()
-
- // Launch thread that will heartbeat to the RM so it won't think the app has died.
- launchReporterThread()
-
- // Wait for the user class to finish
- userThread.join()
-
- System.exit(0)
- }
-
- // add the yarn amIpFilter that Yarn requires for properly securing the UI
- private def addAmIpFilter() {
- val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
- System.setProperty("spark.ui.filters", amFilter)
- val proxy = WebAppUtils.getProxyHostAndPort(conf)
- val parts : Array[String] = proxy.split(":")
- val uriBase = "http://" + proxy +
- System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-
- val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
- System.setProperty(
- "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
- }
-
- private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
- logInfo("Registering the ApplicationMaster")
- amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
- }
-
- private def startUserClass(): Thread = {
- logInfo("Starting the user JAR in a separate Thread")
- System.setProperty("spark.executor.instances", args.numExecutors.toString)
- val mainMethod = Class.forName(
- args.userClass,
- false,
- Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
- val t = new Thread {
- override def run() {
- var succeeded = false
- try {
- // Copy
- val mainArgs = new Array[String](args.userArgs.size)
- args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
- mainMethod.invoke(null, mainArgs)
- // Some apps have "System.exit(0)" at the end. The user thread will stop here unless
- // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED.
- succeeded = true
- } finally {
- logDebug("Finishing main")
- isLastAMRetry = true
- if (succeeded) {
- ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- } else {
- ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
- }
- }
- }
- }
- t.setName("Driver")
- t.start()
- t
- }
-
- // This needs to happen before allocateExecutors()
- private def waitForSparkContextInitialized() {
- logInfo("Waiting for Spark context initialization")
- try {
- var sparkContext: SparkContext = null
- ApplicationMaster.sparkContextRef.synchronized {
- var numTries = 0
- val waitTime = 10000L
- val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
- while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries
- && !isFinished) {
- logInfo("Waiting for Spark context initialization ... " + numTries)
- numTries = numTries + 1
- ApplicationMaster.sparkContextRef.wait(waitTime)
- }
- sparkContext = ApplicationMaster.sparkContextRef.get()
- assert(sparkContext != null || numTries >= maxNumTries)
-
- if (sparkContext != null) {
- uiAddress = sparkContext.ui.appUIHostPort
- uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
- this.yarnAllocator = YarnAllocationHandler.newAllocator(
- yarnConf,
- amClient,
- appAttemptId,
- args,
- sparkContext.preferredNodeLocationData,
- sparkContext.getConf)
- } else {
- logWarning("Unable to retrieve SparkContext in spite of waiting for %d, maxNumTries = %d".
- format(numTries * waitTime, maxNumTries))
- this.yarnAllocator = YarnAllocationHandler.newAllocator(
- yarnConf,
- amClient,
- appAttemptId,
- args,
- sparkContext.getConf)
- }
- }
- }
- }
-
- private def allocateExecutors() {
- try {
- logInfo("Requesting" + args.numExecutors + " executors.")
- // Wait until all containers have launched
- yarnAllocator.addResourceRequests(args.numExecutors)
- yarnAllocator.allocateResources()
- // Exits the loop if the user thread exits.
-
- while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
- && !isFinished) {
- checkNumExecutorsFailed()
- allocateMissingExecutor()
- yarnAllocator.allocateResources()
- Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
- }
- }
- logInfo("All executors have launched.")
- }
-
- private def allocateMissingExecutor() {
- val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
- yarnAllocator.getNumPendingAllocate
- if (missingExecutorCount > 0) {
- logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingExecutorCount))
- yarnAllocator.addResourceRequests(missingExecutorCount)
- }
- }
-
- private def checkNumExecutorsFailed() {
- if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
- logInfo("max number of executor failures reached")
- finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of executor failures reached")
- // make sure to stop the user thread
- val sparkContext = ApplicationMaster.sparkContextRef.get()
- if (sparkContext != null) {
- logInfo("Invoking sc stop from checkNumExecutorsFailed")
- sparkContext.stop()
- } else {
- logError("sparkContext is null when should shutdown")
- }
- }
- }
-
- private def launchReporterThread(): Thread = {
- // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
- val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
- // we want to be reasonably responsive without causing too many requests to RM.
- val schedulerInterval =
- sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
- // must be <= timeoutInterval / 2.
- val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
-
- val t = new Thread {
- override def run() {
- while (userThread.isAlive && !isFinished) {
- checkNumExecutorsFailed()
- allocateMissingExecutor()
- logDebug("Sending progress")
- yarnAllocator.allocateResources()
- Thread.sleep(interval)
- }
- }
- }
- // Setting to daemon status, though this is usually not a good idea.
- t.setDaemon(true)
- t.start()
- logInfo("Started progress reporter thread - heartbeat interval : " + interval)
- t
- }
-
- def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
- synchronized {
- if (isFinished) {
- return
- }
- isFinished = true
-
- logInfo("Unregistering ApplicationMaster with " + status)
- if (registered) {
- amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
- }
- }
- }
-
- /**
- * Clean up the staging directory.
- */
- private def cleanupStagingDir() {
- var stagingDirPath: Path = null
- try {
- val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
- if (!preserveFiles) {
- stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
- if (stagingDirPath == null) {
- logError("Staging directory is null")
- return
- }
- logInfo("Deleting staging directory " + stagingDirPath)
- fs.delete(stagingDirPath, true)
- }
- } catch {
- case ioe: IOException =>
- logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
- }
- }
-
- // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
- class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
-
- def run() {
- logInfo("AppMaster received a signal.")
- // We need to clean up staging dir before HDFS is shut down
- // make sure we don't delete it until this is the last AM
- if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
- }
- }
-
-}
-
-object ApplicationMaster extends Logging {
- // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
- // optimal as more containers are available. Might need to handle this better.
- private val ALLOCATE_HEARTBEAT_INTERVAL = 100
-
- private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
-
- val sparkContextRef: AtomicReference[SparkContext] =
- new AtomicReference[SparkContext](null)
-
- def register(master: ApplicationMaster) {
- applicationMasters.add(master)
- }
-
- /**
- * Called from YarnClusterScheduler to notify the AM code that a SparkContext has been
- * initialized in the user code.
- */
- def sparkContextInitialized(sc: SparkContext): Boolean = {
- var modified = false
- sparkContextRef.synchronized {
- modified = sparkContextRef.compareAndSet(null, sc)
- sparkContextRef.notifyAll()
- }
-
- // Add a shutdown hook - as a best effort in case users do not call sc.stop or do
- // System.exit.
- // Should not really have to do this, but it helps YARN to evict resources earlier.
- // Not to mention, prevent the Client from declaring failure even though we exited properly.
- // Note that this will unfortunately not properly clean up the staging files because it gets
- // called too late, after the filesystem is already shutdown.
- if (modified) {
- Runtime.getRuntime().addShutdownHook(new Thread with Logging {
- // This is not only logs, but also ensures that log system is initialized for this instance
- // when we are actually 'run'-ing.
- logInfo("Adding shutdown hook for context " + sc)
-
- override def run() {
- logInfo("Invoking sc stop from shutdown hook")
- sc.stop()
- // Best case ...
- for (master <- applicationMasters) {
- master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- }
- }
- })
- }
-
- // Wait for initialization to complete and at least 'some' nodes to get allocated.
- modified
- }
-
- def getApplicationAttemptId(): ApplicationAttemptId = {
- val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
- val containerId = ConverterUtils.toContainerId(containerIdString)
- val appAttemptId = containerId.getApplicationAttemptId()
- appAttemptId
- }
-
- def main(argStrings: Array[String]) {
- SignalLogger.register(log)
- val args = new ApplicationMasterArguments(argStrings)
- SparkHadoopUtil.get.runAsSparkUser { () =>
- new ApplicationMaster(args).run()
- }
- }
-}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
deleted file mode 100644
index e093fe4ae6..0000000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.Socket
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import akka.actor._
-import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
-import org.apache.spark.scheduler.SplitInfo
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
-
-/**
- * An application master that allocates executors on behalf of a driver that is running outside
- * the cluster.
- *
- * This is used only in yarn-client mode.
- */
-class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
- extends Logging {
-
- def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
- this(args, new Configuration(), sparkConf)
-
- def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
-
- private var appAttemptId: ApplicationAttemptId = _
- private var reporterThread: Thread = _
- private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
- private var yarnAllocator: YarnAllocationHandler = _
- private var driverClosed: Boolean = false
- private var isFinished: Boolean = false
- private var registered: Boolean = false
-
- private var amClient: AMRMClient[ContainerRequest] = _
-
- // Default to numExecutors * 2, with minimum of 3
- private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
- sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
-
- val securityManager = new SecurityManager(sparkConf)
- val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
- conf = sparkConf, securityManager = securityManager)._1
- var actor: ActorRef = _
-
- // This actor just working as a monitor to watch on Driver Actor.
- class MonitorActor(driverUrl: String) extends Actor {
-
- var driver: ActorSelection = _
-
- override def preStart() {
- logInfo("Listen to driver: " + driverUrl)
- driver = context.actorSelection(driverUrl)
- // Send a hello message to establish the connection, after which
- // we can monitor Lifecycle Events.
- driver ! "Hello"
- context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
- }
-
- override def receive = {
- case x: DisassociatedEvent =>
- logInfo(s"Driver terminated or disconnected! Shutting down. $x")
- driverClosed = true
- case x: AddWebUIFilter =>
- logInfo(s"Add WebUI Filter. $x")
- driver ! x
- }
- }
-
- def run() {
- amClient = AMRMClient.createAMRMClient()
- amClient.init(yarnConf)
- amClient.start()
-
- appAttemptId = ApplicationMaster.getApplicationAttemptId()
- synchronized {
- if (!isFinished) {
- registerApplicationMaster()
- registered = true
- }
- }
-
- waitForSparkMaster()
- addAmIpFilter()
-
- // Allocate all containers
- allocateExecutors()
-
- // Launch a progress reporter thread, else app will get killed after expiration
- // (def: 10mins) timeout ensure that progress is sent before
- // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
- val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
- // we want to be reasonably responsive without causing too many requests to RM.
- val schedulerInterval =
- System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
- // must be <= timeoutInterval / 2.
- val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
- reporterThread = launchReporterThread(interval)
-
-
- // Wait for the reporter thread to Finish.
- reporterThread.join()
-
- finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- actorSystem.shutdown()
-
- logInfo("Exited")
- System.exit(0)
- }
-
- private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
- val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
- logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
- amClient.registerApplicationMaster(Utils.localHostName(), 0, appUIAddress)
- }
-
- // add the yarn amIpFilter that Yarn requires for properly securing the UI
- private def addAmIpFilter() {
- val proxy = WebAppUtils.getProxyHostAndPort(conf)
- val parts = proxy.split(":")
- val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
- val uriBase = "http://" + proxy + proxyBase
- val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
- val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
- actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
- }
-
- private def waitForSparkMaster() {
- logInfo("Waiting for Spark driver to be reachable.")
- var driverUp = false
- val hostport = args.userArgs(0)
- val (driverHost, driverPort) = Utils.parseHostPort(hostport)
- while(!driverUp) {
- try {
- val socket = new Socket(driverHost, driverPort)
- socket.close()
- logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
- driverUp = true
- } catch {
- case e: Exception =>
- logError("Failed to connect to driver at %s:%s, retrying ...".
- format(driverHost, driverPort))
- Thread.sleep(100)
- }
- }
- sparkConf.set("spark.driver.host", driverHost)
- sparkConf.set("spark.driver.port", driverPort.toString)
-
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
- SparkEnv.driverActorSystemName,
- driverHost,
- driverPort.toString,
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
- actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
- }
-
-
- private def allocateExecutors() {
- // TODO: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
- val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
- scala.collection.immutable.Map()
-
- yarnAllocator = YarnAllocationHandler.newAllocator(
- yarnConf,
- amClient,
- appAttemptId,
- args,
- preferredNodeLocationData,
- sparkConf)
-
- logInfo("Requesting " + args.numExecutors + " executors.")
- // Wait until all containers have launched
- yarnAllocator.addResourceRequests(args.numExecutors)
- yarnAllocator.allocateResources()
- while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
- !isFinished) {
- checkNumExecutorsFailed()
- allocateMissingExecutor()
- yarnAllocator.allocateResources()
- Thread.sleep(100)
- }
-
- logInfo("All executors have launched.")
- }
-
- private def allocateMissingExecutor() {
- val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
- yarnAllocator.getNumPendingAllocate
- if (missingExecutorCount > 0) {
- logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingExecutorCount))
- yarnAllocator.addResourceRequests(missingExecutorCount)
- }
- }
-
- private def checkNumExecutorsFailed() {
- if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
- finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of executor failures reached")
- }
- }
-
- private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
-
- val t = new Thread {
- override def run() {
- while (!driverClosed && !isFinished) {
- checkNumExecutorsFailed()
- allocateMissingExecutor()
- logDebug("Sending progress")
- yarnAllocator.allocateResources()
- Thread.sleep(sleepTime)
- }
- }
- }
- // setting to daemon status, though this is usually not a good idea.
- t.setDaemon(true)
- t.start()
- logInfo("Started progress reporter thread - sleep time : " + sleepTime)
- t
- }
-
- def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
- synchronized {
- if (isFinished) {
- return
- }
- logInfo("Unregistering ApplicationMaster with " + status)
- if (registered) {
- val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
- amClient.unregisterApplicationMaster(status, appMessage, trackingUrl)
- }
- isFinished = true
- }
- }
-
-}
-
-object ExecutorLauncher {
- def main(argStrings: Array[String]) {
- val args = new ApplicationMasterArguments(argStrings)
- SparkHadoopUtil.get.runAsSparkUser { () =>
- new ExecutorLauncher(args).run()
- }
- }
-}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 0a461749c8..4d51449899 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,12 +17,9 @@
package org.apache.spark.deploy.yarn
-import java.lang.{Boolean => JBoolean}
-import java.util.{Collections, Set => JSet}
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -32,20 +29,13 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId
-import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
+import org.apache.hadoop.yarn.api.records.{Container, ContainerId}
import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.util.{RackResolver, Records}
-
-
-object AllocationType extends Enumeration {
- type AllocationType = Value
- val HOST, RACK, ANY = Value
-}
+import org.apache.hadoop.yarn.util.Records
// TODO:
// Too many params.
@@ -61,16 +51,14 @@ object AllocationType extends Enumeration {
* Acquires resources for executors from a ResourceManager and launches executors in new containers.
*/
private[yarn] class YarnAllocationHandler(
- val conf: Configuration,
- val amClient: AMRMClient[ContainerRequest],
- val appAttemptId: ApplicationAttemptId,
- val maxExecutors: Int,
- val executorMemory: Int,
- val executorCores: Int,
- val preferredHostToCount: Map[String, Int],
- val preferredRackToCount: Map[String, Int],
- val sparkConf: SparkConf)
- extends Logging {
+ conf: Configuration,
+ sparkConf: SparkConf,
+ amClient: AMRMClient[ContainerRequest],
+ appAttemptId: ApplicationAttemptId,
+ args: ApplicationMasterArguments,
+ preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
+ extends YarnAllocator with Logging {
+
// These three are locked on allocatedHostToContainersMap. Complementary data structures
// allocatedHostToContainersMap : containers which are running : host, Set<containerid>
// allocatedContainerToHostMap: container to host mapping.
@@ -92,7 +80,7 @@ private[yarn] class YarnAllocationHandler(
// Additional memory overhead - in mb.
private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- YarnAllocationHandler.MEMORY_OVERHEAD)
+ YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
// Number of container requests that have been sent to, but not yet allocated by the
// ApplicationMaster.
@@ -103,11 +91,15 @@ private[yarn] class YarnAllocationHandler(
private val lastResponseId = new AtomicInteger()
private val numExecutorsFailed = new AtomicInteger()
- def getNumPendingAllocate: Int = numPendingAllocate.intValue
+ private val maxExecutors = args.numExecutors
+ private val executorMemory = args.executorMemory
+ private val executorCores = args.executorCores
+ private val (preferredHostToCount, preferredRackToCount) =
+ generateNodeToWeight(conf, preferredNodes)
- def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
+ override def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
- def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
+ override def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
container.getResource.getMemory >= (executorMemory + memoryOverhead)
@@ -119,7 +111,9 @@ private[yarn] class YarnAllocationHandler(
amClient.releaseAssignedContainer(containerId)
}
- def allocateResources() {
+ override def allocateResources() = {
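+ // Top up outstanding requests so that (pending + running) reaches maxExecutors, re-requesting
+ // any containers that have been lost since the last heartbeat.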
+ addResourceRequests(maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get())
+
// We have already set the container request. Poll the ResourceManager for a response.
// This doubles as a heartbeat if there are no pending container requests.
val progressIndicator = 0.1f
@@ -204,7 +198,7 @@ private[yarn] class YarnAllocationHandler(
// For rack local containers
if (remainingContainers != null) {
- val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
if (rack != null) {
val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
@@ -273,7 +267,7 @@ private[yarn] class YarnAllocationHandler(
// To be safe, remove the container from `pendingReleaseContainers`.
pendingReleaseContainers.remove(containerId)
- val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
allocatedHostToContainersMap.synchronized {
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId]())
@@ -360,7 +354,7 @@ private[yarn] class YarnAllocationHandler(
allocatedContainerToHostMap.remove(containerId)
// TODO: Move this part outside the synchronized block?
- val rack = YarnAllocationHandler.lookupRack(conf, host)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
if (rack != null) {
val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
if (rackCount > 0) {
@@ -393,9 +387,9 @@ private[yarn] class YarnAllocationHandler(
for (container <- hostContainers) {
val candidateHost = container.getNodes.last
- assert(YarnAllocationHandler.ANY_HOST != candidateHost)
+ assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
- val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
if (rack != null) {
var count = rackToCounts.getOrElse(rack, 0)
count += 1
@@ -409,7 +403,7 @@ private[yarn] class YarnAllocationHandler(
AllocationType.RACK,
rack,
count,
- YarnAllocationHandler.PRIORITY)
+ YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
}
requestedContainers
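The loop above folds per-host requests into per-rack counts before building the rack-level requests; a compressed sketch of that aggregation follows. The rack resolver is passed in as a plain function to keep the example self-contained, whereas the real code calls YarnSparkHadoopUtil.lookupRack(conf, candidateHost).

import scala.collection.mutable.HashMap

def countByRack(hostCounts: Map[String, Int], lookupRack: String => String): Map[String, Int] = {
  val rackToCounts = new HashMap[String, Int]()
  for ((host, count) <- hostCounts) {
    val rack = lookupRack(host)
    if (rack != null) {                       // hosts with no resolvable rack are skipped
      rackToCounts.put(rack, rackToCounts.getOrElse(rack, 0) + count)
    }
  }
  rackToCounts.toMap
}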
@@ -431,7 +425,7 @@ private[yarn] class YarnAllocationHandler(
retval
}
- def addResourceRequests(numExecutors: Int) {
+ private def addResourceRequests(numExecutors: Int) {
val containerRequests: List[ContainerRequest] =
if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
logDebug("numExecutors: " + numExecutors + ", host preferences: " +
@@ -440,9 +434,9 @@ private[yarn] class YarnAllocationHandler(
AllocationType.ANY,
resource = null,
numExecutors,
- YarnAllocationHandler.PRIORITY).toList
+ YarnSparkHadoopUtil.RM_REQUEST_PRIORITY).toList
} else {
- // Request for all hosts in preferred nodes and for numExecutors -
+ // Request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
for ((candidateHost, candidateCount) <- preferredHostToCount) {
@@ -453,7 +447,7 @@ private[yarn] class YarnAllocationHandler(
AllocationType.HOST,
candidateHost,
requiredCount,
- YarnAllocationHandler.PRIORITY)
+ YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
}
}
val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests(
@@ -463,7 +457,7 @@ private[yarn] class YarnAllocationHandler(
AllocationType.ANY,
resource = null,
numExecutors,
- YarnAllocationHandler.PRIORITY)
+ YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
hostContainerRequests.size + rackContainerRequests.size() + anyContainerRequests.size)
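A short sketch of how the combined host, rack and ANY requests presumably reach YARN (the submitRequests helper is invented for illustration): each ContainerRequest is registered with the AMRMClient via addContainerRequest and is picked up on the next allocate() heartbeat.

import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

def submitRequests(
    amClient: AMRMClient[ContainerRequest],
    requests: Seq[ContainerRequest]): Unit = {
  requests.foreach(r => amClient.addContainerRequest(r))   // queued until the next allocate()
}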
@@ -512,7 +506,7 @@ private[yarn] class YarnAllocationHandler(
// There must be a third request, which is ANY. That will be specially handled.
requestType match {
case AllocationType.HOST => {
- assert(YarnAllocationHandler.ANY_HOST != resource)
+ assert(YarnSparkHadoopUtil.ANY_HOST != resource)
val hostname = resource
val nodeLocal = constructContainerRequests(
Array(hostname),
@@ -521,7 +515,7 @@ private[yarn] class YarnAllocationHandler(
priority)
// Add `hostname` to the global (singleton) host->rack mapping in YarnSparkHadoopUtil.
- YarnAllocationHandler.populateRackInfo(conf, hostname)
+ YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
nodeLocal
}
case AllocationType.RACK => {
@@ -554,88 +548,6 @@ private[yarn] class YarnAllocationHandler(
}
requests
}
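For reference, a hedged sketch of how a single node-local request could be built with the stable-API ContainerRequest constructor. The real constructContainerRequests is only partially visible in this hunk, so details here (for example passing a null rack list) are assumptions.

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

def hostLocalRequest(
    host: String,
    executorMemory: Int,
    memoryOverhead: Int,
    executorCores: Int,
    requestPriority: Int): ContainerRequest = {
  val capability = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
  val priority = Priority.newInstance(requestPriority)
  // The nodes array carries the locality hint; the rack list is left for YARN to infer.
  new ContainerRequest(capability, Array(host), null, priority)
}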
-}
-
-object YarnAllocationHandler {
-
- val ANY_HOST = "*"
- // All requests are issued with same priority : we do not (yet) have any distinction between
- // request types (like map/reduce in hadoop for example)
- val PRIORITY = 1
-
- // Additional memory overhead - in mb.
- val MEMORY_OVERHEAD = 384
-
- // Host to rack map - saved from allocation requests. We are expecting this not to change.
- // Note that it is possible for this to change: the ResourceManager will indicate that to us via an
- // update response to allocate. But we are punting on handling that for now.
- private val hostToRack = new ConcurrentHashMap[String, String]()
- private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
-
-
- def newAllocator(
- conf: Configuration,
- amClient: AMRMClient[ContainerRequest],
- appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- sparkConf: SparkConf
- ): YarnAllocationHandler = {
- new YarnAllocationHandler(
- conf,
- amClient,
- appAttemptId,
- args.numExecutors,
- args.executorMemory,
- args.executorCores,
- Map[String, Int](),
- Map[String, Int](),
- sparkConf)
- }
-
- def newAllocator(
- conf: Configuration,
- amClient: AMRMClient[ContainerRequest],
- appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- map: collection.Map[String,
- collection.Set[SplitInfo]],
- sparkConf: SparkConf
- ): YarnAllocationHandler = {
- val (hostToSplitCount, rackToSplitCount) = generateNodeToWeight(conf, map)
- new YarnAllocationHandler(
- conf,
- amClient,
- appAttemptId,
- args.numExecutors,
- args.executorMemory,
- args.executorCores,
- hostToSplitCount,
- rackToSplitCount,
- sparkConf)
- }
-
- def newAllocator(
- conf: Configuration,
- amClient: AMRMClient[ContainerRequest],
- appAttemptId: ApplicationAttemptId,
- maxExecutors: Int,
- executorMemory: Int,
- executorCores: Int,
- map: collection.Map[String, collection.Set[SplitInfo]],
- sparkConf: SparkConf
- ): YarnAllocationHandler = {
- val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
- new YarnAllocationHandler(
- conf,
- amClient,
- appAttemptId,
- maxExecutors,
- executorMemory,
- executorCores,
- hostToCount,
- rackToCount,
- sparkConf)
- }
// A simple method to derive per-host and per-rack weights from the split info map.
private def generateNodeToWeight(
@@ -654,7 +566,7 @@ object YarnAllocationHandler {
val hostCount = hostToCount.getOrElse(host, 0)
hostToCount.put(host, hostCount + splits.size)
- val rack = lookupRack(conf, host)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
if (rack != null){
val rackCount = rackToCount.getOrElse(host, 0)
rackToCount.put(host, rackCount + splits.size)
@@ -664,42 +576,4 @@ object YarnAllocationHandler {
(hostToCount.toMap, rackToCount.toMap)
}
- def lookupRack(conf: Configuration, host: String): String = {
- if (!hostToRack.contains(host)) {
- populateRackInfo(conf, host)
- }
- hostToRack.get(host)
- }
-
- def fetchCachedHostsForRack(rack: String): Option[Set[String]] = {
- Option(rackToHostSet.get(rack)).map { set =>
- val convertedSet: collection.mutable.Set[String] = set
- // TODO: Better way to get a Set[String] from JSet.
- convertedSet.toSet
- }
- }
-
- def populateRackInfo(conf: Configuration, hostname: String) {
- Utils.checkHost(hostname)
-
- if (!hostToRack.containsKey(hostname)) {
- // If there are repeated failures to resolve, add the host to an ignore list.
- val rackInfo = RackResolver.resolve(conf, hostname)
- if (rackInfo != null && rackInfo.getNetworkLocation != null) {
- val rack = rackInfo.getNetworkLocation
- hostToRack.put(hostname, rack)
- if (! rackToHostSet.containsKey(rack)) {
- rackToHostSet.putIfAbsent(rack,
- Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
- }
- rackToHostSet.get(rack).add(hostname)
-
- // TODO(harvey): Figure out what this comment means...
- // Since RackResolver caches, we are disabling this for now ...
- } /* else {
- // right? Else we will keep calling the rack resolver in case we can't resolve rack info ...
- hostToRack.put(hostname, null)
- } */
- }
- }
}
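The members deleted above are referenced throughout this patch as YarnSparkHadoopUtil.ANY_HOST, RM_REQUEST_PRIORITY, DEFAULT_MEMORY_OVERHEAD, lookupRack and populateRackInfo. The actual yarn/common file is not part of this hunk, so the following is only a hedged reconstruction of what that shared helper presumably looks like, based on the code removed here.

import java.lang.{Boolean => JBoolean}
import java.util.{Collections, Set => JSet}
import java.util.concurrent.ConcurrentHashMap

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.util.RackResolver

object RackInfoSketch {
  val ANY_HOST = "*"
  val RM_REQUEST_PRIORITY = 1        // all requests share one priority
  val DEFAULT_MEMORY_OVERHEAD = 384  // MB

  // Host -> rack cache, filled lazily from RackResolver lookups.
  private val hostToRack = new ConcurrentHashMap[String, String]()
  private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()

  def lookupRack(conf: Configuration, host: String): String = {
    if (!hostToRack.containsKey(host)) {
      populateRackInfo(conf, host)
    }
    hostToRack.get(host)
  }

  def populateRackInfo(conf: Configuration, hostname: String): Unit = {
    if (!hostToRack.containsKey(hostname)) {
      val rackInfo = RackResolver.resolve(conf, hostname)
      if (rackInfo != null && rackInfo.getNetworkLocation != null) {
        val rack = rackInfo.getNetworkLocation
        hostToRack.put(hostname, rack)
        if (!rackToHostSet.containsKey(rack)) {
          rackToHostSet.putIfAbsent(rack,
            Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
        }
        rackToHostSet.get(rack).add(hostname)
      }
    }
  }
}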
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
new file mode 100644
index 0000000000..e8b8d9bc72
--- /dev/null
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.{Map, Set}
+
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.Utils
+
+
+/**
+ * YarnRMClient implementation for the Yarn stable API.
+ */
+private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging {
+
+ private var amClient: AMRMClient[ContainerRequest] = _
+ private var uiHistoryAddress: String = _
+
+ override def register(
+ conf: YarnConfiguration,
+ sparkConf: SparkConf,
+ preferredNodeLocations: Map[String, Set[SplitInfo]],
+ uiAddress: String,
+ uiHistoryAddress: String) = {
+ amClient = AMRMClient.createAMRMClient()
+ amClient.init(conf)
+ amClient.start()
+ this.uiHistoryAddress = uiHistoryAddress
+
+ logInfo("Registering the ApplicationMaster")
+ amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+ new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
+ preferredNodeLocations)
+ }
+
+ override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") =
+ amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
+
+ override def getAttemptId() = {
+ val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+ val containerId = ConverterUtils.toContainerId(containerIdString)
+ val appAttemptId = containerId.getApplicationAttemptId()
+ appAttemptId
+ }
+
+ override def getProxyHostAndPort(conf: YarnConfiguration) = WebAppUtils.getProxyHostAndPort(conf)
+
+ override def getMaxRegAttempts(conf: YarnConfiguration) =
+ conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+
+}
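To close, a hedged sketch of how the common ApplicationMaster presumably drives this client (that file is outside this hunk): register to obtain the allocator, poll it until enough executors are up, then unregister with a final status. The executor target and sleep interval below are placeholders.

import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SplitInfo

def runSketch(
    client: YarnRMClient,
    yarnConf: YarnConfiguration,
    sparkConf: SparkConf,
    preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
    uiAddress: String,
    uiHistoryAddress: String): Unit = {
  val allocator = client.register(yarnConf, sparkConf, preferredNodes, uiAddress, uiHistoryAddress)
  val targetExecutors = 2           // placeholder; the real AM uses args.numExecutors
  while (allocator.getNumExecutorsRunning < targetExecutors) {
    allocator.allocateResources()   // doubles as the ResourceManager heartbeat
    Thread.sleep(1000)              // placeholder interval
  }
  client.shutdown(FinalApplicationStatus.SUCCEEDED)
}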