author     Marcelo Vanzin <vanzin@cloudera.com>    2014-08-27 11:02:04 -0500
committer  Thomas Graves <tgraves@apache.org>      2014-08-27 11:02:04 -0500
commit     b92d823ad13f6fcc325eeb99563bea543871c6aa (patch)
tree       e9d7314f985ac7dadadc66ce1e538c54ac7316cd /yarn
parent     6f671d04fa98f97fd48c5e749b9f47dd4a8b4f44 (diff)
[SPARK-2933] [yarn] Refactor and cleanup Yarn AM code.
This change modifies the Yarn module so that all the logic related to running the ApplicationMaster is localized. Instead of, previously, 4 different classes with mostly identical code, now we have:

- A single, shared ApplicationMaster class, which can operate both in client and cluster mode, and substitutes the old ApplicationMaster (for cluster mode) and ExecutorLauncher (for client mode). The benefit here is that all different execution modes for all supported yarn versions use the same shared code for monitoring executor allocation, setting up configuration, and monitoring the process's lifecycle.

- A new YarnRMClient interface, which defines basic RM functionality needed by the ApplicationMaster. This interface has concrete implementations for each supported Yarn version.

- A new YarnAllocator interface, which just abstracts the existing interface of the YarnAllocationHandler class. This is to avoid having to touch the allocator code too much in this change, although it might benefit from a similar effort in the future.

The end result is much easier to understand code, with much less duplication, making it much easier to fix bugs, add features, and test everything knowing that all supported versions will behave the same.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #2020 from vanzin/SPARK-2933 and squashes the following commits:

3bbf3e7 [Marcelo Vanzin] Merge branch 'master' into SPARK-2933
ff389ed [Marcelo Vanzin] Do not interrupt reporter thread from within itself.
3a8ed37 [Marcelo Vanzin] Remote stale comment.
0f5142c [Marcelo Vanzin] Review feedback.
41f8c8a [Marcelo Vanzin] Fix app status reporting.
c0794be [Marcelo Vanzin] Correctly clean up staging directory.
92770cc [Marcelo Vanzin] Merge branch 'master' into SPARK-2933
ecaf332 [Marcelo Vanzin] Small fix to shutdown code.
f02d3f8 [Marcelo Vanzin] Merge branch 'master' into SPARK-2933
f581122 [Marcelo Vanzin] Review feedback.
557fdeb [Marcelo Vanzin] Cleanup a couple more constants.
be6068d [Marcelo Vanzin] Restore shutdown hook to clean up staging dir.
5150993 [Marcelo Vanzin] Some more cleanup.
b6289ab [Marcelo Vanzin] Move cluster/client code to separate methods.
ecb23cd [Marcelo Vanzin] More trivial cleanup.
34f1e63 [Marcelo Vanzin] Fix some questionable error handling.
5657c7d [Marcelo Vanzin] Finish app if SparkContext initialization times out.
0e4be3d [Marcelo Vanzin] Keep "ExecutorLauncher" as the main class for client-mode AM.
91beabb [Marcelo Vanzin] Fix UI filter registration.
8c72239 [Marcelo Vanzin] Trivial cleanups.
99a52d5 [Marcelo Vanzin] Changes to the yarn-alpha project to use common AM code.
848ca6d [Marcelo Vanzin] [SPARK-2933] [yarn] Refactor and cleanup Yarn AM code.
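For orientation before reading the diff: the sketch below is a hypothetical, condensed Scala rendering of the layering the commit message describes (a single shared AM that talks to YARN only through small, version-agnostic interfaces). The names mirror the new classes, but the signatures are deliberately simplified; the real traits added by this patch take YarnConfiguration, SparkConf, preferred-node data, etc., and appear in the new files further down.

// Hypothetical, simplified sketch -- not the exact traits from this patch.

trait YarnAllocator {
  def allocateResources(): Unit
  def getNumExecutorsRunning: Int
  def getNumExecutorsFailed: Int
}

trait YarnRMClient {
  // Registers the AM with the RM and returns the allocator to use afterwards.
  def register(uiAddress: String, uiHistoryAddress: String): YarnAllocator
  // Unregisters the AM, reporting the application's final status.
  def shutdown(succeeded: Boolean, diagnostics: String = ""): Unit
}

// One shared AM for both modes: cluster mode runs the user class in-process,
// client mode only launches executors for a driver running elsewhere.
class SharedApplicationMaster(client: YarnRMClient, isDriver: Boolean) {
  def run(): Unit = {
    val allocator = client.register(uiAddress = "", uiHistoryAddress = "")
    allocator.allocateResources()   // initial allocation pass
    // ... reporter/heartbeat loop keeps calling allocator.allocateResources() ...
    client.shutdown(succeeded = true)
  }
}

The point of the shape above is that per-version code is confined to the YarnRMClient (and YarnAllocator) implementations, while the monitoring, configuration, and lifecycle logic lives once in the shared ApplicationMaster.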
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                       453
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala                        315
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala                   192
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala                        103
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                      430
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala              26
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala                          9
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala                               54
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala                            34
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala                             67
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala                      51
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala         11
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala          7
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala               17
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                      413
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala                       276
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala                  196
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala                         76
18 files changed, 892 insertions, 1838 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
deleted file mode 100644
index 4d4848b1bd..0000000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.IOException
-import java.net.Socket
-import java.util.concurrent.CopyOnWriteArrayList
-import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.{SignalLogger, Utils}
-
-/**
- * An application master that runs the users driver program and allocates executors.
- */
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
- sparkConf: SparkConf) extends Logging {
-
- def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
- this(args, new Configuration(), sparkConf)
-
- def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
-
- private val rpc: YarnRPC = YarnRPC.create(conf)
- private var resourceManager: AMRMProtocol = _
- private var appAttemptId: ApplicationAttemptId = _
- private var userThread: Thread = _
- private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
- private val fs = FileSystem.get(yarnConf)
-
- private var yarnAllocator: YarnAllocationHandler = _
- private var isFinished: Boolean = false
- private var uiAddress: String = _
- private var uiHistoryAddress: String = _
- private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
- YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
- private var isLastAMRetry: Boolean = true
-
- // Default to numExecutors * 2, with minimum of 3
- private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
- sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
-
- private var registered = false
-
- def run() {
- // set the web ui port to be ephemeral for yarn so we don't conflict with
- // other spark processes running on the same box
- System.setProperty("spark.ui.port", "0")
-
- // when running the AM, the Spark master is always "yarn-cluster"
- System.setProperty("spark.master", "yarn-cluster")
-
- // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
- ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
-
- appAttemptId = getApplicationAttemptId()
- isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
- resourceManager = registerWithResourceManager()
-
- // setup AmIpFilter for the SparkUI - do this before we start the UI
- addAmIpFilter()
-
- ApplicationMaster.register(this)
-
- // Call this to force generation of secret so it gets populated into the
- // hadoop UGI. This has to happen before the startUserClass which does a
- // doAs in order for the credentials to be passed on to the executor containers.
- val securityMgr = new SecurityManager(sparkConf)
-
- // Start the user's JAR
- userThread = startUserClass()
-
- // This a bit hacky, but we need to wait until the spark.driver.port property has
- // been set by the Thread executing the user class.
- waitForSparkContextInitialized()
-
- // Do this after spark master is up and SparkContext is created so that we can register UI Url
- synchronized {
- if (!isFinished) {
- registerApplicationMaster()
- registered = true
- }
- }
-
- // Allocate all containers
- allocateExecutors()
-
- // Wait for the user class to Finish
- userThread.join()
-
- System.exit(0)
- }
-
- // add the yarn amIpFilter that Yarn requires for properly securing the UI
- private def addAmIpFilter() {
- val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
- System.setProperty("spark.ui.filters", amFilter)
- val proxy = YarnConfiguration.getProxyHostAndPort(conf)
- val parts : Array[String] = proxy.split(":")
- val uriBase = "http://" + proxy +
- System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-
- val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
- System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params",
- params)
- }
-
- private def getApplicationAttemptId(): ApplicationAttemptId = {
- val envs = System.getenv()
- val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
- val containerId = ConverterUtils.toContainerId(containerIdString)
- val appAttemptId = containerId.getApplicationAttemptId()
- logInfo("ApplicationAttemptId: " + appAttemptId)
- appAttemptId
- }
-
- private def registerWithResourceManager(): AMRMProtocol = {
- val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
- logInfo("Connecting to ResourceManager at " + rmAddress)
- rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
- }
-
- private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
- logInfo("Registering the ApplicationMaster")
- val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
- .asInstanceOf[RegisterApplicationMasterRequest]
- appMasterRequest.setApplicationAttemptId(appAttemptId)
- // Setting this to master host,port - so that the ApplicationReport at client has some
- // sensible info.
- // Users can then monitor stderr/stdout on that node if required.
- appMasterRequest.setHost(Utils.localHostName())
- appMasterRequest.setRpcPort(0)
- appMasterRequest.setTrackingUrl(uiAddress)
- resourceManager.registerApplicationMaster(appMasterRequest)
- }
-
- private def startUserClass(): Thread = {
- logInfo("Starting the user JAR in a separate Thread")
- System.setProperty("spark.executor.instances", args.numExecutors.toString)
- val mainMethod = Class.forName(
- args.userClass,
- false /* initialize */ ,
- Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
- val t = new Thread {
- override def run() {
-
- var successed = false
- try {
- // Copy
- var mainArgs: Array[String] = new Array[String](args.userArgs.size)
- args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
- mainMethod.invoke(null, mainArgs)
- // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
- // userThread will stop here unless it has uncaught exception thrown out
- // It need shutdown hook to set SUCCEEDED
- successed = true
- } finally {
- logDebug("finishing main")
- isLastAMRetry = true
- if (successed) {
- ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- } else {
- ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
- }
- }
- }
- }
- t.start()
- t
- }
-
- // this need to happen before allocateExecutors
- private def waitForSparkContextInitialized() {
- logInfo("Waiting for spark context initialization")
- try {
- var sparkContext: SparkContext = null
- ApplicationMaster.sparkContextRef.synchronized {
- var count = 0
- val waitTime = 10000L
- val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
- while (ApplicationMaster.sparkContextRef.get() == null && count < numTries
- && !isFinished) {
- logInfo("Waiting for spark context initialization ... " + count)
- count = count + 1
- ApplicationMaster.sparkContextRef.wait(waitTime)
- }
- sparkContext = ApplicationMaster.sparkContextRef.get()
- assert(sparkContext != null || count >= numTries)
-
- if (null != sparkContext) {
- uiAddress = sparkContext.ui.appUIHostPort
- uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
- this.yarnAllocator = YarnAllocationHandler.newAllocator(
- yarnConf,
- resourceManager,
- appAttemptId,
- args,
- sparkContext.preferredNodeLocationData,
- sparkContext.getConf)
- } else {
- logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
- format(count * waitTime, numTries))
- this.yarnAllocator = YarnAllocationHandler.newAllocator(
- yarnConf,
- resourceManager,
- appAttemptId,
- args,
- sparkContext.getConf)
- }
- }
- }
- }
-
- private def allocateExecutors() {
- try {
- logInfo("Allocating " + args.numExecutors + " executors.")
- // Wait until all containers have finished
- // TODO: This is a bit ugly. Can we make it nicer?
- // TODO: Handle container failure
-
- // Exits the loop if the user thread exits.
- while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
- && !isFinished) {
- checkNumExecutorsFailed()
- yarnAllocator.allocateContainers(
- math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
- Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
- }
- }
- logInfo("All executors have launched.")
-
- // Launch a progress reporter thread, else the app will get killed after expiration
- // (def: 10mins) timeout.
- // TODO(harvey): Verify the timeout
- if (userThread.isAlive) {
- // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
- val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
- // we want to be reasonably responsive without causing too many requests to RM.
- val schedulerInterval =
- sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
- // must be <= timeoutInterval / 2.
- val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
- launchReporterThread(interval)
- }
- }
-
- private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
-
- val t = new Thread {
- override def run() {
- while (userThread.isAlive && !isFinished) {
- checkNumExecutorsFailed()
- val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
- if (missingExecutorCount > 0) {
- logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingExecutorCount))
- yarnAllocator.allocateContainers(missingExecutorCount)
- } else {
- sendProgress()
- }
- Thread.sleep(sleepTime)
- }
- }
- }
- // Setting to daemon status, though this is usually not a good idea.
- t.setDaemon(true)
- t.start()
- logInfo("Started progress reporter thread - sleep time : " + sleepTime)
- t
- }
-
- private def checkNumExecutorsFailed() {
- if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
- logInfo("max number of executor failures reached")
- finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of executor failures reached")
- // make sure to stop the user thread
- val sparkContext = ApplicationMaster.sparkContextRef.get()
- if (sparkContext != null) {
- logInfo("Invoking sc stop from checkNumExecutorsFailed")
- sparkContext.stop()
- } else {
- logError("sparkContext is null when should shutdown")
- }
- }
- }
-
- private def sendProgress() {
- logDebug("Sending progress")
- // Simulated with an allocate request with no nodes requested ...
- yarnAllocator.allocateContainers(0)
- }
-
- /*
- def printContainers(containers: List[Container]) = {
- for (container <- containers) {
- logInfo("Launching shell command on a new container."
- + ", containerId=" + container.getId()
- + ", containerNode=" + container.getNodeId().getHost()
- + ":" + container.getNodeId().getPort()
- + ", containerNodeURI=" + container.getNodeHttpAddress()
- + ", containerState" + container.getState()
- + ", containerResourceMemory"
- + container.getResource().getMemory())
- }
- }
- */
-
- def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
- synchronized {
- if (isFinished) {
- return
- }
- isFinished = true
-
- logInfo("finishApplicationMaster with " + status)
- if (registered) {
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(appAttemptId)
- finishReq.setFinishApplicationStatus(status)
- finishReq.setDiagnostics(diagnostics)
- finishReq.setTrackingUrl(uiHistoryAddress)
- resourceManager.finishApplicationMaster(finishReq)
- }
- }
- }
-
- /**
- * Clean up the staging directory.
- */
- private def cleanupStagingDir() {
- var stagingDirPath: Path = null
- try {
- val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
- if (!preserveFiles) {
- stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
- if (stagingDirPath == null) {
- logError("Staging directory is null")
- return
- }
- logInfo("Deleting staging directory " + stagingDirPath)
- fs.delete(stagingDirPath, true)
- }
- } catch {
- case ioe: IOException =>
- logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
- }
- }
-
- // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
- class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
-
- def run() {
- logInfo("AppMaster received a signal.")
- // we need to clean up staging dir before HDFS is shut down
- // make sure we don't delete it until this is the last AM
- if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
- }
- }
-
-}
-
-object ApplicationMaster extends Logging {
- // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
- // optimal as more containers are available. Might need to handle this better.
- private val ALLOCATE_HEARTBEAT_INTERVAL = 100
-
- private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
-
- def register(master: ApplicationMaster) {
- applicationMasters.add(master)
- }
-
- val sparkContextRef: AtomicReference[SparkContext] =
- new AtomicReference[SparkContext](null /* initialValue */)
-
- def sparkContextInitialized(sc: SparkContext): Boolean = {
- var modified = false
- sparkContextRef.synchronized {
- modified = sparkContextRef.compareAndSet(null, sc)
- sparkContextRef.notifyAll()
- }
-
- // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
- // System.exit.
- // Should not really have to do this, but it helps YARN to evict resources earlier.
- // Not to mention, prevent the Client from declaring failure even though we exited properly.
- // Note that this will unfortunately not properly clean up the staging files because it gets
- // called too late, after the filesystem is already shutdown.
- if (modified) {
- Runtime.getRuntime().addShutdownHook(new Thread with Logging {
- // This is not only logs, but also ensures that log system is initialized for this instance
- // when we are actually 'run'-ing.
- logInfo("Adding shutdown hook for context " + sc)
-
- override def run() {
- logInfo("Invoking sc stop from shutdown hook")
- sc.stop()
- // Best case ...
- for (master <- applicationMasters) {
- master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- }
- }
- })
- }
-
- modified
- }
-
- def main(argStrings: Array[String]) {
- SignalLogger.register(log)
- val args = new ApplicationMasterArguments(argStrings)
- SparkHadoopUtil.get.runAsSparkUser { () =>
- new ApplicationMaster(args).run()
- }
- }
-}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
deleted file mode 100644
index 155dd88aa2..0000000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.Socket
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import akka.actor._
-import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
-import org.apache.spark.scheduler.SplitInfo
-import org.apache.spark.deploy.SparkHadoopUtil
-
-/**
- * An application master that allocates executors on behalf of a driver that is running outside
- * the cluster.
- *
- * This is used only in yarn-client mode.
- */
-class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
- extends Logging {
-
- def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
- this(args, new Configuration(), sparkConf)
-
- def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
-
- private val rpc: YarnRPC = YarnRPC.create(conf)
- private var resourceManager: AMRMProtocol = _
- private var appAttemptId: ApplicationAttemptId = _
- private var reporterThread: Thread = _
- private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
- private var yarnAllocator: YarnAllocationHandler = _
-
- private var driverClosed: Boolean = false
- private var isFinished: Boolean = false
- private var registered: Boolean = false
-
- // Default to numExecutors * 2, with minimum of 3
- private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
- sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
-
- val securityManager = new SecurityManager(sparkConf)
- val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
- conf = sparkConf, securityManager = securityManager)._1
- var actor: ActorRef = _
-
- // This actor just working as a monitor to watch on Driver Actor.
- class MonitorActor(driverUrl: String) extends Actor {
-
- var driver: ActorSelection = _
-
- override def preStart() {
- logInfo("Listen to driver: " + driverUrl)
- driver = context.actorSelection(driverUrl)
- // Send a hello message thus the connection is actually established, thus we can
- // monitor Lifecycle Events.
- driver ! "Hello"
- context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
- }
-
- override def receive = {
- case x: DisassociatedEvent =>
- logInfo(s"Driver terminated or disconnected! Shutting down. $x")
- driverClosed = true
- case x: AddWebUIFilter =>
- logInfo(s"Add WebUI Filter. $x")
- driver ! x
- }
- }
-
- def run() {
- appAttemptId = getApplicationAttemptId()
- resourceManager = registerWithResourceManager()
-
- synchronized {
- if (!isFinished) {
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
- // Compute number of threads for akka
- val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
- if (minimumMemory > 0) {
- val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- YarnAllocationHandler.MEMORY_OVERHEAD)
- val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
- if (numCore > 0) {
- // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
- // TODO: Uncomment when hadoop is on a version which has this fixed.
- // args.workerCores = numCore
- }
- }
- registered = true
- }
- }
- waitForSparkMaster()
- addAmIpFilter()
- // Allocate all containers
- allocateExecutors()
-
- // Launch a progress reporter thread, else app will get killed after expiration
- // (def: 10mins) timeout ensure that progress is sent before
- // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
- val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
- // we want to be reasonably responsive without causing too many requests to RM.
- val schedulerInterval =
- System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
-
- // must be <= timeoutInterval / 2.
- val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
- reporterThread = launchReporterThread(interval)
-
- // Wait for the reporter thread to Finish.
- reporterThread.join()
-
- finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- actorSystem.shutdown()
-
- logInfo("Exited")
- System.exit(0)
- }
-
- private def getApplicationAttemptId(): ApplicationAttemptId = {
- val envs = System.getenv()
- val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
- val containerId = ConverterUtils.toContainerId(containerIdString)
- val appAttemptId = containerId.getApplicationAttemptId()
- logInfo("ApplicationAttemptId: " + appAttemptId)
- appAttemptId
- }
-
- private def registerWithResourceManager(): AMRMProtocol = {
- val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
- logInfo("Connecting to ResourceManager at " + rmAddress)
- rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
- }
-
- private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
- val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
- logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
- val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
- .asInstanceOf[RegisterApplicationMasterRequest]
- appMasterRequest.setApplicationAttemptId(appAttemptId)
- // Setting this to master host,port - so that the ApplicationReport at client has
- // some sensible info. Users can then monitor stderr/stdout on that node if required.
- appMasterRequest.setHost(Utils.localHostName())
- appMasterRequest.setRpcPort(0)
- // What do we provide here ? Might make sense to expose something sensible later ?
- appMasterRequest.setTrackingUrl(appUIAddress)
- resourceManager.registerApplicationMaster(appMasterRequest)
- }
-
- // add the yarn amIpFilter that Yarn requires for properly securing the UI
- private def addAmIpFilter() {
- val proxy = YarnConfiguration.getProxyHostAndPort(conf)
- val parts = proxy.split(":")
- val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
- val uriBase = "http://" + proxy + proxyBase
- val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
- val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
- actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
- }
-
- private def waitForSparkMaster() {
- logInfo("Waiting for spark driver to be reachable.")
- var driverUp = false
- val hostport = args.userArgs(0)
- val (driverHost, driverPort) = Utils.parseHostPort(hostport)
- while(!driverUp) {
- try {
- val socket = new Socket(driverHost, driverPort)
- socket.close()
- logInfo("Master now available: " + driverHost + ":" + driverPort)
- driverUp = true
- } catch {
- case e: Exception =>
- logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
- Thread.sleep(100)
- }
- }
- sparkConf.set("spark.driver.host", driverHost)
- sparkConf.set("spark.driver.port", driverPort.toString)
-
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
- SparkEnv.driverActorSystemName,
- driverHost,
- driverPort.toString,
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
- actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
- }
-
-
- private def allocateExecutors() {
-
- // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
- val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
- scala.collection.immutable.Map()
-
- yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
- args, preferredNodeLocationData, sparkConf)
-
- logInfo("Allocating " + args.numExecutors + " executors.")
- // Wait until all containers have finished
- // TODO: This is a bit ugly. Can we make it nicer?
- // TODO: Handle container failure
- while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
- !isFinished) {
- yarnAllocator.allocateContainers(
- math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
- checkNumExecutorsFailed()
- Thread.sleep(100)
- }
-
- logInfo("All executors have launched.")
- }
- private def checkNumExecutorsFailed() {
- if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
- finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of executor failures reached")
- }
- }
-
- // TODO: We might want to extend this to allocate more containers in case they die !
- private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
-
- val t = new Thread {
- override def run() {
- while (!driverClosed && !isFinished) {
- checkNumExecutorsFailed()
- val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
- if (missingExecutorCount > 0) {
- logInfo("Allocating " + missingExecutorCount +
- " containers to make up for (potentially ?) lost containers")
- yarnAllocator.allocateContainers(missingExecutorCount)
- } else {
- sendProgress()
- }
- Thread.sleep(sleepTime)
- }
- }
- }
- // setting to daemon status, though this is usually not a good idea.
- t.setDaemon(true)
- t.start()
- logInfo("Started progress reporter thread - sleep time : " + sleepTime)
- t
- }
-
- private def sendProgress() {
- logDebug("Sending progress")
- // simulated with an allocate request with no nodes requested ...
- yarnAllocator.allocateContainers(0)
- }
-
- def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
- synchronized {
- if (isFinished) {
- return
- }
- logInfo("Unregistering ApplicationMaster with " + status)
- if (registered) {
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(appAttemptId)
- finishReq.setFinishApplicationStatus(status)
- finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
- finishReq.setDiagnostics(appMessage)
- resourceManager.finishApplicationMaster(finishReq)
- }
- isFinished = true
- }
- }
-
-}
-
-
-object ExecutorLauncher {
- def main(argStrings: Array[String]) {
- val args = new ApplicationMasterArguments(argStrings)
- SparkHadoopUtil.get.runAsSparkUser { () =>
- new ExecutorLauncher(args).run()
- }
- }
-}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 568a6ef932..629cd13f67 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,33 +17,24 @@
package org.apache.spark.deploy.yarn
-import java.lang.{Boolean => JBoolean}
-import java.util.{Collections, Set => JSet}
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import org.apache.spark.{Logging, SparkConf, SparkEnv}
-import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
+import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.AMRMProtocol
import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
-import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
+import org.apache.hadoop.yarn.api.records.{Container, ContainerId}
import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
-import org.apache.hadoop.yarn.util.{RackResolver, Records}
-
-
-object AllocationType extends Enumeration {
- type AllocationType = Value
- val HOST, RACK, ANY = Value
-}
+import org.apache.hadoop.yarn.util.Records
// TODO:
// Too many params.
@@ -59,16 +50,14 @@ object AllocationType extends Enumeration {
* Acquires resources for executors from a ResourceManager and launches executors in new containers.
*/
private[yarn] class YarnAllocationHandler(
- val conf: Configuration,
- val resourceManager: AMRMProtocol,
- val appAttemptId: ApplicationAttemptId,
- val maxExecutors: Int,
- val executorMemory: Int,
- val executorCores: Int,
- val preferredHostToCount: Map[String, Int],
- val preferredRackToCount: Map[String, Int],
- val sparkConf: SparkConf)
- extends Logging {
+ conf: Configuration,
+ sparkConf: SparkConf,
+ resourceManager: AMRMProtocol,
+ appAttemptId: ApplicationAttemptId,
+ args: ApplicationMasterArguments,
+ preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
+ extends YarnAllocator with Logging {
+
// These three are locked on allocatedHostToContainersMap. Complementary data structures
// allocatedHostToContainersMap : containers which are running : host, Set<containerid>
// allocatedContainerToHostMap: container to host mapping.
@@ -90,7 +79,7 @@ private[yarn] class YarnAllocationHandler(
// Additional memory overhead - in mb.
private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- YarnAllocationHandler.MEMORY_OVERHEAD)
+ YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
private val numExecutorsRunning = new AtomicInteger()
// Used to generate a unique id per executor
@@ -98,6 +87,12 @@ private[yarn] class YarnAllocationHandler(
private val lastResponseId = new AtomicInteger()
private val numExecutorsFailed = new AtomicInteger()
+ private val maxExecutors = args.numExecutors
+ private val executorMemory = args.executorMemory
+ private val executorCores = args.executorCores
+ private val (preferredHostToCount, preferredRackToCount) =
+ generateNodeToWeight(conf, preferredNodes)
+
def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
@@ -106,9 +101,10 @@ private[yarn] class YarnAllocationHandler(
container.getResource.getMemory >= (executorMemory + memoryOverhead)
}
- def allocateContainers(executorsToRequest: Int) {
+ override def allocateResources() = {
// We need to send the request only once from what I understand ... but for now, not modifying
// this much.
+ val executorsToRequest = Math.max(maxExecutors - numExecutorsRunning.get(), 0)
// Keep polling the Resource Manager for containers
val amResp = allocateExecutorResources(executorsToRequest).getAMResponse
@@ -182,7 +178,7 @@ private[yarn] class YarnAllocationHandler(
// Now rack local
if (remainingContainers != null){
- val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
if (rack != null){
val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
@@ -256,7 +252,7 @@ private[yarn] class YarnAllocationHandler(
// Should not be there, but ..
pendingReleaseContainers.remove(containerId)
- val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
allocatedHostToContainersMap.synchronized {
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId]())
@@ -331,7 +327,7 @@ private[yarn] class YarnAllocationHandler(
allocatedContainerToHostMap -= containerId
// Doing this within locked context, sigh ... move to outside ?
- val rack = YarnAllocationHandler.lookupRack(conf, host)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
if (rack != null) {
val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
if (rackCount > 0) {
@@ -364,9 +360,9 @@ private[yarn] class YarnAllocationHandler(
for (container <- hostContainers) {
val candidateHost = container.getHostName
val candidateNumContainers = container.getNumContainers
- assert(YarnAllocationHandler.ANY_HOST != candidateHost)
+ assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
- val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
if (rack != null) {
var count = rackToCounts.getOrElse(rack, 0)
count += candidateNumContainers
@@ -378,7 +374,8 @@ private[yarn] class YarnAllocationHandler(
new ArrayBuffer[ResourceRequest](rackToCounts.size)
for ((rack, count) <- rackToCounts){
requestedContainers +=
- createResourceRequest(AllocationType.RACK, rack, count, YarnAllocationHandler.PRIORITY)
+ createResourceRequest(AllocationType.RACK, rack, count,
+ YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
}
requestedContainers.toList
@@ -409,7 +406,7 @@ private[yarn] class YarnAllocationHandler(
logDebug("numExecutors: " + numExecutors + ", host preferences: " +
preferredHostToCount.isEmpty)
resourceRequests = List(createResourceRequest(
- AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
+ AllocationType.ANY, null, numExecutors, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
} else {
// request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
@@ -423,7 +420,7 @@ private[yarn] class YarnAllocationHandler(
AllocationType.HOST,
candidateHost,
requiredCount,
- YarnAllocationHandler.PRIORITY)
+ YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
}
}
val rackContainerRequests: List[ResourceRequest] = createRackResourceRequests(
@@ -433,7 +430,7 @@ private[yarn] class YarnAllocationHandler(
AllocationType.ANY,
resource = null,
numExecutors,
- YarnAllocationHandler.PRIORITY)
+ YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
hostContainerRequests.size + rackContainerRequests.size + 1)
@@ -483,12 +480,12 @@ private[yarn] class YarnAllocationHandler(
// There must be a third request - which is ANY : that will be specially handled.
requestType match {
case AllocationType.HOST => {
- assert(YarnAllocationHandler.ANY_HOST != resource)
+ assert(YarnSparkHadoopUtil.ANY_HOST != resource)
val hostname = resource
val nodeLocal = createResourceRequestImpl(hostname, numExecutors, priority)
// Add to host->rack mapping
- YarnAllocationHandler.populateRackInfo(conf, hostname)
+ YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
nodeLocal
}
@@ -497,7 +494,7 @@ private[yarn] class YarnAllocationHandler(
createResourceRequestImpl(rack, numExecutors, priority)
}
case AllocationType.ANY => createResourceRequestImpl(
- YarnAllocationHandler.ANY_HOST, numExecutors, priority)
+ YarnSparkHadoopUtil.ANY_HOST, numExecutors, priority)
case _ => throw new IllegalArgumentException(
"Unexpected/unsupported request type: " + requestType)
}
@@ -541,90 +538,6 @@ private[yarn] class YarnAllocationHandler(
retval
}
-}
-
-object YarnAllocationHandler {
-
- val ANY_HOST = "*"
- // All requests are issued with same priority : we do not (yet) have any distinction between
- // request types (like map/reduce in hadoop for example)
- val PRIORITY = 1
-
- // Additional memory overhead - in mb
- val MEMORY_OVERHEAD = 384
-
- // Host to rack map - saved from allocation requests
- // We are expecting this not to change.
- // Note that it is possible for this to change : and RM will indicate that to us via update
- // response to allocate. But we are punting on handling that for now.
- private val hostToRack = new ConcurrentHashMap[String, String]()
- private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
-
-
- def newAllocator(
- conf: Configuration,
- resourceManager: AMRMProtocol,
- appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- sparkConf: SparkConf): YarnAllocationHandler = {
-
- new YarnAllocationHandler(
- conf,
- resourceManager,
- appAttemptId,
- args.numExecutors,
- args.executorMemory,
- args.executorCores,
- Map[String, Int](),
- Map[String, Int](),
- sparkConf)
- }
-
- def newAllocator(
- conf: Configuration,
- resourceManager: AMRMProtocol,
- appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- map: collection.Map[String,
- collection.Set[SplitInfo]],
- sparkConf: SparkConf): YarnAllocationHandler = {
-
- val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
- new YarnAllocationHandler(
- conf,
- resourceManager,
- appAttemptId,
- args.numExecutors,
- args.executorMemory,
- args.executorCores,
- hostToCount,
- rackToCount,
- sparkConf)
- }
-
- def newAllocator(
- conf: Configuration,
- resourceManager: AMRMProtocol,
- appAttemptId: ApplicationAttemptId,
- maxExecutors: Int,
- executorMemory: Int,
- executorCores: Int,
- map: collection.Map[String, collection.Set[SplitInfo]],
- sparkConf: SparkConf): YarnAllocationHandler = {
-
- val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
-
- new YarnAllocationHandler(
- conf,
- resourceManager,
- appAttemptId,
- maxExecutors,
- executorMemory,
- executorCores,
- hostToCount,
- rackToCount,
- sparkConf)
- }
// A simple method to copy the split info map.
private def generateNodeToWeight(
@@ -642,7 +555,7 @@ object YarnAllocationHandler {
val hostCount = hostToCount.getOrElse(host, 0)
hostToCount.put(host, hostCount + splits.size)
- val rack = lookupRack(conf, host)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
if (rack != null){
val rackCount = rackToCount.getOrElse(host, 0)
rackToCount.put(host, rackCount + splits.size)
@@ -652,41 +565,4 @@ object YarnAllocationHandler {
(hostToCount.toMap, rackToCount.toMap)
}
- def lookupRack(conf: Configuration, host: String): String = {
- if (!hostToRack.contains(host)) populateRackInfo(conf, host)
- hostToRack.get(host)
- }
-
- def fetchCachedHostsForRack(rack: String): Option[Set[String]] = {
- val set = rackToHostSet.get(rack)
- if (set == null) return None
-
- // No better way to get a Set[String] from JSet ?
- val convertedSet: collection.mutable.Set[String] = set
- Some(convertedSet.toSet)
- }
-
- def populateRackInfo(conf: Configuration, hostname: String) {
- Utils.checkHost(hostname)
-
- if (!hostToRack.containsKey(hostname)) {
- // If there are repeated failures to resolve, all to an ignore list ?
- val rackInfo = RackResolver.resolve(conf, hostname)
- if (rackInfo != null && rackInfo.getNetworkLocation != null) {
- val rack = rackInfo.getNetworkLocation
- hostToRack.put(hostname, rack)
- if (! rackToHostSet.containsKey(rack)) {
- rackToHostSet.putIfAbsent(rack,
- Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
- }
- rackToHostSet.get(rack).add(hostname)
-
- // TODO(harvey): Figure out this comment...
- // Since RackResolver caches, we are disabling this for now ...
- } /* else {
- // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
- hostToRack.put(hostname, null)
- } */
- }
- }
}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
new file mode 100644
index 0000000000..cc5392192e
--- /dev/null
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.{Map, Set}
+
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.Utils
+
+/**
+ * YarnRMClient implementation for the Yarn alpha API.
+ */
+private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging {
+
+ private var rpc: YarnRPC = null
+ private var resourceManager: AMRMProtocol = _
+ private var uiHistoryAddress: String = _
+
+ override def register(
+ conf: YarnConfiguration,
+ sparkConf: SparkConf,
+ preferredNodeLocations: Map[String, Set[SplitInfo]],
+ uiAddress: String,
+ uiHistoryAddress: String) = {
+ this.rpc = YarnRPC.create(conf)
+ this.uiHistoryAddress = uiHistoryAddress
+
+ resourceManager = registerWithResourceManager(conf)
+ registerApplicationMaster(uiAddress)
+
+ new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
+ preferredNodeLocations)
+ }
+
+ override def getAttemptId() = {
+ val envs = System.getenv()
+ val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
+ val containerId = ConverterUtils.toContainerId(containerIdString)
+ val appAttemptId = containerId.getApplicationAttemptId()
+ appAttemptId
+ }
+
+ override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = {
+ val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+ .asInstanceOf[FinishApplicationMasterRequest]
+ finishReq.setAppAttemptId(getAttemptId())
+ finishReq.setFinishApplicationStatus(status)
+ finishReq.setDiagnostics(diagnostics)
+ finishReq.setTrackingUrl(uiHistoryAddress)
+ resourceManager.finishApplicationMaster(finishReq)
+ }
+
+ override def getProxyHostAndPort(conf: YarnConfiguration) =
+ YarnConfiguration.getProxyHostAndPort(conf)
+
+ override def getMaxRegAttempts(conf: YarnConfiguration) =
+ conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
+
+ private def registerWithResourceManager(conf: YarnConfiguration): AMRMProtocol = {
+ val rmAddress = NetUtils.createSocketAddr(conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
+ logInfo("Connecting to ResourceManager at " + rmAddress)
+ rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+ }
+
+ private def registerApplicationMaster(uiAddress: String): RegisterApplicationMasterResponse = {
+ val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
+ .asInstanceOf[RegisterApplicationMasterRequest]
+ appMasterRequest.setApplicationAttemptId(getAttemptId())
+ // Setting this to master host,port - so that the ApplicationReport at client has some
+ // sensible info.
+ // Users can then monitor stderr/stdout on that node if required.
+ appMasterRequest.setHost(Utils.localHostName())
+ appMasterRequest.setRpcPort(0)
+ appMasterRequest.setTrackingUrl(uiAddress)
+ resourceManager.registerApplicationMaster(appMasterRequest)
+ }
+
+}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
new file mode 100644
index 0000000000..f15c813b83
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.Socket
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.collection.JavaConversions._
+import scala.util.Try
+
+import akka.actor._
+import akka.remote._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.util.ShutdownHookManager
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
+import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+
+/**
+ * Common application master functionality for Spark on Yarn.
+ */
+private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
+ client: YarnRMClient) extends Logging {
+ // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
+ // optimal as more containers are available. Might need to handle this better.
+ private val ALLOCATE_HEARTBEAT_INTERVAL = 100
+
+ private val sparkConf = new SparkConf()
+ private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
+ private val isDriver = args.userClass != null
+
+ // Default to numExecutors * 2, with minimum of 3
+ private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+ sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
+
+ @volatile private var finished = false
+ @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
+
+ private var reporterThread: Thread = _
+ private var allocator: YarnAllocator = _
+
+ // Fields used in client mode.
+ private var actorSystem: ActorSystem = null
+ private var actor: ActorRef = _
+
+ // Fields used in cluster mode.
+ private val sparkContextRef = new AtomicReference[SparkContext](null)
+
+ final def run(): Int = {
+ if (isDriver) {
+ // Set the web ui port to be ephemeral for yarn so we don't conflict with
+ // other spark processes running on the same box
+ System.setProperty("spark.ui.port", "0")
+
+ // Set the master property to match the requested mode.
+ System.setProperty("spark.master", "yarn-cluster")
+ }
+
+ logInfo("ApplicationAttemptId: " + client.getAttemptId())
+
+ val cleanupHook = new Runnable {
+ override def run() {
+ // If the SparkContext is still registered, shut it down as a best case effort in case
+ // users do not call sc.stop or do System.exit().
+ val sc = sparkContextRef.get()
+ if (sc != null) {
+ logInfo("Invoking sc stop from shutdown hook")
+ sc.stop()
+ finish(FinalApplicationStatus.SUCCEEDED)
+ }
+
+ // Cleanup the staging dir after the app is finished, or if it's the last attempt at
+ // running the AM.
+ val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+ val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
+ if (finished || isLastAttempt) {
+ cleanupStagingDir()
+ }
+ }
+ }
+ // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
+ ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
+
+ // Call this to force generation of secret so it gets populated into the
+ // Hadoop UGI. This has to happen before the startUserClass which does a
+ // doAs in order for the credentials to be passed on to the executor containers.
+ val securityMgr = new SecurityManager(sparkConf)
+
+ if (isDriver) {
+ runDriver()
+ } else {
+ runExecutorLauncher(securityMgr)
+ }
+
+ if (finalStatus != FinalApplicationStatus.UNDEFINED) {
+ finish(finalStatus)
+ 0
+ } else {
+ 1
+ }
+ }
+
+ final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
+ if (!finished) {
+ logInfo(s"Finishing ApplicationMaster with $status" +
+ Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
+ finished = true
+ finalStatus = status
+ try {
+ if (Thread.currentThread() != reporterThread) {
+ reporterThread.interrupt()
+ reporterThread.join()
+ }
+ } finally {
+ client.shutdown(status, Option(diagnostics).getOrElse(""))
+ }
+ }
+ }
+
+ private def sparkContextInitialized(sc: SparkContext) = {
+ sparkContextRef.synchronized {
+ sparkContextRef.compareAndSet(null, sc)
+ sparkContextRef.notifyAll()
+ }
+ }
+
+ private def sparkContextStopped(sc: SparkContext) = {
+ sparkContextRef.compareAndSet(sc, null)
+ }
+
+ private def registerAM(uiAddress: String, uiHistoryAddress: String) = {
+ val sc = sparkContextRef.get()
+ allocator = client.register(yarnConf,
+ if (sc != null) sc.getConf else sparkConf,
+ if (sc != null) sc.preferredNodeLocationData else Map(),
+ uiAddress,
+ uiHistoryAddress)
+
+ allocator.allocateResources()
+ reporterThread = launchReporterThread()
+ }
+
+ private def runDriver(): Unit = {
+ addAmIpFilter()
+ val userThread = startUserClass()
+
+    // This is a bit hacky, but we need to wait until the spark.driver.port property has
+ // been set by the Thread executing the user class.
+ val sc = waitForSparkContextInitialized()
+
+ // If there is no SparkContext at this point, just fail the app.
+ if (sc == null) {
+ finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
+ } else {
+ registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
+ try {
+ userThread.join()
+ } finally {
+ // In cluster mode, ask the reporter thread to stop since the user app is finished.
+ reporterThread.interrupt()
+ }
+ }
+ }
+
+ private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
+ actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+ conf = sparkConf, securityManager = securityMgr)._1
+ actor = waitForSparkDriver()
+ addAmIpFilter()
+ registerAM(sparkConf.get("spark.driver.appUIAddress", ""),
+ sparkConf.get("spark.driver.appUIHistoryAddress", ""))
+
+ // In client mode the actor will stop the reporter thread.
+ reporterThread.join()
+ finalStatus = FinalApplicationStatus.SUCCEEDED
+ }
+
+ private def launchReporterThread(): Thread = {
+ // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+ val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+
+ // we want to be reasonably responsive without causing too many requests to RM.
+ val schedulerInterval =
+ sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
+
+ // must be <= expiryInterval / 2.
+ val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
+
+ val t = new Thread {
+ override def run() {
+ while (!finished) {
+ checkNumExecutorsFailed()
+ if (!finished) {
+ logDebug("Sending progress")
+ allocator.allocateResources()
+ try {
+ Thread.sleep(interval)
+ } catch {
+ case e: InterruptedException =>
+ }
+ }
+ }
+ }
+ }
+ // setting to daemon status, though this is usually not a good idea.
+ t.setDaemon(true)
+ t.setName("Reporter")
+ t.start()
+ logInfo("Started progress reporter thread - sleep time : " + interval)
+ t
+ }
+
+ /**
+ * Clean up the staging directory.
+ */
+ private def cleanupStagingDir() {
+ val fs = FileSystem.get(yarnConf)
+ var stagingDirPath: Path = null
+ try {
+ val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
+ if (!preserveFiles) {
+ stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+ if (stagingDirPath == null) {
+ logError("Staging directory is null")
+ return
+ }
+ logInfo("Deleting staging directory " + stagingDirPath)
+ fs.delete(stagingDirPath, true)
+ }
+ } catch {
+ case ioe: IOException =>
+ logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+ }
+ }
+
+ private def waitForSparkContextInitialized(): SparkContext = {
+ logInfo("Waiting for spark context initialization")
+ try {
+ sparkContextRef.synchronized {
+ var count = 0
+ val waitTime = 10000L
+ val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
+ while (sparkContextRef.get() == null && count < numTries && !finished) {
+ logInfo("Waiting for spark context initialization ... " + count)
+ count = count + 1
+ sparkContextRef.wait(waitTime)
+ }
+
+ val sparkContext = sparkContextRef.get()
+ assert(sparkContext != null || count >= numTries)
+ if (sparkContext == null) {
+ logError(
+            "Unable to retrieve sparkContext in spite of waiting for %d ms, numTries = %d".format(
+ count * waitTime, numTries))
+ }
+ sparkContext
+ }
+ }
+ }
+
+ private def waitForSparkDriver(): ActorRef = {
+ logInfo("Waiting for Spark driver to be reachable.")
+ var driverUp = false
+ val hostport = args.userArgs(0)
+ val (driverHost, driverPort) = Utils.parseHostPort(hostport)
+ while (!driverUp) {
+ try {
+ val socket = new Socket(driverHost, driverPort)
+ socket.close()
+ logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
+ driverUp = true
+ } catch {
+ case e: Exception =>
+ logError("Failed to connect to driver at %s:%s, retrying ...".
+ format(driverHost, driverPort))
+ Thread.sleep(100)
+ }
+ }
+ sparkConf.set("spark.driver.host", driverHost)
+ sparkConf.set("spark.driver.port", driverPort.toString)
+
+ val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ SparkEnv.driverActorSystemName,
+ driverHost,
+ driverPort.toString,
+ CoarseGrainedSchedulerBackend.ACTOR_NAME)
+ actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+ }
+
+ private def checkNumExecutorsFailed() = {
+ if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+ finish(FinalApplicationStatus.FAILED, "Max number of executor failures reached.")
+
+ val sc = sparkContextRef.get()
+ if (sc != null) {
+ logInfo("Invoking sc stop from checkNumExecutorsFailed")
+ sc.stop()
+ }
+ }
+ }
+
+ /** Add the Yarn IP filter that is required for properly securing the UI. */
+ private def addAmIpFilter() = {
+ val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+ val proxy = client.getProxyHostAndPort(yarnConf)
+ val parts = proxy.split(":")
+ val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+ val uriBase = "http://" + proxy + proxyBase
+ val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+
+ if (isDriver) {
+ System.setProperty("spark.ui.filters", amFilter)
+ System.setProperty(s"spark.$amFilter.params", params)
+ } else {
+ actor ! AddWebUIFilter(amFilter, params, proxyBase)
+ }
+ }
+
+ private def startUserClass(): Thread = {
+ logInfo("Starting the user JAR in a separate Thread")
+ System.setProperty("spark.executor.instances", args.numExecutors.toString)
+ val mainMethod = Class.forName(args.userClass, false,
+ Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
+
+ val t = new Thread {
+ override def run() {
+ var status = FinalApplicationStatus.FAILED
+ try {
+          // Copy the user arguments into a fresh Array before invoking the user's main method.
+ val mainArgs = new Array[String](args.userArgs.size)
+ args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
+ mainMethod.invoke(null, mainArgs)
+          // Some apps have "System.exit(0)" at the end. The user thread will stop here unless
+          // it throws an uncaught exception; in the System.exit case, the shutdown hook is what
+          // reports SUCCEEDED.
+ status = FinalApplicationStatus.SUCCEEDED
+ } finally {
+ logDebug("Finishing main")
+ }
+ finalStatus = status
+ }
+ }
+ t.setName("Driver")
+ t.start()
+ t
+ }
+
+ // Actor used to monitor the driver when running in client deploy mode.
+ private class MonitorActor(driverUrl: String) extends Actor {
+
+ var driver: ActorSelection = _
+
+ override def preStart() = {
+ logInfo("Listen to driver: " + driverUrl)
+ driver = context.actorSelection(driverUrl)
+ // Send a hello message to establish the connection, after which
+ // we can monitor Lifecycle Events.
+ driver ! "Hello"
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+ }
+
+ override def receive = {
+ case x: DisassociatedEvent =>
+ logInfo(s"Driver terminated or disconnected! Shutting down. $x")
+ finish(FinalApplicationStatus.SUCCEEDED)
+ case x: AddWebUIFilter =>
+ logInfo(s"Add WebUI Filter. $x")
+ driver ! x
+ }
+
+ }
+
+}
+
+object ApplicationMaster extends Logging {
+
+ private var master: ApplicationMaster = _
+
+ def main(args: Array[String]) = {
+ SignalLogger.register(log)
+ val amArgs = new ApplicationMasterArguments(args)
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs))
+ System.exit(master.run())
+ }
+ }
+
+ private[spark] def sparkContextInitialized(sc: SparkContext) = {
+ master.sparkContextInitialized(sc)
+ }
+
+ private[spark] def sparkContextStopped(sc: SparkContext) = {
+ master.sparkContextStopped(sc)
+ }
+
+}
+
+/**
+ * This object does not provide any special functionality. It exists so that it's easy to tell
+ * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps.
+ */
+object ExecutorLauncher {
+
+ def main(args: Array[String]) = {
+ ApplicationMaster.main(args)
+ }
+
+}
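
Aside (not part of the patch): the reporter thread above clamps its heartbeat interval so that it is never negative and never more than half of the RM expiry interval. A minimal, hypothetical standalone Scala sketch of that calculation, using the defaults shown in launchReporterThread():

    object ReporterIntervalSketch {
      // Mirrors launchReporterThread(): the scheduler interval, capped at half of
      // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS and floored at zero.
      def heartbeatInterval(expiryIntervalMs: Long, schedulerIntervalMs: Long): Long =
        math.max(0, math.min(expiryIntervalMs / 2, schedulerIntervalMs))

      def main(args: Array[String]): Unit = {
        println(heartbeatInterval(120000L, 5000L)) // 5000: the default scheduler interval wins
        println(heartbeatInterval(8000L, 5000L))   // 4000: capped at half the expiry interval
      }
    }
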
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 424b0fb093..3e6b96fb63 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -63,11 +63,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
executorCores = value
args = tail
- case Nil =>
- if (userJar == null || userClass == null) {
- printUsageAndExit(1)
- }
-
case _ =>
printUsageAndExit(1, args)
}
@@ -80,16 +75,17 @@ class ApplicationMasterArguments(val args: Array[String]) {
if (unknownParam != null) {
System.err.println("Unknown/unsupported param " + unknownParam)
}
- System.err.println(
- "Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] \n" +
- "Options:\n" +
- " --jar JAR_PATH Path to your application's JAR file (required)\n" +
- " --class CLASS_NAME Name of your application's main class (required)\n" +
- " --args ARGS Arguments to be passed to your application's main class.\n" +
- " Mutliple invocations are possible, each will be passed in order.\n" +
- " --num-executors NUM Number of executors to start (Default: 2)\n" +
- " --executor-cores NUM Number of cores for the executors (Default: 1)\n" +
- " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n")
+ System.err.println("""
+ |Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
+ |Options:
+ | --jar JAR_PATH Path to your application's JAR file
+ | --class CLASS_NAME Name of your application's main class
+ | --args ARGS Arguments to be passed to your application's main class.
+      |                       Multiple invocations are possible, each will be passed in order.
+ | --num-executors NUM Number of executors to start (Default: 2)
+ | --executor-cores NUM Number of cores for the executors (Default: 1)
+ | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)
+ """.stripMargin)
System.exit(exitCode)
}
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index afa4fd4c69..40d8d6d6e6 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -37,7 +37,6 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
var numExecutors = 2
var amQueue = sparkConf.get("QUEUE", "default")
var amMemory: Int = 512 // MB
- var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
var appName: String = "Spark"
var priority = 0
@@ -78,10 +77,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
args = tail
case ("--master-class" | "--am-class") :: value :: tail =>
- if (args(0) == "--master-class") {
- println("--master-class is deprecated. Use --am-class instead.")
- }
- amClass = value
+ println(s"${args(0)} is deprecated and is not used anymore.")
args = tail
case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
@@ -133,9 +129,6 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
args = tail
case Nil =>
- if (userClass == null) {
- throw new IllegalArgumentException(getUsageMessage())
- }
case _ =>
throw new IllegalArgumentException(getUsageMessage(args))
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 3897b3a373..6cf300c398 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -42,12 +42,6 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar
/**
* The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
* Client submits an application to the YARN ResourceManager.
- *
- * Depending on the deployment mode this will launch one of two application master classes:
- * 1. In cluster mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
- * which launches a driver program inside of the cluster.
- * 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]] to
- * request executors on behalf of a driver running outside of the cluster.
*/
trait ClientBase extends Logging {
val args: ClientArguments
@@ -67,14 +61,11 @@ trait ClientBase extends Logging {
// Additional memory overhead - in mb.
protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
- YarnAllocationHandler.MEMORY_OVERHEAD)
+ YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
// TODO(harvey): This could just go in ClientArguments.
def validateArgs() = {
Map(
- ((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
- "Error: You must specify a user jar when running in standalone mode!"),
- (args.userClass == null) -> "Error: You must specify a user class!",
(args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
(args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" +
"greater than: " + memoryOverhead),
@@ -321,6 +312,8 @@ trait ClientBase extends Logging {
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources)
+ val isLaunchingDriver = args.userClass != null
+
// In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
// executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
// SparkContext will not let that set spark* system properties, which is expected behavior for
@@ -329,7 +322,7 @@ trait ClientBase extends Logging {
// Note that to warn the user about the deprecation in cluster mode, some code from
// SparkConf#validateSettings() is duplicated here (to avoid triggering the condition
// described above).
- if (args.amClass == classOf[ApplicationMaster].getName) {
+ if (isLaunchingDriver) {
sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
val warning =
s"""
@@ -389,7 +382,7 @@ trait ClientBase extends Logging {
javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
}
- if (args.amClass == classOf[ApplicationMaster].getName) {
+ if (isLaunchingDriver) {
sparkConf.getOption("spark.driver.extraJavaOptions")
.orElse(sys.env.get("SPARK_JAVA_OPTS"))
.foreach(opts => javaOpts += opts)
@@ -397,22 +390,37 @@ trait ClientBase extends Logging {
.foreach(p => javaOpts += s"-Djava.library.path=$p")
}
- // Command for the ApplicationMaster
- val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
- javaOpts ++
- Seq(args.amClass, "--class", YarnSparkHadoopUtil.escapeForShell(args.userClass),
- "--jar ", YarnSparkHadoopUtil.escapeForShell(args.userJar),
- userArgsToString(args),
- "--executor-memory", args.executorMemory.toString,
+ val userClass =
+ if (args.userClass != null) {
+ Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
+ } else {
+ Nil
+ }
+ val amClass =
+ if (isLaunchingDriver) {
+ classOf[ApplicationMaster].getName()
+ } else {
+ classOf[ApplicationMaster].getName().replace("ApplicationMaster", "ExecutorLauncher")
+ }
+ val amArgs =
+ Seq(amClass) ++ userClass ++
+ (if (args.userJar != null) Seq("--jar", args.userJar) else Nil) ++
+ Seq("--executor-memory", args.executorMemory.toString,
"--executor-cores", args.executorCores.toString,
"--num-executors ", args.numExecutors.toString,
+ userArgsToString(args))
+
+ // Command for the ApplicationMaster
+ val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
+ javaOpts ++ amArgs ++
+ Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
logInfo("Yarn AM launch context:")
- logInfo(s" class: ${args.amClass}")
- logInfo(s" env: $env")
- logInfo(s" command: ${commands.mkString(" ")}")
+ logInfo(s" user class: ${args.userClass}")
+ logInfo(s" env: $env")
+ logInfo(s" command: ${commands.mkString(" ")}")
// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
@@ -623,7 +631,7 @@ object ClientBase extends Logging {
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path,
File.pathSeparator)
- /**
+ /**
* Get the list of namenodes the user may access.
*/
private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
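
Aside (not part of the patch): with amClass now derived from the ApplicationMaster class name, a hypothetical standalone snippet showing what the client-mode entry point resolves to (the thin ExecutorLauncher wrapper object added earlier in this change):

    object AmClassSketch {
      def main(args: Array[String]): Unit = {
        val clusterModeClass = "org.apache.spark.deploy.yarn.ApplicationMaster"
        // Client mode swaps in ExecutorLauncher, which simply delegates to
        // ApplicationMaster.main(), so both modes share the same AM implementation.
        val clientModeClass = clusterModeClass.replace("ApplicationMaster", "ExecutorLauncher")
        println(clientModeClass) // org.apache.spark.deploy.yarn.ExecutorLauncher
      }
    }
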
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
new file mode 100644
index 0000000000..cad94e5e19
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+object AllocationType extends Enumeration {
+ type AllocationType = Value
+ val HOST, RACK, ANY = Value
+}
+
+/**
+ * Interface that defines a Yarn allocator.
+ */
+trait YarnAllocator {
+
+ def allocateResources(): Unit
+ def getNumExecutorsFailed: Int
+ def getNumExecutorsRunning: Int
+
+}
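
Aside (not part of the patch): a minimal sketch of how the YarnAllocator trait above could be stubbed out, for example to exercise the ApplicationMaster's executor-failure check in a test. The StubYarnAllocator name and its counters are hypothetical, and the sketch assumes it lives in the same package as the trait:

    import java.util.concurrent.atomic.AtomicInteger

    class StubYarnAllocator extends YarnAllocator {
      private val running = new AtomicInteger(0)
      private val failed = new AtomicInteger(0)

      // Pretend one executor comes up on every allocation/heartbeat round.
      override def allocateResources(): Unit = { running.incrementAndGet() }
      override def getNumExecutorsRunning: Int = running.get()
      override def getNumExecutorsFailed: Int = failed.get()

      // Test helper to simulate a lost executor.
      def failOne(): Unit = { failed.incrementAndGet() }
    }
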
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
new file mode 100644
index 0000000000..922d7d1a85
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.{Map, Set}
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.api.records._
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.scheduler.SplitInfo
+
+/**
+ * Interface that defines a Yarn RM client. Abstracts away Yarn version-specific functionality that
+ * is used by Spark's AM.
+ */
+trait YarnRMClient {
+
+ /**
+ * Registers the application master with the RM.
+ *
+ * @param conf The Yarn configuration.
+ * @param sparkConf The Spark configuration.
+ * @param preferredNodeLocations Map with hints about where to allocate containers.
+ * @param uiAddress Address of the SparkUI.
+ * @param uiHistoryAddress Address of the application on the History Server.
+ */
+ def register(
+ conf: YarnConfiguration,
+ sparkConf: SparkConf,
+ preferredNodeLocations: Map[String, Set[SplitInfo]],
+ uiAddress: String,
+ uiHistoryAddress: String): YarnAllocator
+
+ /**
+ * Shuts down the AM. Guaranteed to only be called once.
+ *
+ * @param status The final status of the AM.
+ * @param diagnostics Diagnostics message to include in the final status.
+ */
+ def shutdown(status: FinalApplicationStatus, diagnostics: String = ""): Unit
+
+ /** Returns the attempt ID. */
+ def getAttemptId(): ApplicationAttemptId
+
+ /** Returns the RM's proxy host and port. */
+ def getProxyHostAndPort(conf: YarnConfiguration): String
+
+ /** Returns the maximum number of attempts to register the AM. */
+ def getMaxRegAttempts(conf: YarnConfiguration): Int
+
+}
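
Aside (not part of the patch): the ApplicationMaster drives this interface as register, then heartbeat through the returned allocator, then shutdown. A hypothetical caller might look like the sketch below; the object name, UI address, and loop bounds are made up, and the sketch assumes it lives in the same package as the trait:

    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.spark.SparkConf

    object YarnRMClientUsageSketch {
      def runOnce(client: YarnRMClient, yarnConf: YarnConfiguration, sparkConf: SparkConf): Unit = {
        val allocator = client.register(yarnConf, sparkConf, Map(), "http://driver:4040", "")
        try {
          // Each allocateResources() call doubles as a heartbeat to the RM.
          (1 to 3).foreach { _ =>
            allocator.allocateResources()
            Thread.sleep(5000)
          }
        } finally {
          client.shutdown(FinalApplicationStatus.SUCCEEDED)
        }
      }
    }
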
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 10aef5eb24..2aa27a1908 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,8 +17,11 @@
package org.apache.spark.deploy.yarn
+import java.lang.{Boolean => JBoolean}
+import java.util.{Collections, Set => JSet}
import java.util.regex.Matcher
import java.util.regex.Pattern
+import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.HashMap
@@ -29,11 +32,13 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.StringInterner
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.util.RackResolver
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.Utils
/**
* Contains util methods to interact with Hadoop from spark.
@@ -79,6 +84,21 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}
object YarnSparkHadoopUtil {
+ // Additional memory overhead - in mb.
+ val DEFAULT_MEMORY_OVERHEAD = 384
+
+ val ANY_HOST = "*"
+
+  // All RM requests are issued with the same priority: we do not (yet) have any distinction
+  // between request types (like map/reduce in Hadoop, for example).
+ val RM_REQUEST_PRIORITY = 1
+
+  // Host-to-rack map, saved from allocation requests. We expect this not to change.
+  // Note that it is possible for it to change: the ResourceManager will indicate that to us via
+  // an update in the allocate response, but we are punting on handling that for now.
+ private val hostToRack = new ConcurrentHashMap[String, String]()
+ private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+
def addToEnvironment(
env: HashMap[String, String],
variable: String,
@@ -173,4 +193,35 @@ object YarnSparkHadoopUtil {
}
}
+ private[spark] def lookupRack(conf: Configuration, host: String): String = {
+    if (!hostToRack.containsKey(host)) {
+ populateRackInfo(conf, host)
+ }
+ hostToRack.get(host)
+ }
+
+ private[spark] def populateRackInfo(conf: Configuration, hostname: String) {
+ Utils.checkHost(hostname)
+
+ if (!hostToRack.containsKey(hostname)) {
+      // If there are repeated failures to resolve, add the host to an ignore list.
+ val rackInfo = RackResolver.resolve(conf, hostname)
+ if (rackInfo != null && rackInfo.getNetworkLocation != null) {
+ val rack = rackInfo.getNetworkLocation
+ hostToRack.put(hostname, rack)
+ if (! rackToHostSet.containsKey(rack)) {
+ rackToHostSet.putIfAbsent(rack,
+ Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
+ }
+ rackToHostSet.get(rack).add(hostname)
+
+ // TODO(harvey): Figure out what this comment means...
+ // Since RackResolver caches, we are disabling this for now ...
+ } /* else {
+ // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
+ hostToRack.put(hostname, null)
+ } */
+ }
+ }
+
}
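
Aside (not part of the patch): rackToHostSet above relies on Collections.newSetFromMap to obtain a concurrent Set backed by a ConcurrentHashMap, since the JDK has no ConcurrentHashSet. A tiny hypothetical illustration of that idiom:

    import java.lang.{Boolean => JBoolean}
    import java.util.{Collections, Set => JSet}
    import java.util.concurrent.ConcurrentHashMap

    object ConcurrentSetSketch {
      def main(args: Array[String]): Unit = {
        val hosts: JSet[String] =
          Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]())
        hosts.add("host1.example.com")
        hosts.add("host1.example.com") // duplicate adds are ignored, as with any Set
        println(hosts.size())          // prints 1
      }
    }
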
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 3474112ded..d162b4c433 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -19,22 +19,21 @@ package org.apache.spark.scheduler.cluster
import org.apache.spark._
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.deploy.yarn.YarnAllocationHandler
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
/**
- *
- * This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM.
+ * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM.
*/
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
+private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration)
+ extends TaskSchedulerImpl(sc) {
def this(sc: SparkContext) = this(sc, new Configuration())
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
- val retval = YarnAllocationHandler.lookupRack(conf, host)
- if (retval != null) Some(retval) else None
+ Option(YarnSparkHadoopUtil.lookupRack(conf, host))
}
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 833e249f9f..a5f537dd9d 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
import org.apache.spark.scheduler.TaskSchedulerImpl
import scala.collection.mutable.ArrayBuffer
@@ -60,10 +60,7 @@ private[spark] class YarnClientSchedulerBackend(
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
- "--class", "notused",
- "--jar", null, // The primary jar will be added dynamically in SparkContext.
- "--args", hostport,
- "--am-class", classOf[ExecutorLauncher].getName
+ "--args", hostport
)
// process any optional arguments, given either as environment variables
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 9aeca4a637..69f40225a2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -18,16 +18,17 @@
package org.apache.spark.scheduler.cluster
import org.apache.spark._
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
+import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
import org.apache.hadoop.conf.Configuration
/**
- *
- * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of ApplicationMaster, etc is done
+ * This is a simple extension to ClusterScheduler that ensures appropriate initialization of the
+ * ApplicationMaster, etc. is done.
*/
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
+private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
+ extends TaskSchedulerImpl(sc) {
logInfo("Created YarnClusterScheduler")
@@ -42,7 +43,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
- val retval = YarnAllocationHandler.lookupRack(conf, host)
+ val retval = YarnSparkHadoopUtil.lookupRack(conf, host)
if (retval != null) Some(retval) else None
}
@@ -51,4 +52,10 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
super.postStartHook()
logInfo("YarnClusterScheduler.postStartHook done")
}
+
+ override def stop() {
+ super.stop()
+ ApplicationMaster.sparkContextStopped(sc)
+ }
+
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
deleted file mode 100644
index 1c4005fd8e..0000000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.IOException
-import java.util.concurrent.CopyOnWriteArrayList
-import java.util.concurrent.atomic.AtomicReference
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.{SignalLogger, Utils}
-
-
-/**
- * An application master that runs the user's driver program and allocates executors.
- */
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
- sparkConf: SparkConf) extends Logging {
-
- def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
- this(args, new Configuration(), sparkConf)
-
- def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
-
- private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
- private var appAttemptId: ApplicationAttemptId = _
- private var userThread: Thread = _
- private val fs = FileSystem.get(yarnConf)
-
- private var yarnAllocator: YarnAllocationHandler = _
- private var isFinished: Boolean = false
- private var uiAddress: String = _
- private var uiHistoryAddress: String = _
- private val maxAppAttempts: Int = conf.getInt(
- YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
- private var isLastAMRetry: Boolean = true
- private var amClient: AMRMClient[ContainerRequest] = _
-
- // Default to numExecutors * 2, with minimum of 3
- private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
- sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
-
- private var registered = false
-
- def run() {
- // Set the web ui port to be ephemeral for yarn so we don't conflict with
- // other spark processes running on the same box
- System.setProperty("spark.ui.port", "0")
-
- // When running the AM, the Spark master is always "yarn-cluster"
- System.setProperty("spark.master", "yarn-cluster")
-
- // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
- ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
-
- appAttemptId = ApplicationMaster.getApplicationAttemptId()
- logInfo("ApplicationAttemptId: " + appAttemptId)
- isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
- amClient = AMRMClient.createAMRMClient()
- amClient.init(yarnConf)
- amClient.start()
-
- // setup AmIpFilter for the SparkUI - do this before we start the UI
- addAmIpFilter()
-
- ApplicationMaster.register(this)
-
- // Call this to force generation of secret so it gets populated into the
- // Hadoop UGI. This has to happen before the startUserClass which does a
- // doAs in order for the credentials to be passed on to the executor containers.
- val securityMgr = new SecurityManager(sparkConf)
-
- // Start the user's JAR
- userThread = startUserClass()
-
- // This a bit hacky, but we need to wait until the spark.driver.port property has
- // been set by the Thread executing the user class.
- waitForSparkContextInitialized()
-
- // Do this after Spark master is up and SparkContext is created so that we can register UI Url.
- synchronized {
- if (!isFinished) {
- registerApplicationMaster()
- registered = true
- }
- }
-
- // Allocate all containers
- allocateExecutors()
-
- // Launch thread that will heartbeat to the RM so it won't think the app has died.
- launchReporterThread()
-
- // Wait for the user class to finish
- userThread.join()
-
- System.exit(0)
- }
-
- // add the yarn amIpFilter that Yarn requires for properly securing the UI
- private def addAmIpFilter() {
- val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
- System.setProperty("spark.ui.filters", amFilter)
- val proxy = WebAppUtils.getProxyHostAndPort(conf)
- val parts : Array[String] = proxy.split(":")
- val uriBase = "http://" + proxy +
- System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-
- val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
- System.setProperty(
- "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
- }
-
- private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
- logInfo("Registering the ApplicationMaster")
- amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
- }
-
- private def startUserClass(): Thread = {
- logInfo("Starting the user JAR in a separate Thread")
- System.setProperty("spark.executor.instances", args.numExecutors.toString)
- val mainMethod = Class.forName(
- args.userClass,
- false,
- Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
- val t = new Thread {
- override def run() {
- var succeeded = false
- try {
- // Copy
- val mainArgs = new Array[String](args.userArgs.size)
- args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
- mainMethod.invoke(null, mainArgs)
- // Some apps have "System.exit(0)" at the end. The user thread will stop here unless
- // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED.
- succeeded = true
- } finally {
- logDebug("Finishing main")
- isLastAMRetry = true
- if (succeeded) {
- ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- } else {
- ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
- }
- }
- }
- }
- t.setName("Driver")
- t.start()
- t
- }
-
- // This needs to happen before allocateExecutors()
- private def waitForSparkContextInitialized() {
- logInfo("Waiting for Spark context initialization")
- try {
- var sparkContext: SparkContext = null
- ApplicationMaster.sparkContextRef.synchronized {
- var numTries = 0
- val waitTime = 10000L
- val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
- while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries
- && !isFinished) {
- logInfo("Waiting for Spark context initialization ... " + numTries)
- numTries = numTries + 1
- ApplicationMaster.sparkContextRef.wait(waitTime)
- }
- sparkContext = ApplicationMaster.sparkContextRef.get()
- assert(sparkContext != null || numTries >= maxNumTries)
-
- if (sparkContext != null) {
- uiAddress = sparkContext.ui.appUIHostPort
- uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
- this.yarnAllocator = YarnAllocationHandler.newAllocator(
- yarnConf,
- amClient,
- appAttemptId,
- args,
- sparkContext.preferredNodeLocationData,
- sparkContext.getConf)
- } else {
- logWarning("Unable to retrieve SparkContext in spite of waiting for %d, maxNumTries = %d".
- format(numTries * waitTime, maxNumTries))
- this.yarnAllocator = YarnAllocationHandler.newAllocator(
- yarnConf,
- amClient,
- appAttemptId,
- args,
- sparkContext.getConf)
- }
- }
- }
- }
-
- private def allocateExecutors() {
- try {
- logInfo("Requesting" + args.numExecutors + " executors.")
- // Wait until all containers have launched
- yarnAllocator.addResourceRequests(args.numExecutors)
- yarnAllocator.allocateResources()
- // Exits the loop if the user thread exits.
-
- while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
- && !isFinished) {
- checkNumExecutorsFailed()
- allocateMissingExecutor()
- yarnAllocator.allocateResources()
- Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
- }
- }
- logInfo("All executors have launched.")
- }
-
- private def allocateMissingExecutor() {
- val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
- yarnAllocator.getNumPendingAllocate
- if (missingExecutorCount > 0) {
- logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingExecutorCount))
- yarnAllocator.addResourceRequests(missingExecutorCount)
- }
- }
-
- private def checkNumExecutorsFailed() {
- if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
- logInfo("max number of executor failures reached")
- finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of executor failures reached")
- // make sure to stop the user thread
- val sparkContext = ApplicationMaster.sparkContextRef.get()
- if (sparkContext != null) {
- logInfo("Invoking sc stop from checkNumExecutorsFailed")
- sparkContext.stop()
- } else {
- logError("sparkContext is null when should shutdown")
- }
- }
- }
-
- private def launchReporterThread(): Thread = {
- // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
- val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
- // we want to be reasonably responsive without causing too many requests to RM.
- val schedulerInterval =
- sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
- // must be <= timeoutInterval / 2.
- val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
-
- val t = new Thread {
- override def run() {
- while (userThread.isAlive && !isFinished) {
- checkNumExecutorsFailed()
- allocateMissingExecutor()
- logDebug("Sending progress")
- yarnAllocator.allocateResources()
- Thread.sleep(interval)
- }
- }
- }
- // Setting to daemon status, though this is usually not a good idea.
- t.setDaemon(true)
- t.start()
- logInfo("Started progress reporter thread - heartbeat interval : " + interval)
- t
- }
-
- def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
- synchronized {
- if (isFinished) {
- return
- }
- isFinished = true
-
- logInfo("Unregistering ApplicationMaster with " + status)
- if (registered) {
- amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
- }
- }
- }
-
- /**
- * Clean up the staging directory.
- */
- private def cleanupStagingDir() {
- var stagingDirPath: Path = null
- try {
- val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
- if (!preserveFiles) {
- stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
- if (stagingDirPath == null) {
- logError("Staging directory is null")
- return
- }
- logInfo("Deleting staging directory " + stagingDirPath)
- fs.delete(stagingDirPath, true)
- }
- } catch {
- case ioe: IOException =>
- logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
- }
- }
-
- // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
- class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
-
- def run() {
- logInfo("AppMaster received a signal.")
- // We need to clean up staging dir before HDFS is shut down
- // make sure we don't delete it until this is the last AM
- if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
- }
- }
-
-}
-
-object ApplicationMaster extends Logging {
- // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
- // optimal as more containers are available. Might need to handle this better.
- private val ALLOCATE_HEARTBEAT_INTERVAL = 100
-
- private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
-
- val sparkContextRef: AtomicReference[SparkContext] =
- new AtomicReference[SparkContext](null)
-
- def register(master: ApplicationMaster) {
- applicationMasters.add(master)
- }
-
- /**
- * Called from YarnClusterScheduler to notify the AM code that a SparkContext has been
- * initialized in the user code.
- */
- def sparkContextInitialized(sc: SparkContext): Boolean = {
- var modified = false
- sparkContextRef.synchronized {
- modified = sparkContextRef.compareAndSet(null, sc)
- sparkContextRef.notifyAll()
- }
-
- // Add a shutdown hook - as a best effort in case users do not call sc.stop or do
- // System.exit.
- // Should not really have to do this, but it helps YARN to evict resources earlier.
- // Not to mention, prevent the Client from declaring failure even though we exited properly.
- // Note that this will unfortunately not properly clean up the staging files because it gets
- // called too late, after the filesystem is already shutdown.
- if (modified) {
- Runtime.getRuntime().addShutdownHook(new Thread with Logging {
- // This is not only logs, but also ensures that log system is initialized for this instance
- // when we are actually 'run'-ing.
- logInfo("Adding shutdown hook for context " + sc)
-
- override def run() {
- logInfo("Invoking sc stop from shutdown hook")
- sc.stop()
- // Best case ...
- for (master <- applicationMasters) {
- master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- }
- }
- })
- }
-
- // Wait for initialization to complete and at least 'some' nodes to get allocated.
- modified
- }
-
- def getApplicationAttemptId(): ApplicationAttemptId = {
- val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
- val containerId = ConverterUtils.toContainerId(containerIdString)
- val appAttemptId = containerId.getApplicationAttemptId()
- appAttemptId
- }
-
- def main(argStrings: Array[String]) {
- SignalLogger.register(log)
- val args = new ApplicationMasterArguments(argStrings)
- SparkHadoopUtil.get.runAsSparkUser { () =>
- new ApplicationMaster(args).run()
- }
- }
-}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
deleted file mode 100644
index e093fe4ae6..0000000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.Socket
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import akka.actor._
-import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
-import org.apache.spark.scheduler.SplitInfo
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
-
-/**
- * An application master that allocates executors on behalf of a driver that is running outside
- * the cluster.
- *
- * This is used only in yarn-client mode.
- */
-class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
- extends Logging {
-
- def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
- this(args, new Configuration(), sparkConf)
-
- def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
-
- private var appAttemptId: ApplicationAttemptId = _
- private var reporterThread: Thread = _
- private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
- private var yarnAllocator: YarnAllocationHandler = _
- private var driverClosed: Boolean = false
- private var isFinished: Boolean = false
- private var registered: Boolean = false
-
- private var amClient: AMRMClient[ContainerRequest] = _
-
- // Default to numExecutors * 2, with minimum of 3
- private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
- sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
-
- val securityManager = new SecurityManager(sparkConf)
- val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
- conf = sparkConf, securityManager = securityManager)._1
- var actor: ActorRef = _
-
- // This actor just working as a monitor to watch on Driver Actor.
- class MonitorActor(driverUrl: String) extends Actor {
-
- var driver: ActorSelection = _
-
- override def preStart() {
- logInfo("Listen to driver: " + driverUrl)
- driver = context.actorSelection(driverUrl)
- // Send a hello message to establish the connection, after which
- // we can monitor Lifecycle Events.
- driver ! "Hello"
- context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
- }
-
- override def receive = {
- case x: DisassociatedEvent =>
- logInfo(s"Driver terminated or disconnected! Shutting down. $x")
- driverClosed = true
- case x: AddWebUIFilter =>
- logInfo(s"Add WebUI Filter. $x")
- driver ! x
- }
- }
-
- def run() {
- amClient = AMRMClient.createAMRMClient()
- amClient.init(yarnConf)
- amClient.start()
-
- appAttemptId = ApplicationMaster.getApplicationAttemptId()
- synchronized {
- if (!isFinished) {
- registerApplicationMaster()
- registered = true
- }
- }
-
- waitForSparkMaster()
- addAmIpFilter()
-
- // Allocate all containers
- allocateExecutors()
-
- // Launch a progress reporter thread, else app will get killed after expiration
- // (def: 10mins) timeout ensure that progress is sent before
- // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
- val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
- // we want to be reasonably responsive without causing too many requests to RM.
- val schedulerInterval =
- System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
- // must be <= timeoutInterval / 2.
- val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
- reporterThread = launchReporterThread(interval)
-
-
- // Wait for the reporter thread to Finish.
- reporterThread.join()
-
- finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- actorSystem.shutdown()
-
- logInfo("Exited")
- System.exit(0)
- }
-
- private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
- val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
- logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
- amClient.registerApplicationMaster(Utils.localHostName(), 0, appUIAddress)
- }
-
- // add the yarn amIpFilter that Yarn requires for properly securing the UI
- private def addAmIpFilter() {
- val proxy = WebAppUtils.getProxyHostAndPort(conf)
- val parts = proxy.split(":")
- val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
- val uriBase = "http://" + proxy + proxyBase
- val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
- val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
- actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
- }
-
- private def waitForSparkMaster() {
- logInfo("Waiting for Spark driver to be reachable.")
- var driverUp = false
- val hostport = args.userArgs(0)
- val (driverHost, driverPort) = Utils.parseHostPort(hostport)
- while(!driverUp) {
- try {
- val socket = new Socket(driverHost, driverPort)
- socket.close()
- logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
- driverUp = true
- } catch {
- case e: Exception =>
- logError("Failed to connect to driver at %s:%s, retrying ...".
- format(driverHost, driverPort))
- Thread.sleep(100)
- }
- }
- sparkConf.set("spark.driver.host", driverHost)
- sparkConf.set("spark.driver.port", driverPort.toString)
-
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
- SparkEnv.driverActorSystemName,
- driverHost,
- driverPort.toString,
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
- actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
- }
-
-
- private def allocateExecutors() {
- // TODO: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
- val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
- scala.collection.immutable.Map()
-
- yarnAllocator = YarnAllocationHandler.newAllocator(
- yarnConf,
- amClient,
- appAttemptId,
- args,
- preferredNodeLocationData,
- sparkConf)
-
- logInfo("Requesting " + args.numExecutors + " executors.")
- // Wait until all containers have launched
- yarnAllocator.addResourceRequests(args.numExecutors)
- yarnAllocator.allocateResources()
- while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
- !isFinished) {
- checkNumExecutorsFailed()
- allocateMissingExecutor()
- yarnAllocator.allocateResources()
- Thread.sleep(100)
- }
-
- logInfo("All executors have launched.")
- }
-
- private def allocateMissingExecutor() {
- val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
- yarnAllocator.getNumPendingAllocate
- if (missingExecutorCount > 0) {
- logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingExecutorCount))
- yarnAllocator.addResourceRequests(missingExecutorCount)
- }
- }
-
- private def checkNumExecutorsFailed() {
- if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
- finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of executor failures reached")
- }
- }
-
- private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
-
- val t = new Thread {
- override def run() {
- while (!driverClosed && !isFinished) {
- checkNumExecutorsFailed()
- allocateMissingExecutor()
- logDebug("Sending progress")
- yarnAllocator.allocateResources()
- Thread.sleep(sleepTime)
- }
- }
- }
- // setting to daemon status, though this is usually not a good idea.
- t.setDaemon(true)
- t.start()
- logInfo("Started progress reporter thread - sleep time : " + sleepTime)
- t
- }
-
- def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
- synchronized {
- if (isFinished) {
- return
- }
- logInfo("Unregistering ApplicationMaster with " + status)
- if (registered) {
- val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
- amClient.unregisterApplicationMaster(status, appMessage, trackingUrl)
- }
- isFinished = true
- }
- }
-
-}
-
-object ExecutorLauncher {
- def main(argStrings: Array[String]) {
- val args = new ApplicationMasterArguments(argStrings)
- SparkHadoopUtil.get.runAsSparkUser { () =>
- new ExecutorLauncher(args).run()
- }
- }
-}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 0a461749c8..4d51449899 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,12 +17,9 @@
package org.apache.spark.deploy.yarn
-import java.lang.{Boolean => JBoolean}
-import java.util.{Collections, Set => JSet}
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -32,20 +29,13 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId
-import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
+import org.apache.hadoop.yarn.api.records.{Container, ContainerId}
import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.util.{RackResolver, Records}
-
-
-object AllocationType extends Enumeration {
- type AllocationType = Value
- val HOST, RACK, ANY = Value
-}
+import org.apache.hadoop.yarn.util.Records
// TODO:
// Too many params.
@@ -61,16 +51,14 @@ object AllocationType extends Enumeration {
* Acquires resources for executors from a ResourceManager and launches executors in new containers.
*/
private[yarn] class YarnAllocationHandler(
- val conf: Configuration,
- val amClient: AMRMClient[ContainerRequest],
- val appAttemptId: ApplicationAttemptId,
- val maxExecutors: Int,
- val executorMemory: Int,
- val executorCores: Int,
- val preferredHostToCount: Map[String, Int],
- val preferredRackToCount: Map[String, Int],
- val sparkConf: SparkConf)
- extends Logging {
+ conf: Configuration,
+ sparkConf: SparkConf,
+ amClient: AMRMClient[ContainerRequest],
+ appAttemptId: ApplicationAttemptId,
+ args: ApplicationMasterArguments,
+ preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
+ extends YarnAllocator with Logging {
+
// These three are locked on allocatedHostToContainersMap. Complementary data structures
// allocatedHostToContainersMap : containers which are running : host, Set<containerid>
// allocatedContainerToHostMap: container to host mapping.
@@ -92,7 +80,7 @@ private[yarn] class YarnAllocationHandler(
// Additional memory overhead - in mb.
private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- YarnAllocationHandler.MEMORY_OVERHEAD)
+ YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
// Number of container requests that have been sent to, but not yet allocated by the
// ApplicationMaster.
@@ -103,11 +91,15 @@ private[yarn] class YarnAllocationHandler(
private val lastResponseId = new AtomicInteger()
private val numExecutorsFailed = new AtomicInteger()
- def getNumPendingAllocate: Int = numPendingAllocate.intValue
+ private val maxExecutors = args.numExecutors
+ private val executorMemory = args.executorMemory
+ private val executorCores = args.executorCores
+ private val (preferredHostToCount, preferredRackToCount) =
+ generateNodeToWeight(conf, preferredNodes)
- def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
+ override def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
- def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
+ override def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
container.getResource.getMemory >= (executorMemory + memoryOverhead)
@@ -119,7 +111,9 @@ private[yarn] class YarnAllocationHandler(
amClient.releaseAssignedContainer(containerId)
}
- def allocateResources() {
+ override def allocateResources() = {
+ addResourceRequests(maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get())
+
// We have already set the container request. Poll the ResourceManager for a response.
// This doubles as a heartbeat if there are no pending container requests.
val progressIndicator = 0.1f
@@ -204,7 +198,7 @@ private[yarn] class YarnAllocationHandler(
// For rack local containers
if (remainingContainers != null) {
- val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
if (rack != null) {
val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
@@ -273,7 +267,7 @@ private[yarn] class YarnAllocationHandler(
// To be safe, remove the container from `pendingReleaseContainers`.
pendingReleaseContainers.remove(containerId)
- val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
allocatedHostToContainersMap.synchronized {
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId]())
@@ -360,7 +354,7 @@ private[yarn] class YarnAllocationHandler(
allocatedContainerToHostMap.remove(containerId)
// TODO: Move this part outside the synchronized block?
- val rack = YarnAllocationHandler.lookupRack(conf, host)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
if (rack != null) {
val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
if (rackCount > 0) {
@@ -393,9 +387,9 @@ private[yarn] class YarnAllocationHandler(
for (container <- hostContainers) {
val candidateHost = container.getNodes.last
- assert(YarnAllocationHandler.ANY_HOST != candidateHost)
+ assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
- val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
if (rack != null) {
var count = rackToCounts.getOrElse(rack, 0)
count += 1
@@ -409,7 +403,7 @@ private[yarn] class YarnAllocationHandler(
AllocationType.RACK,
rack,
count,
- YarnAllocationHandler.PRIORITY)
+ YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
}
requestedContainers
@@ -431,7 +425,7 @@ private[yarn] class YarnAllocationHandler(
retval
}
- def addResourceRequests(numExecutors: Int) {
+ private def addResourceRequests(numExecutors: Int) {
val containerRequests: List[ContainerRequest] =
if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
logDebug("numExecutors: " + numExecutors + ", host preferences: " +
@@ -440,9 +434,9 @@ private[yarn] class YarnAllocationHandler(
AllocationType.ANY,
resource = null,
numExecutors,
- YarnAllocationHandler.PRIORITY).toList
+ YarnSparkHadoopUtil.RM_REQUEST_PRIORITY).toList
} else {
- // Request for all hosts in preferred nodes and for numExecutors -
+ // Request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
for ((candidateHost, candidateCount) <- preferredHostToCount) {
@@ -453,7 +447,7 @@ private[yarn] class YarnAllocationHandler(
AllocationType.HOST,
candidateHost,
requiredCount,
- YarnAllocationHandler.PRIORITY)
+ YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
}
}
val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests(
@@ -463,7 +457,7 @@ private[yarn] class YarnAllocationHandler(
AllocationType.ANY,
resource = null,
numExecutors,
- YarnAllocationHandler.PRIORITY)
+ YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
hostContainerRequests.size + rackContainerRequests.size() + anyContainerRequests.size)
@@ -512,7 +506,7 @@ private[yarn] class YarnAllocationHandler(
// There must be a third request, which is ANY. That will be specially handled.
requestType match {
case AllocationType.HOST => {
- assert(YarnAllocationHandler.ANY_HOST != resource)
+ assert(YarnSparkHadoopUtil.ANY_HOST != resource)
val hostname = resource
val nodeLocal = constructContainerRequests(
Array(hostname),
@@ -521,7 +515,7 @@ private[yarn] class YarnAllocationHandler(
priority)
// Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
- YarnAllocationHandler.populateRackInfo(conf, hostname)
+ YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
nodeLocal
}
case AllocationType.RACK => {
@@ -554,88 +548,6 @@ private[yarn] class YarnAllocationHandler(
}
requests
}
-}
-
-object YarnAllocationHandler {
-
- val ANY_HOST = "*"
- // All requests are issued with same priority : we do not (yet) have any distinction between
- // request types (like map/reduce in hadoop for example)
- val PRIORITY = 1
-
- // Additional memory overhead - in mb.
- val MEMORY_OVERHEAD = 384
-
- // Host to rack map - saved from allocation requests. We are expecting this not to change.
- // Note that it is possible for this to change: the ResourceManager will indicate that to us via
- // update response to allocate. But we are punting on handling that for now.
- private val hostToRack = new ConcurrentHashMap[String, String]()
- private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
-
-
- def newAllocator(
- conf: Configuration,
- amClient: AMRMClient[ContainerRequest],
- appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- sparkConf: SparkConf
- ): YarnAllocationHandler = {
- new YarnAllocationHandler(
- conf,
- amClient,
- appAttemptId,
- args.numExecutors,
- args.executorMemory,
- args.executorCores,
- Map[String, Int](),
- Map[String, Int](),
- sparkConf)
- }
-
- def newAllocator(
- conf: Configuration,
- amClient: AMRMClient[ContainerRequest],
- appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- map: collection.Map[String,
- collection.Set[SplitInfo]],
- sparkConf: SparkConf
- ): YarnAllocationHandler = {
- val (hostToSplitCount, rackToSplitCount) = generateNodeToWeight(conf, map)
- new YarnAllocationHandler(
- conf,
- amClient,
- appAttemptId,
- args.numExecutors,
- args.executorMemory,
- args.executorCores,
- hostToSplitCount,
- rackToSplitCount,
- sparkConf)
- }
-
- def newAllocator(
- conf: Configuration,
- amClient: AMRMClient[ContainerRequest],
- appAttemptId: ApplicationAttemptId,
- maxExecutors: Int,
- executorMemory: Int,
- executorCores: Int,
- map: collection.Map[String, collection.Set[SplitInfo]],
- sparkConf: SparkConf
- ): YarnAllocationHandler = {
- val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
- new YarnAllocationHandler(
- conf,
- amClient,
- appAttemptId,
- maxExecutors,
- executorMemory,
- executorCores,
- hostToCount,
- rackToCount,
- sparkConf)
- }
// A simple method to copy the split info map.
private def generateNodeToWeight(
@@ -654,7 +566,7 @@ object YarnAllocationHandler {
val hostCount = hostToCount.getOrElse(host, 0)
hostToCount.put(host, hostCount + splits.size)
- val rack = lookupRack(conf, host)
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
if (rack != null){
val rackCount = rackToCount.getOrElse(host, 0)
rackToCount.put(host, rackCount + splits.size)
@@ -664,42 +576,4 @@ object YarnAllocationHandler {
(hostToCount.toMap, rackToCount.toMap)
}
- def lookupRack(conf: Configuration, host: String): String = {
- if (!hostToRack.contains(host)) {
- populateRackInfo(conf, host)
- }
- hostToRack.get(host)
- }
-
- def fetchCachedHostsForRack(rack: String): Option[Set[String]] = {
- Option(rackToHostSet.get(rack)).map { set =>
- val convertedSet: collection.mutable.Set[String] = set
- // TODO: Better way to get a Set[String] from JSet.
- convertedSet.toSet
- }
- }
-
- def populateRackInfo(conf: Configuration, hostname: String) {
- Utils.checkHost(hostname)
-
- if (!hostToRack.containsKey(hostname)) {
- // If there are repeated failures to resolve, add it to an ignore list.
- val rackInfo = RackResolver.resolve(conf, hostname)
- if (rackInfo != null && rackInfo.getNetworkLocation != null) {
- val rack = rackInfo.getNetworkLocation
- hostToRack.put(hostname, rack)
- if (! rackToHostSet.containsKey(rack)) {
- rackToHostSet.putIfAbsent(rack,
- Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
- }
- rackToHostSet.get(rack).add(hostname)
-
- // TODO(harvey): Figure out what this comment means...
- // Since RackResolver caches, we are disabling this for now ...
- } /* else {
- // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
- hostToRack.put(hostname, null)
- } */
- }
- }
}
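
In the updated handler the constructor takes the ApplicationMasterArguments and the preferred-node map directly, and the class now extends a YarnAllocator abstraction. From the override markers visible in this file (allocateResources, getNumExecutorsRunning, getNumExecutorsFailed), the minimal shape of that trait can be sketched as follows; the trait actually introduced by the change may declare additional members.

    package org.apache.spark.deploy.yarn

    // Minimal sketch of the allocator abstraction implied by the overrides above.
    // Only the members visible in this diff are listed; the committed trait may
    // carry more.
    private[yarn] trait YarnAllocator {
      // Request any containers still needed and process the ResourceManager response.
      def allocateResources(): Unit
      // Number of executors currently running in allocated containers.
      def getNumExecutorsRunning: Int
      // Number of executors that have exited with a failure.
      def getNumExecutorsFailed: Int
    }

Note that allocateResources() now also files the outstanding container requests itself (addResourceRequests became private and is called at the top of allocateResources), so callers only need to invoke this single method on each heartbeat instead of pre-registering requests separately.
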
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
new file mode 100644
index 0000000000..e8b8d9bc72
--- /dev/null
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.{Map, Set}
+
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.Utils
+
+
+/**
+ * YarnRMClient implementation for the Yarn stable API.
+ */
+private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging {
+
+ private var amClient: AMRMClient[ContainerRequest] = _
+ private var uiHistoryAddress: String = _
+
+ override def register(
+ conf: YarnConfiguration,
+ sparkConf: SparkConf,
+ preferredNodeLocations: Map[String, Set[SplitInfo]],
+ uiAddress: String,
+ uiHistoryAddress: String) = {
+ amClient = AMRMClient.createAMRMClient()
+ amClient.init(conf)
+ amClient.start()
+ this.uiHistoryAddress = uiHistoryAddress
+
+ logInfo("Registering the ApplicationMaster")
+ amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+ new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
+ preferredNodeLocations)
+ }
+
+ override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") =
+ amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
+
+ override def getAttemptId() = {
+ val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+ val containerId = ConverterUtils.toContainerId(containerIdString)
+ val appAttemptId = containerId.getApplicationAttemptId()
+ appAttemptId
+ }
+
+ override def getProxyHostAndPort(conf: YarnConfiguration) = WebAppUtils.getProxyHostAndPort(conf)
+
+ override def getMaxRegAttempts(conf: YarnConfiguration) =
+ conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+
+}
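
The overrides in YarnRMClientImpl likewise imply the shape of the YarnRMClient interface it extends. The sketch below is inferred only from the methods shown above; member names, defaults, and return types in the committed trait may differ.

    package org.apache.spark.deploy.yarn

    import scala.collection.{Map, Set}

    import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, FinalApplicationStatus}
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.SplitInfo

    // Sketch of the RM-facing abstraction implied by YarnRMClientImpl's overrides;
    // inferred from this diff, not copied from the committed trait.
    private[yarn] trait YarnRMClient {
      /** Registers the AM with the RM and returns an allocator bound to this attempt. */
      def register(
          conf: YarnConfiguration,
          sparkConf: SparkConf,
          preferredNodeLocations: Map[String, Set[SplitInfo]],
          uiAddress: String,
          uiHistoryAddress: String): YarnAllocator

      /** Unregisters the AM, reporting the final status and optional diagnostics. */
      def shutdown(status: FinalApplicationStatus, diagnostics: String = ""): Unit

      /** The application attempt this AM is running as, read from the container id. */
      def getAttemptId(): ApplicationAttemptId

      /** Host and port of the YARN web proxy for this cluster. */
      def getProxyHostAndPort(conf: YarnConfiguration): String

      /** Maximum number of AM attempts the RM allows before failing the application. */
      def getMaxRegAttempts(conf: YarnConfiguration): Int
    }
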