diff options
Diffstat (limited to 'yarn/common')
10 files changed, 644 insertions, 62 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala new file mode 100644 index 0000000000..f15c813b83 --- /dev/null +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.io.IOException +import java.net.Socket +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.JavaConversions._ +import scala.util.Try + +import akka.actor._ +import akka.remote._ +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.util.ShutdownHookManager +import org.apache.hadoop.yarn.api._ +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils} + +/** + * Common application master functionality for Spark on Yarn. + */ +private[spark] class ApplicationMaster(args: ApplicationMasterArguments, + client: YarnRMClient) extends Logging { + // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be + // optimal as more containers are available. Might need to handle this better. + private val ALLOCATE_HEARTBEAT_INTERVAL = 100 + + private val sparkConf = new SparkConf() + private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration()) + private val isDriver = args.userClass != null + + // Default to numExecutors * 2, with minimum of 3 + private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", + sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) + + @volatile private var finished = false + @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED + + private var reporterThread: Thread = _ + private var allocator: YarnAllocator = _ + + // Fields used in client mode. + private var actorSystem: ActorSystem = null + private var actor: ActorRef = _ + + // Fields used in cluster mode. + private val sparkContextRef = new AtomicReference[SparkContext](null) + + final def run(): Int = { + if (isDriver) { + // Set the web ui port to be ephemeral for yarn so we don't conflict with + // other spark processes running on the same box + System.setProperty("spark.ui.port", "0") + + // Set the master property to match the requested mode. + System.setProperty("spark.master", "yarn-cluster") + } + + logInfo("ApplicationAttemptId: " + client.getAttemptId()) + + val cleanupHook = new Runnable { + override def run() { + // If the SparkContext is still registered, shut it down as a best case effort in case + // users do not call sc.stop or do System.exit(). + val sc = sparkContextRef.get() + if (sc != null) { + logInfo("Invoking sc stop from shutdown hook") + sc.stop() + finish(FinalApplicationStatus.SUCCEEDED) + } + + // Cleanup the staging dir after the app is finished, or if it's the last attempt at + // running the AM. + val maxAppAttempts = client.getMaxRegAttempts(yarnConf) + val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts + if (finished || isLastAttempt) { + cleanupStagingDir() + } + } + } + // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using. + ShutdownHookManager.get().addShutdownHook(cleanupHook, 30) + + // Call this to force generation of secret so it gets populated into the + // Hadoop UGI. This has to happen before the startUserClass which does a + // doAs in order for the credentials to be passed on to the executor containers. + val securityMgr = new SecurityManager(sparkConf) + + if (isDriver) { + runDriver() + } else { + runExecutorLauncher(securityMgr) + } + + if (finalStatus != FinalApplicationStatus.UNDEFINED) { + finish(finalStatus) + 0 + } else { + 1 + } + } + + final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized { + if (!finished) { + logInfo(s"Finishing ApplicationMaster with $status" + + Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse("")) + finished = true + finalStatus = status + try { + if (Thread.currentThread() != reporterThread) { + reporterThread.interrupt() + reporterThread.join() + } + } finally { + client.shutdown(status, Option(diagnostics).getOrElse("")) + } + } + } + + private def sparkContextInitialized(sc: SparkContext) = { + sparkContextRef.synchronized { + sparkContextRef.compareAndSet(null, sc) + sparkContextRef.notifyAll() + } + } + + private def sparkContextStopped(sc: SparkContext) = { + sparkContextRef.compareAndSet(sc, null) + } + + private def registerAM(uiAddress: String, uiHistoryAddress: String) = { + val sc = sparkContextRef.get() + allocator = client.register(yarnConf, + if (sc != null) sc.getConf else sparkConf, + if (sc != null) sc.preferredNodeLocationData else Map(), + uiAddress, + uiHistoryAddress) + + allocator.allocateResources() + reporterThread = launchReporterThread() + } + + private def runDriver(): Unit = { + addAmIpFilter() + val userThread = startUserClass() + + // This a bit hacky, but we need to wait until the spark.driver.port property has + // been set by the Thread executing the user class. + val sc = waitForSparkContextInitialized() + + // If there is no SparkContext at this point, just fail the app. + if (sc == null) { + finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.") + } else { + registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf)) + try { + userThread.join() + } finally { + // In cluster mode, ask the reporter thread to stop since the user app is finished. + reporterThread.interrupt() + } + } + } + + private def runExecutorLauncher(securityMgr: SecurityManager): Unit = { + actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, + conf = sparkConf, securityManager = securityMgr)._1 + actor = waitForSparkDriver() + addAmIpFilter() + registerAM(sparkConf.get("spark.driver.appUIAddress", ""), + sparkConf.get("spark.driver.appUIHistoryAddress", "")) + + // In client mode the actor will stop the reporter thread. + reporterThread.join() + finalStatus = FinalApplicationStatus.SUCCEEDED + } + + private def launchReporterThread(): Thread = { + // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. + val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) + + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) + + // must be <= expiryInterval / 2. + val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval)) + + val t = new Thread { + override def run() { + while (!finished) { + checkNumExecutorsFailed() + if (!finished) { + logDebug("Sending progress") + allocator.allocateResources() + try { + Thread.sleep(interval) + } catch { + case e: InterruptedException => + } + } + } + } + } + // setting to daemon status, though this is usually not a good idea. + t.setDaemon(true) + t.setName("Reporter") + t.start() + logInfo("Started progress reporter thread - sleep time : " + interval) + t + } + + /** + * Clean up the staging directory. + */ + private def cleanupStagingDir() { + val fs = FileSystem.get(yarnConf) + var stagingDirPath: Path = null + try { + val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean + if (!preserveFiles) { + stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) + if (stagingDirPath == null) { + logError("Staging directory is null") + return + } + logInfo("Deleting staging directory " + stagingDirPath) + fs.delete(stagingDirPath, true) + } + } catch { + case ioe: IOException => + logError("Failed to cleanup staging dir " + stagingDirPath, ioe) + } + } + + private def waitForSparkContextInitialized(): SparkContext = { + logInfo("Waiting for spark context initialization") + try { + sparkContextRef.synchronized { + var count = 0 + val waitTime = 10000L + val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10) + while (sparkContextRef.get() == null && count < numTries && !finished) { + logInfo("Waiting for spark context initialization ... " + count) + count = count + 1 + sparkContextRef.wait(waitTime) + } + + val sparkContext = sparkContextRef.get() + assert(sparkContext != null || count >= numTries) + if (sparkContext == null) { + logError( + "Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".format( + count * waitTime, numTries)) + } + sparkContext + } + } + } + + private def waitForSparkDriver(): ActorRef = { + logInfo("Waiting for Spark driver to be reachable.") + var driverUp = false + val hostport = args.userArgs(0) + val (driverHost, driverPort) = Utils.parseHostPort(hostport) + while (!driverUp) { + try { + val socket = new Socket(driverHost, driverPort) + socket.close() + logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) + driverUp = true + } catch { + case e: Exception => + logError("Failed to connect to driver at %s:%s, retrying ...". + format(driverHost, driverPort)) + Thread.sleep(100) + } + } + sparkConf.set("spark.driver.host", driverHost) + sparkConf.set("spark.driver.port", driverPort.toString) + + val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + SparkEnv.driverActorSystemName, + driverHost, + driverPort.toString, + CoarseGrainedSchedulerBackend.ACTOR_NAME) + actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") + } + + private def checkNumExecutorsFailed() = { + if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) { + finish(FinalApplicationStatus.FAILED, "Max number of executor failures reached.") + + val sc = sparkContextRef.get() + if (sc != null) { + logInfo("Invoking sc stop from checkNumExecutorsFailed") + sc.stop() + } + } + } + + /** Add the Yarn IP filter that is required for properly securing the UI. */ + private def addAmIpFilter() = { + val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" + val proxy = client.getProxyHostAndPort(yarnConf) + val parts = proxy.split(":") + val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) + val uriBase = "http://" + proxy + proxyBase + val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase + + if (isDriver) { + System.setProperty("spark.ui.filters", amFilter) + System.setProperty(s"spark.$amFilter.params", params) + } else { + actor ! AddWebUIFilter(amFilter, params, proxyBase) + } + } + + private def startUserClass(): Thread = { + logInfo("Starting the user JAR in a separate Thread") + System.setProperty("spark.executor.instances", args.numExecutors.toString) + val mainMethod = Class.forName(args.userClass, false, + Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) + + val t = new Thread { + override def run() { + var status = FinalApplicationStatus.FAILED + try { + // Copy + val mainArgs = new Array[String](args.userArgs.size) + args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size) + mainMethod.invoke(null, mainArgs) + // Some apps have "System.exit(0)" at the end. The user thread will stop here unless + // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED. + status = FinalApplicationStatus.SUCCEEDED + } finally { + logDebug("Finishing main") + } + finalStatus = status + } + } + t.setName("Driver") + t.start() + t + } + + // Actor used to monitor the driver when running in client deploy mode. + private class MonitorActor(driverUrl: String) extends Actor { + + var driver: ActorSelection = _ + + override def preStart() = { + logInfo("Listen to driver: " + driverUrl) + driver = context.actorSelection(driverUrl) + // Send a hello message to establish the connection, after which + // we can monitor Lifecycle Events. + driver ! "Hello" + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + } + + override def receive = { + case x: DisassociatedEvent => + logInfo(s"Driver terminated or disconnected! Shutting down. $x") + finish(FinalApplicationStatus.SUCCEEDED) + case x: AddWebUIFilter => + logInfo(s"Add WebUI Filter. $x") + driver ! x + } + + } + +} + +object ApplicationMaster extends Logging { + + private var master: ApplicationMaster = _ + + def main(args: Array[String]) = { + SignalLogger.register(log) + val amArgs = new ApplicationMasterArguments(args) + SparkHadoopUtil.get.runAsSparkUser { () => + master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs)) + System.exit(master.run()) + } + } + + private[spark] def sparkContextInitialized(sc: SparkContext) = { + master.sparkContextInitialized(sc) + } + + private[spark] def sparkContextStopped(sc: SparkContext) = { + master.sparkContextStopped(sc) + } + +} + +/** + * This object does not provide any special functionality. It exists so that it's easy to tell + * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps. + */ +object ExecutorLauncher { + + def main(args: Array[String]) = { + ApplicationMaster.main(args) + } + +} diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index 424b0fb093..3e6b96fb63 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -63,11 +63,6 @@ class ApplicationMasterArguments(val args: Array[String]) { executorCores = value args = tail - case Nil => - if (userJar == null || userClass == null) { - printUsageAndExit(1) - } - case _ => printUsageAndExit(1, args) } @@ -80,16 +75,17 @@ class ApplicationMasterArguments(val args: Array[String]) { if (unknownParam != null) { System.err.println("Unknown/unsupported param " + unknownParam) } - System.err.println( - "Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] \n" + - "Options:\n" + - " --jar JAR_PATH Path to your application's JAR file (required)\n" + - " --class CLASS_NAME Name of your application's main class (required)\n" + - " --args ARGS Arguments to be passed to your application's main class.\n" + - " Mutliple invocations are possible, each will be passed in order.\n" + - " --num-executors NUM Number of executors to start (Default: 2)\n" + - " --executor-cores NUM Number of cores for the executors (Default: 1)\n" + - " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n") + System.err.println(""" + |Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] + |Options: + | --jar JAR_PATH Path to your application's JAR file + | --class CLASS_NAME Name of your application's main class + | --args ARGS Arguments to be passed to your application's main class. + | Mutliple invocations are possible, each will be passed in order. + | --num-executors NUM Number of executors to start (Default: 2) + | --executor-cores NUM Number of cores for the executors (Default: 1) + | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G) + """.stripMargin) System.exit(exitCode) } } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index afa4fd4c69..40d8d6d6e6 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -37,7 +37,6 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { var numExecutors = 2 var amQueue = sparkConf.get("QUEUE", "default") var amMemory: Int = 512 // MB - var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster" var appName: String = "Spark" var priority = 0 @@ -78,10 +77,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { args = tail case ("--master-class" | "--am-class") :: value :: tail => - if (args(0) == "--master-class") { - println("--master-class is deprecated. Use --am-class instead.") - } - amClass = value + println(s"${args(0)} is deprecated and is not used anymore.") args = tail case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail => @@ -133,9 +129,6 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { args = tail case Nil => - if (userClass == null) { - throw new IllegalArgumentException(getUsageMessage()) - } case _ => throw new IllegalArgumentException(getUsageMessage(args)) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 3897b3a373..6cf300c398 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -42,12 +42,6 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar /** * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The * Client submits an application to the YARN ResourceManager. - * - * Depending on the deployment mode this will launch one of two application master classes: - * 1. In cluster mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]] - * which launches a driver program inside of the cluster. - * 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]] to - * request executors on behalf of a driver running outside of the cluster. */ trait ClientBase extends Logging { val args: ClientArguments @@ -67,14 +61,11 @@ trait ClientBase extends Logging { // Additional memory overhead - in mb. protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead", - YarnAllocationHandler.MEMORY_OVERHEAD) + YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD) // TODO(harvey): This could just go in ClientArguments. def validateArgs() = { Map( - ((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) -> - "Error: You must specify a user jar when running in standalone mode!"), - (args.userClass == null) -> "Error: You must specify a user class!", (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!", (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" + "greater than: " + memoryOverhead), @@ -321,6 +312,8 @@ trait ClientBase extends Logging { val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) amContainer.setLocalResources(localResources) + val isLaunchingDriver = args.userClass != null + // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's // SparkContext will not let that set spark* system properties, which is expected behavior for @@ -329,7 +322,7 @@ trait ClientBase extends Logging { // Note that to warn the user about the deprecation in cluster mode, some code from // SparkConf#validateSettings() is duplicated here (to avoid triggering the condition // described above). - if (args.amClass == classOf[ApplicationMaster].getName) { + if (isLaunchingDriver) { sys.env.get("SPARK_JAVA_OPTS").foreach { value => val warning = s""" @@ -389,7 +382,7 @@ trait ClientBase extends Logging { javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") } - if (args.amClass == classOf[ApplicationMaster].getName) { + if (isLaunchingDriver) { sparkConf.getOption("spark.driver.extraJavaOptions") .orElse(sys.env.get("SPARK_JAVA_OPTS")) .foreach(opts => javaOpts += opts) @@ -397,22 +390,37 @@ trait ClientBase extends Logging { .foreach(p => javaOpts += s"-Djava.library.path=$p") } - // Command for the ApplicationMaster - val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++ - javaOpts ++ - Seq(args.amClass, "--class", YarnSparkHadoopUtil.escapeForShell(args.userClass), - "--jar ", YarnSparkHadoopUtil.escapeForShell(args.userJar), - userArgsToString(args), - "--executor-memory", args.executorMemory.toString, + val userClass = + if (args.userClass != null) { + Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass)) + } else { + Nil + } + val amClass = + if (isLaunchingDriver) { + classOf[ApplicationMaster].getName() + } else { + classOf[ApplicationMaster].getName().replace("ApplicationMaster", "ExecutorLauncher") + } + val amArgs = + Seq(amClass) ++ userClass ++ + (if (args.userJar != null) Seq("--jar", args.userJar) else Nil) ++ + Seq("--executor-memory", args.executorMemory.toString, "--executor-cores", args.executorCores.toString, "--num-executors ", args.numExecutors.toString, + userArgsToString(args)) + + // Command for the ApplicationMaster + val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++ + javaOpts ++ amArgs ++ + Seq( "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") logInfo("Yarn AM launch context:") - logInfo(s" class: ${args.amClass}") - logInfo(s" env: $env") - logInfo(s" command: ${commands.mkString(" ")}") + logInfo(s" user class: ${args.userClass}") + logInfo(s" env: $env") + logInfo(s" command: ${commands.mkString(" ")}") // TODO: it would be nicer to just make sure there are no null commands here val printableCommands = commands.map(s => if (s == null) "null" else s).toList @@ -623,7 +631,7 @@ object ClientBase extends Logging { YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path, File.pathSeparator) - /** + /** * Get the list of namenodes the user may access. */ private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = { diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala new file mode 100644 index 0000000000..cad94e5e19 --- /dev/null +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +object AllocationType extends Enumeration { + type AllocationType = Value + val HOST, RACK, ANY = Value +} + +/** + * Interface that defines a Yarn allocator. + */ +trait YarnAllocator { + + def allocateResources(): Unit + def getNumExecutorsFailed: Int + def getNumExecutorsRunning: Int + +} diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala new file mode 100644 index 0000000000..922d7d1a85 --- /dev/null +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.{Map, Set} + +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.api.records._ + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.scheduler.SplitInfo + +/** + * Interface that defines a Yarn RM client. Abstracts away Yarn version-specific functionality that + * is used by Spark's AM. + */ +trait YarnRMClient { + + /** + * Registers the application master with the RM. + * + * @param conf The Yarn configuration. + * @param sparkConf The Spark configuration. + * @param preferredNodeLocations Map with hints about where to allocate containers. + * @param uiAddress Address of the SparkUI. + * @param uiHistoryAddress Address of the application on the History Server. + */ + def register( + conf: YarnConfiguration, + sparkConf: SparkConf, + preferredNodeLocations: Map[String, Set[SplitInfo]], + uiAddress: String, + uiHistoryAddress: String): YarnAllocator + + /** + * Shuts down the AM. Guaranteed to only be called once. + * + * @param status The final status of the AM. + * @param diagnostics Diagnostics message to include in the final status. + */ + def shutdown(status: FinalApplicationStatus, diagnostics: String = ""): Unit + + /** Returns the attempt ID. */ + def getAttemptId(): ApplicationAttemptId + + /** Returns the RM's proxy host and port. */ + def getProxyHostAndPort(conf: YarnConfiguration): String + + /** Returns the maximum number of attempts to register the AM. */ + def getMaxRegAttempts(conf: YarnConfiguration): Int + +} diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 10aef5eb24..2aa27a1908 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -17,8 +17,11 @@ package org.apache.spark.deploy.yarn +import java.lang.{Boolean => JBoolean} +import java.util.{Collections, Set => JSet} import java.util.regex.Matcher import java.util.regex.Pattern +import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.HashMap @@ -29,11 +32,13 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.util.StringInterner import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.ApplicationConstants +import org.apache.hadoop.yarn.util.RackResolver import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.util.Utils /** * Contains util methods to interact with Hadoop from spark. @@ -79,6 +84,21 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { } object YarnSparkHadoopUtil { + // Additional memory overhead - in mb. + val DEFAULT_MEMORY_OVERHEAD = 384 + + val ANY_HOST = "*" + + // All RM requests are issued with same priority : we do not (yet) have any distinction between + // request types (like map/reduce in hadoop for example) + val RM_REQUEST_PRIORITY = 1 + + // Host to rack map - saved from allocation requests. We are expecting this not to change. + // Note that it is possible for this to change : and ResourceManager will indicate that to us via + // update response to allocate. But we are punting on handling that for now. + private val hostToRack = new ConcurrentHashMap[String, String]() + private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]() + def addToEnvironment( env: HashMap[String, String], variable: String, @@ -173,4 +193,35 @@ object YarnSparkHadoopUtil { } } + private[spark] def lookupRack(conf: Configuration, host: String): String = { + if (!hostToRack.contains(host)) { + populateRackInfo(conf, host) + } + hostToRack.get(host) + } + + private[spark] def populateRackInfo(conf: Configuration, hostname: String) { + Utils.checkHost(hostname) + + if (!hostToRack.containsKey(hostname)) { + // If there are repeated failures to resolve, all to an ignore list. + val rackInfo = RackResolver.resolve(conf, hostname) + if (rackInfo != null && rackInfo.getNetworkLocation != null) { + val rack = rackInfo.getNetworkLocation + hostToRack.put(hostname, rack) + if (! rackToHostSet.containsKey(rack)) { + rackToHostSet.putIfAbsent(rack, + Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]())) + } + rackToHostSet.get(rack).add(hostname) + + // TODO(harvey): Figure out what this comment means... + // Since RackResolver caches, we are disabling this for now ... + } /* else { + // right ? Else we will keep calling rack resolver in case we cant resolve rack info ... + hostToRack.put(hostname, null) + } */ + } + } + } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala index 3474112ded..d162b4c433 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala @@ -19,22 +19,21 @@ package org.apache.spark.scheduler.cluster import org.apache.spark._ import org.apache.hadoop.conf.Configuration -import org.apache.spark.deploy.yarn.YarnAllocationHandler +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.util.Utils /** - * - * This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM. + * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM. */ -private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) { +private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) + extends TaskSchedulerImpl(sc) { def this(sc: SparkContext) = this(sc, new Configuration()) // By default, rack is unknown override def getRackForHost(hostPort: String): Option[String] = { val host = Utils.parseHostPort(hostPort)._1 - val retval = YarnAllocationHandler.lookupRack(conf, host) - if (retval != null) Some(retval) else None + Option(YarnSparkHadoopUtil.lookupRack(conf, host)) } } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 833e249f9f..a5f537dd9d 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} import org.apache.spark.{SparkException, Logging, SparkContext} -import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher, YarnSparkHadoopUtil} +import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil} import org.apache.spark.scheduler.TaskSchedulerImpl import scala.collection.mutable.ArrayBuffer @@ -60,10 +60,7 @@ private[spark] class YarnClientSchedulerBackend( val argsArrayBuf = new ArrayBuffer[String]() argsArrayBuf += ( - "--class", "notused", - "--jar", null, // The primary jar will be added dynamically in SparkContext. - "--args", hostport, - "--am-class", classOf[ExecutorLauncher].getName + "--args", hostport ) // process any optional arguments, given either as environment variables diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index 9aeca4a637..69f40225a2 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -18,16 +18,17 @@ package org.apache.spark.scheduler.cluster import org.apache.spark._ -import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler} +import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil} import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.util.Utils import org.apache.hadoop.conf.Configuration /** - * - * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of ApplicationMaster, etc is done + * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of + * ApplicationMaster, etc is done */ -private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) { +private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) + extends TaskSchedulerImpl(sc) { logInfo("Created YarnClusterScheduler") @@ -42,7 +43,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) // By default, rack is unknown override def getRackForHost(hostPort: String): Option[String] = { val host = Utils.parseHostPort(hostPort)._1 - val retval = YarnAllocationHandler.lookupRack(conf, host) + val retval = YarnSparkHadoopUtil.lookupRack(conf, host) if (retval != null) Some(retval) else None } @@ -51,4 +52,10 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) super.postStartHook() logInfo("YarnClusterScheduler.postStartHook done") } + + override def stop() { + super.stop() + ApplicationMaster.sparkContextStopped(sc) + } + } |