From 912563aa3553afc0871d5b5858f533aa39cb99e5 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Tue, 9 Dec 2014 11:02:43 -0800 Subject: SPARK-4338. [YARN] Ditch yarn-alpha. Sorry if this is a little premature with 1.2 still not out the door, but it will make other work like SPARK-4136 and SPARK-2089 a lot easier. Author: Sandy Ryza Closes #3215 from sryza/sandy-spark-4338 and squashes the following commits: 1c5ac08 [Sandy Ryza] Update building Spark docs and remove unnecessary newline 9c1421c [Sandy Ryza] SPARK-4338. Ditch yarn-alpha. --- .../spark/deploy/yarn/ApplicationMaster.scala | 539 -------------- .../deploy/yarn/ApplicationMasterArguments.scala | 96 --- .../apache/spark/deploy/yarn/ClientArguments.scala | 198 ----- .../org/apache/spark/deploy/yarn/ClientBase.scala | 823 --------------------- .../yarn/ClientDistributedCacheManager.scala | 207 ------ .../spark/deploy/yarn/ExecutorRunnableUtil.scala | 202 ----- .../apache/spark/deploy/yarn/YarnAllocator.scala | 538 -------------- .../apache/spark/deploy/yarn/YarnRMClient.scala | 68 -- .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 224 ------ .../cluster/YarnClientClusterScheduler.scala | 35 - .../cluster/YarnClientSchedulerBackend.scala | 157 ---- .../scheduler/cluster/YarnClusterScheduler.scala | 56 -- .../cluster/YarnClusterSchedulerBackend.scala | 50 -- .../apache/spark/deploy/yarn/ClientBaseSuite.scala | 256 ------- .../yarn/ClientDistributedCacheManagerSuite.scala | 220 ------ .../spark/deploy/yarn/YarnAllocatorSuite.scala | 34 - .../deploy/yarn/YarnSparkHadoopUtilSuite.scala | 151 ---- 17 files changed, 3854 deletions(-) delete mode 100644 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala delete mode 100644 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala delete mode 100644 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala delete mode 100644 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala delete mode 100644 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala delete mode 100644 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala delete mode 100644 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala delete mode 100644 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala delete mode 100644 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala delete mode 100644 yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala delete mode 100644 yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala delete mode 100644 yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala delete mode 100644 yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala delete mode 100644 yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala delete mode 100644 yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala delete mode 100644 yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala delete mode 100644 yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala (limited to 'yarn/common/src') diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala deleted file mode 100644 index 987b3373fb..0000000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ /dev/null @@ -1,539 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import scala.util.control.NonFatal - -import java.io.IOException -import java.lang.reflect.InvocationTargetException -import java.net.Socket -import java.util.concurrent.atomic.AtomicReference - -import akka.actor._ -import akka.remote._ -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.util.ShutdownHookManager -import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.conf.YarnConfiguration - -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv} -import org.apache.spark.SparkException -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.history.HistoryServer -import org.apache.spark.scheduler.cluster.YarnSchedulerBackend -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils} - -/** - * Common application master functionality for Spark on Yarn. - */ -private[spark] class ApplicationMaster(args: ApplicationMasterArguments, - client: YarnRMClient) extends Logging { - // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be - // optimal as more containers are available. Might need to handle this better. - - private val sparkConf = new SparkConf() - private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf) - .asInstanceOf[YarnConfiguration] - private val isDriver = args.userClass != null - - // Default to numExecutors * 2, with minimum of 3 - private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", - sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) - - @volatile private var exitCode = 0 - @volatile private var unregistered = false - @volatile private var finished = false - @volatile private var finalStatus = FinalApplicationStatus.SUCCEEDED - @volatile private var finalMsg: String = "" - @volatile private var userClassThread: Thread = _ - - private var reporterThread: Thread = _ - private var allocator: YarnAllocator = _ - - // Fields used in client mode. - private var actorSystem: ActorSystem = null - private var actor: ActorRef = _ - - // Fields used in cluster mode. - private val sparkContextRef = new AtomicReference[SparkContext](null) - - final def run(): Int = { - try { - val appAttemptId = client.getAttemptId() - - if (isDriver) { - // Set the web ui port to be ephemeral for yarn so we don't conflict with - // other spark processes running on the same box - System.setProperty("spark.ui.port", "0") - - // Set the master property to match the requested mode. - System.setProperty("spark.master", "yarn-cluster") - - // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up. - System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) - } - - logInfo("ApplicationAttemptId: " + appAttemptId) - - val fs = FileSystem.get(yarnConf) - val cleanupHook = new Runnable { - override def run() { - // If the SparkContext is still registered, shut it down as a best case effort in case - // users do not call sc.stop or do System.exit(). - val sc = sparkContextRef.get() - if (sc != null) { - logInfo("Invoking sc stop from shutdown hook") - sc.stop() - } - val maxAppAttempts = client.getMaxRegAttempts(yarnConf) - val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts - - if (!finished) { - // This happens when the user application calls System.exit(). We have the choice - // of either failing or succeeding at this point. We report success to avoid - // retrying applications that have succeeded (System.exit(0)), which means that - // applications that explicitly exit with a non-zero status will also show up as - // succeeded in the RM UI. - finish(finalStatus, - ApplicationMaster.EXIT_SUCCESS, - "Shutdown hook called before final status was reported.") - } - - if (!unregistered) { - // we only want to unregister if we don't want the RM to retry - if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) { - unregister(finalStatus, finalMsg) - cleanupStagingDir(fs) - } - } - } - } - - // Use higher priority than FileSystem. - assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY) - ShutdownHookManager - .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY) - - // Call this to force generation of secret so it gets populated into the - // Hadoop UGI. This has to happen before the startUserClass which does a - // doAs in order for the credentials to be passed on to the executor containers. - val securityMgr = new SecurityManager(sparkConf) - - if (isDriver) { - runDriver(securityMgr) - } else { - runExecutorLauncher(securityMgr) - } - } catch { - case e: Exception => - // catch everything else if not specifically handled - logError("Uncaught exception: ", e) - finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION, - "Uncaught exception: " + e.getMessage()) - } - exitCode - } - - /** - * unregister is used to completely unregister the application from the ResourceManager. - * This means the ResourceManager will not retry the application attempt on your behalf if - * a failure occurred. - */ - final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized { - if (!unregistered) { - logInfo(s"Unregistering ApplicationMaster with $status" + - Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse("")) - unregistered = true - client.unregister(status, Option(diagnostics).getOrElse("")) - } - } - - final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized { - if (!finished) { - val inShutdown = Utils.inShutdown() - logInfo(s"Final app status: ${status}, exitCode: ${code}" + - Option(msg).map(msg => s", (reason: $msg)").getOrElse("")) - exitCode = code - finalStatus = status - finalMsg = msg - finished = true - if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) { - logDebug("shutting down reporter thread") - reporterThread.interrupt() - } - if (!inShutdown && Thread.currentThread() != userClassThread && userClassThread != null) { - logDebug("shutting down user thread") - userClassThread.interrupt() - } - } - } - - private def sparkContextInitialized(sc: SparkContext) = { - sparkContextRef.synchronized { - sparkContextRef.compareAndSet(null, sc) - sparkContextRef.notifyAll() - } - } - - private def sparkContextStopped(sc: SparkContext) = { - sparkContextRef.compareAndSet(sc, null) - } - - private def registerAM(uiAddress: String, securityMgr: SecurityManager) = { - val sc = sparkContextRef.get() - - val appId = client.getAttemptId().getApplicationId().toString() - val historyAddress = - sparkConf.getOption("spark.yarn.historyServer.address") - .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" } - .getOrElse("") - - allocator = client.register(yarnConf, - if (sc != null) sc.getConf else sparkConf, - if (sc != null) sc.preferredNodeLocationData else Map(), - uiAddress, - historyAddress, - securityMgr) - - allocator.allocateResources() - reporterThread = launchReporterThread() - } - - private def runDriver(securityMgr: SecurityManager): Unit = { - addAmIpFilter() - userClassThread = startUserClass() - - // This a bit hacky, but we need to wait until the spark.driver.port property has - // been set by the Thread executing the user class. - val sc = waitForSparkContextInitialized() - - // If there is no SparkContext at this point, just fail the app. - if (sc == null) { - finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_SC_NOT_INITED, - "Timed out waiting for SparkContext.") - } else { - registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr) - userClassThread.join() - } - } - - private def runExecutorLauncher(securityMgr: SecurityManager): Unit = { - actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, - conf = sparkConf, securityManager = securityMgr)._1 - actor = waitForSparkDriver() - addAmIpFilter() - registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr) - - // In client mode the actor will stop the reporter thread. - reporterThread.join() - } - - private def launchReporterThread(): Thread = { - // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. - val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - - // we want to be reasonably responsive without causing too many requests to RM. - val schedulerInterval = - sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) - - // must be <= expiryInterval / 2. - val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval)) - - // The number of failures in a row until Reporter thread give up - val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5) - - val t = new Thread { - override def run() { - var failureCount = 0 - while (!finished) { - try { - if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) { - finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, - "Max number of executor failures reached") - } else { - logDebug("Sending progress") - allocator.allocateResources() - } - failureCount = 0 - } catch { - case i: InterruptedException => - case e: Throwable => { - failureCount += 1 - if (!NonFatal(e) || failureCount >= reporterMaxFailures) { - finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " + - s"${failureCount} time(s) from Reporter thread.") - - } else { - logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e) - } - } - } - try { - Thread.sleep(interval) - } catch { - case e: InterruptedException => - } - } - } - } - // setting to daemon status, though this is usually not a good idea. - t.setDaemon(true) - t.setName("Reporter") - t.start() - logInfo("Started progress reporter thread - sleep time : " + interval) - t - } - - /** - * Clean up the staging directory. - */ - private def cleanupStagingDir(fs: FileSystem) { - var stagingDirPath: Path = null - try { - val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean - if (!preserveFiles) { - stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) - if (stagingDirPath == null) { - logError("Staging directory is null") - return - } - logInfo("Deleting staging directory " + stagingDirPath) - fs.delete(stagingDirPath, true) - } - } catch { - case ioe: IOException => - logError("Failed to cleanup staging dir " + stagingDirPath, ioe) - } - } - - private def waitForSparkContextInitialized(): SparkContext = { - logInfo("Waiting for spark context initialization") - try { - sparkContextRef.synchronized { - var count = 0 - val waitTime = 10000L - val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10) - while (sparkContextRef.get() == null && count < numTries && !finished) { - logInfo("Waiting for spark context initialization ... " + count) - count = count + 1 - sparkContextRef.wait(waitTime) - } - - val sparkContext = sparkContextRef.get() - if (sparkContext == null) { - logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier" - + " log output for errors. Failing the application.").format(numTries * waitTime)) - } - sparkContext - } - } - } - - private def waitForSparkDriver(): ActorRef = { - logInfo("Waiting for Spark driver to be reachable.") - var driverUp = false - var count = 0 - val hostport = args.userArgs(0) - val (driverHost, driverPort) = Utils.parseHostPort(hostport) - - // spark driver should already be up since it launched us, but we don't want to - // wait forever, so wait 100 seconds max to match the cluster mode setting. - // Leave this config unpublished for now. SPARK-3779 to investigating changing - // this config to be time based. - val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 1000) - - while (!driverUp && !finished && count < numTries) { - try { - count = count + 1 - val socket = new Socket(driverHost, driverPort) - socket.close() - logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) - driverUp = true - } catch { - case e: Exception => - logError("Failed to connect to driver at %s:%s, retrying ...". - format(driverHost, driverPort)) - Thread.sleep(100) - } - } - - if (!driverUp) { - throw new SparkException("Failed to connect to driver!") - } - - sparkConf.set("spark.driver.host", driverHost) - sparkConf.set("spark.driver.port", driverPort.toString) - - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( - SparkEnv.driverActorSystemName, - driverHost, - driverPort.toString, - YarnSchedulerBackend.ACTOR_NAME) - actorSystem.actorOf(Props(new AMActor(driverUrl)), name = "YarnAM") - } - - /** Add the Yarn IP filter that is required for properly securing the UI. */ - private def addAmIpFilter() = { - val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) - val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" - val params = client.getAmIpFilterParams(yarnConf, proxyBase) - if (isDriver) { - System.setProperty("spark.ui.filters", amFilter) - params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) } - } else { - actor ! AddWebUIFilter(amFilter, params.toMap, proxyBase) - } - } - - /** - * Start the user class, which contains the spark driver, in a separate Thread. - * If the main routine exits cleanly or exits with System.exit(N) for any N - * we assume it was successful, for all other cases we assume failure. - * - * Returns the user thread that was started. - */ - private def startUserClass(): Thread = { - logInfo("Starting the user JAR in a separate Thread") - System.setProperty("spark.executor.instances", args.numExecutors.toString) - val mainMethod = Class.forName(args.userClass, false, - Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) - - val userThread = new Thread { - override def run() { - try { - val mainArgs = new Array[String](args.userArgs.size) - args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size) - mainMethod.invoke(null, mainArgs) - finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) - logDebug("Done running users class") - } catch { - case e: InvocationTargetException => - e.getCause match { - case _: InterruptedException => - // Reporter thread can interrupt to stop user class - case e: Exception => - finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_EXCEPTION_USER_CLASS, - "User class threw exception: " + e.getMessage) - // re-throw to get it logged - throw e - } - } - } - } - userThread.setName("Driver") - userThread.start() - userThread - } - - /** - * Actor that communicates with the driver in client deploy mode. - */ - private class AMActor(driverUrl: String) extends Actor { - var driver: ActorSelection = _ - - override def preStart() = { - logInfo("Listen to driver: " + driverUrl) - driver = context.actorSelection(driverUrl) - // Send a hello message to establish the connection, after which - // we can monitor Lifecycle Events. - driver ! "Hello" - driver ! RegisterClusterManager - context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) - } - - override def receive = { - case x: DisassociatedEvent => - logInfo(s"Driver terminated or disconnected! Shutting down. $x") - finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) - - case x: AddWebUIFilter => - logInfo(s"Add WebUI Filter. $x") - driver ! x - - case RequestExecutors(requestedTotal) => - logInfo(s"Driver requested a total number of $requestedTotal executor(s).") - Option(allocator) match { - case Some(a) => a.requestTotalExecutors(requestedTotal) - case None => logWarning("Container allocator is not ready to request executors yet.") - } - sender ! true - - case KillExecutors(executorIds) => - logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.") - Option(allocator) match { - case Some(a) => executorIds.foreach(a.killExecutor) - case None => logWarning("Container allocator is not ready to kill executors yet.") - } - sender ! true - } - } - -} - -object ApplicationMaster extends Logging { - - val SHUTDOWN_HOOK_PRIORITY: Int = 30 - - // exit codes for different causes, no reason behind the values - private val EXIT_SUCCESS = 0 - private val EXIT_UNCAUGHT_EXCEPTION = 10 - private val EXIT_MAX_EXECUTOR_FAILURES = 11 - private val EXIT_REPORTER_FAILURE = 12 - private val EXIT_SC_NOT_INITED = 13 - private val EXIT_SECURITY = 14 - private val EXIT_EXCEPTION_USER_CLASS = 15 - - private var master: ApplicationMaster = _ - - def main(args: Array[String]) = { - SignalLogger.register(log) - val amArgs = new ApplicationMasterArguments(args) - SparkHadoopUtil.get.runAsSparkUser { () => - master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs)) - System.exit(master.run()) - } - } - - private[spark] def sparkContextInitialized(sc: SparkContext) = { - master.sparkContextInitialized(sc) - } - - private[spark] def sparkContextStopped(sc: SparkContext) = { - master.sparkContextStopped(sc) - } - -} - -/** - * This object does not provide any special functionality. It exists so that it's easy to tell - * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps. - */ -object ExecutorLauncher { - - def main(args: Array[String]) = { - ApplicationMaster.main(args) - } - -} diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala deleted file mode 100644 index d76a63276d..0000000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import org.apache.spark.util.{MemoryParam, IntParam} -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ -import collection.mutable.ArrayBuffer - -class ApplicationMasterArguments(val args: Array[String]) { - var userJar: String = null - var userClass: String = null - var userArgs: Seq[String] = Seq[String]() - var executorMemory = 1024 - var executorCores = 1 - var numExecutors = DEFAULT_NUMBER_EXECUTORS - - parseArgs(args.toList) - - private def parseArgs(inputArgs: List[String]): Unit = { - val userArgsBuffer = new ArrayBuffer[String]() - - var args = inputArgs - - while (!args.isEmpty) { - // --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0, - // the properties with executor in their names are preferred. - args match { - case ("--jar") :: value :: tail => - userJar = value - args = tail - - case ("--class") :: value :: tail => - userClass = value - args = tail - - case ("--args" | "--arg") :: value :: tail => - userArgsBuffer += value - args = tail - - case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail => - numExecutors = value - args = tail - - case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail => - executorMemory = value - args = tail - - case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail => - executorCores = value - args = tail - - case _ => - printUsageAndExit(1, args) - } - } - - userArgs = userArgsBuffer.readOnly - } - - def printUsageAndExit(exitCode: Int, unknownParam: Any = null) { - if (unknownParam != null) { - System.err.println("Unknown/unsupported param " + unknownParam) - } - System.err.println(""" - |Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] - |Options: - | --jar JAR_PATH Path to your application's JAR file - | --class CLASS_NAME Name of your application's main class - | --args ARGS Arguments to be passed to your application's main class. - | Multiple invocations are possible, each will be passed in order. - | --num-executors NUM Number of executors to start (Default: 2) - | --executor-cores NUM Number of cores for the executors (Default: 1) - | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G) - """.stripMargin) - System.exit(exitCode) - } -} - -object ApplicationMasterArguments { - val DEFAULT_NUMBER_EXECUTORS = 2 -} diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala deleted file mode 100644 index 4d859450ef..0000000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ -import org.apache.spark.util.{Utils, IntParam, MemoryParam} - -// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware ! -private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) { - var addJars: String = null - var files: String = null - var archives: String = null - var userJar: String = null - var userClass: String = null - var userArgs: Seq[String] = Seq[String]() - var executorMemory = 1024 // MB - var executorCores = 1 - var numExecutors = DEFAULT_NUMBER_EXECUTORS - var amQueue = sparkConf.get("spark.yarn.queue", "default") - var amMemory: Int = 512 // MB - var appName: String = "Spark" - var priority = 0 - - // Additional memory to allocate to containers - // For now, use driver's memory overhead as our AM container's memory overhead - val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead", - math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN)) - - val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) - - private val isDynamicAllocationEnabled = - sparkConf.getBoolean("spark.dynamicAllocation.enabled", false) - - parseArgs(args.toList) - loadEnvironmentArgs() - validateArgs() - - /** Load any default arguments provided through environment variables and Spark properties. */ - private def loadEnvironmentArgs(): Unit = { - // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://, - // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051). - files = Option(files) - .orElse(sys.env.get("SPARK_YARN_DIST_FILES")) - .orElse(sparkConf.getOption("spark.yarn.dist.files").map(p => Utils.resolveURIs(p))) - .orNull - archives = Option(archives) - .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES")) - .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p))) - .orNull - // If dynamic allocation is enabled, start at the max number of executors - if (isDynamicAllocationEnabled) { - val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors" - if (!sparkConf.contains(maxExecutorsConf)) { - throw new IllegalArgumentException( - s"$maxExecutorsConf must be set if dynamic allocation is enabled!") - } - numExecutors = sparkConf.get(maxExecutorsConf).toInt - } - } - - /** - * Fail fast if any arguments provided are invalid. - * This is intended to be called only after the provided arguments have been parsed. - */ - private def validateArgs(): Unit = { - if (numExecutors <= 0) { - throw new IllegalArgumentException( - "You must specify at least 1 executor!\n" + getUsageMessage()) - } - } - - private def parseArgs(inputArgs: List[String]): Unit = { - val userArgsBuffer = new ArrayBuffer[String]() - var args = inputArgs - - while (!args.isEmpty) { - args match { - case ("--jar") :: value :: tail => - userJar = value - args = tail - - case ("--class") :: value :: tail => - userClass = value - args = tail - - case ("--args" | "--arg") :: value :: tail => - if (args(0) == "--args") { - println("--args is deprecated. Use --arg instead.") - } - userArgsBuffer += value - args = tail - - case ("--master-class" | "--am-class") :: value :: tail => - println(s"${args(0)} is deprecated and is not used anymore.") - args = tail - - case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail => - if (args(0) == "--master-memory") { - println("--master-memory is deprecated. Use --driver-memory instead.") - } - amMemory = value - args = tail - - case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail => - if (args(0) == "--num-workers") { - println("--num-workers is deprecated. Use --num-executors instead.") - } - // Dynamic allocation is not compatible with this option - if (isDynamicAllocationEnabled) { - throw new IllegalArgumentException("Explicitly setting the number " + - "of executors is not compatible with spark.dynamicAllocation.enabled!") - } - numExecutors = value - args = tail - - case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail => - if (args(0) == "--worker-memory") { - println("--worker-memory is deprecated. Use --executor-memory instead.") - } - executorMemory = value - args = tail - - case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail => - if (args(0) == "--worker-cores") { - println("--worker-cores is deprecated. Use --executor-cores instead.") - } - executorCores = value - args = tail - - case ("--queue") :: value :: tail => - amQueue = value - args = tail - - case ("--name") :: value :: tail => - appName = value - args = tail - - case ("--addJars") :: value :: tail => - addJars = value - args = tail - - case ("--files") :: value :: tail => - files = value - args = tail - - case ("--archives") :: value :: tail => - archives = value - args = tail - - case Nil => - - case _ => - throw new IllegalArgumentException(getUsageMessage(args)) - } - } - - userArgs = userArgsBuffer.readOnly - } - - private def getUsageMessage(unknownParam: List[String] = null): String = { - val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else "" - message + - "Usage: org.apache.spark.deploy.yarn.Client [options] \n" + - "Options:\n" + - " --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster mode)\n" + - " --class CLASS_NAME Name of your application's main class (required)\n" + - " --arg ARG Argument to be passed to your application's main class.\n" + - " Multiple invocations are possible, each will be passed in order.\n" + - " --num-executors NUM Number of executors to start (Default: 2)\n" + - " --executor-cores NUM Number of cores for the executors (Default: 1).\n" + - " --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)\n" + - " --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n" + - " --name NAME The name of your application (Default: Spark)\n" + - " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" + - " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" + - " --files files Comma separated list of files to be distributed with the job.\n" + - " --archives archives Comma separated list of archives to be distributed with the job." - } -} diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala deleted file mode 100644 index f95d723791..0000000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ /dev/null @@ -1,823 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} - -import scala.collection.JavaConversions._ -import scala.collection.mutable.{HashMap, ListBuffer, Map} -import scala.util.{Try, Success, Failure} - -import com.google.common.base.Objects -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs._ -import org.apache.hadoop.fs.permission.FsPermission -import org.apache.hadoop.mapred.Master -import org.apache.hadoop.mapreduce.MRJobConfig -import org.apache.hadoop.security.{Credentials, UserGroupInformation} -import org.apache.hadoop.util.StringUtils -import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.api.protocolrecords._ -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.Records - -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException} -import org.apache.spark.util.Utils - -/** - * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. - * The Client submits an application to the YARN ResourceManager. - */ -private[spark] trait ClientBase extends Logging { - import ClientBase._ - - protected val args: ClientArguments - protected val hadoopConf: Configuration - protected val sparkConf: SparkConf - protected val yarnConf: YarnConfiguration - protected val credentials = UserGroupInformation.getCurrentUser.getCredentials - protected val amMemoryOverhead = args.amMemoryOverhead // MB - protected val executorMemoryOverhead = args.executorMemoryOverhead // MB - private val distCacheMgr = new ClientDistributedCacheManager() - private val isLaunchingDriver = args.userClass != null - - /** - * Fail fast if we have requested more resources per container than is available in the cluster. - */ - protected def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = { - val maxMem = newAppResponse.getMaximumResourceCapability().getMemory() - logInfo("Verifying our application has not requested more than the maximum " + - s"memory capability of the cluster ($maxMem MB per container)") - val executorMem = args.executorMemory + executorMemoryOverhead - if (executorMem > maxMem) { - throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" + - s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") - } - val amMem = args.amMemory + amMemoryOverhead - if (amMem > maxMem) { - throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" + - s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") - } - logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format( - amMem, - amMemoryOverhead)) - - // We could add checks to make sure the entire cluster has enough resources but that involves - // getting all the node reports and computing ourselves. - } - - /** - * Copy the given file to a remote file system (e.g. HDFS) if needed. - * The file is only copied if the source and destination file systems are different. This is used - * for preparing resources for launching the ApplicationMaster container. Exposed for testing. - */ - def copyFileToRemote( - destDir: Path, - srcPath: Path, - replication: Short, - setPerms: Boolean = false): Path = { - val destFs = destDir.getFileSystem(hadoopConf) - val srcFs = srcPath.getFileSystem(hadoopConf) - var destPath = srcPath - if (!compareFs(srcFs, destFs)) { - destPath = new Path(destDir, srcPath.getName()) - logInfo(s"Uploading resource $srcPath -> $destPath") - FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf) - destFs.setReplication(destPath, replication) - if (setPerms) { - destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION)) - } - } else { - logInfo(s"Source and destination file systems are the same. Not copying $srcPath") - } - // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific - // version shows the specific version in the distributed cache configuration - val qualifiedDestPath = destFs.makeQualified(destPath) - val fc = FileContext.getFileContext(qualifiedDestPath.toUri(), hadoopConf) - fc.resolvePath(qualifiedDestPath) - } - - /** - * Given a local URI, resolve it and return a qualified local path that corresponds to the URI. - * This is used for preparing local resources to be included in the container launch context. - */ - private def getQualifiedLocalPath(localURI: URI): Path = { - val qualifiedURI = - if (localURI.getScheme == null) { - // If not specified, assume this is in the local filesystem to keep the behavior - // consistent with that of Hadoop - new URI(FileSystem.getLocal(hadoopConf).makeQualified(new Path(localURI)).toString) - } else { - localURI - } - new Path(qualifiedURI) - } - - /** - * Upload any resources to the distributed cache if needed. If a resource is intended to be - * consumed locally, set up the appropriate config for downstream code to handle it properly. - * This is used for setting up a container launch context for our ApplicationMaster. - * Exposed for testing. - */ - def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = { - logInfo("Preparing resources for our AM container") - // Upload Spark and the application JAR to the remote file system if necessary, - // and add them as local resources to the application master. - val fs = FileSystem.get(hadoopConf) - val dst = new Path(fs.getHomeDirectory(), appStagingDir) - val nns = getNameNodesToAccess(sparkConf) + dst - obtainTokensForNamenodes(nns, hadoopConf, credentials) - - val replication = sparkConf.getInt("spark.yarn.submit.file.replication", - fs.getDefaultReplication(dst)).toShort - val localResources = HashMap[String, LocalResource]() - FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION)) - - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - - val oldLog4jConf = Option(System.getenv("SPARK_LOG4J_CONF")) - if (oldLog4jConf.isDefined) { - logWarning( - "SPARK_LOG4J_CONF detected in the system environment. This variable has been " + - "deprecated. Please refer to the \"Launching Spark on YARN\" documentation " + - "for alternatives.") - } - - /** - * Copy the given main resource to the distributed cache if the scheme is not "local". - * Otherwise, set the corresponding key in our SparkConf to handle it downstream. - * Each resource is represented by a 4-tuple of: - * (1) destination resource name, - * (2) local path to the resource, - * (3) Spark property key to set if the scheme is not local, and - * (4) whether to set permissions for this resource - */ - List( - (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR, false), - (APP_JAR, args.userJar, CONF_SPARK_USER_JAR, true), - ("log4j.properties", oldLog4jConf.orNull, null, false) - ).foreach { case (destName, _localPath, confKey, setPermissions) => - val localPath: String = if (_localPath != null) _localPath.trim() else "" - if (!localPath.isEmpty()) { - val localURI = new URI(localPath) - if (localURI.getScheme != LOCAL_SCHEME) { - val src = getQualifiedLocalPath(localURI) - val destPath = copyFileToRemote(dst, src, replication, setPermissions) - val destFs = FileSystem.get(destPath.toUri(), hadoopConf) - distCacheMgr.addResource(destFs, hadoopConf, destPath, - localResources, LocalResourceType.FILE, destName, statCache) - } else if (confKey != null) { - // If the resource is intended for local use only, handle this downstream - // by setting the appropriate property - sparkConf.set(confKey, localPath) - } - } - } - - /** - * Do the same for any additional resources passed in through ClientArguments. - * Each resource category is represented by a 3-tuple of: - * (1) comma separated list of resources in this category, - * (2) resource type, and - * (3) whether to add these resources to the classpath - */ - val cachedSecondaryJarLinks = ListBuffer.empty[String] - List( - (args.addJars, LocalResourceType.FILE, true), - (args.files, LocalResourceType.FILE, false), - (args.archives, LocalResourceType.ARCHIVE, false) - ).foreach { case (flist, resType, addToClasspath) => - if (flist != null && !flist.isEmpty()) { - flist.split(',').foreach { file => - val localURI = new URI(file.trim()) - if (localURI.getScheme != LOCAL_SCHEME) { - val localPath = new Path(localURI) - val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) - val destPath = copyFileToRemote(dst, localPath, replication) - distCacheMgr.addResource( - fs, hadoopConf, destPath, localResources, resType, linkname, statCache) - if (addToClasspath) { - cachedSecondaryJarLinks += linkname - } - } else if (addToClasspath) { - // Resource is intended for local use only and should be added to the class path - cachedSecondaryJarLinks += file.trim() - } - } - } - } - if (cachedSecondaryJarLinks.nonEmpty) { - sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(",")) - } - - localResources - } - - /** - * Set up the environment for launching our ApplicationMaster container. - */ - private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = { - logInfo("Setting up the launch environment for our AM container") - val env = new HashMap[String, String]() - val extraCp = sparkConf.getOption("spark.driver.extraClassPath") - populateClasspath(args, yarnConf, sparkConf, env, extraCp) - env("SPARK_YARN_MODE") = "true" - env("SPARK_YARN_STAGING_DIR") = stagingDir - env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() - - // Set the environment variables to be passed on to the executors. - distCacheMgr.setDistFilesEnv(env) - distCacheMgr.setDistArchivesEnv(env) - - // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.* - val amEnvPrefix = "spark.yarn.appMasterEnv." - sparkConf.getAll - .filter { case (k, v) => k.startsWith(amEnvPrefix) } - .map { case (k, v) => (k.substring(amEnvPrefix.length), v) } - .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) } - - // Keep this for backwards compatibility but users should move to the config - sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs => - // Allow users to specify some environment variables. - YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) - // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments. - env("SPARK_YARN_USER_ENV") = userEnvs - } - - // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to - // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's - // SparkContext will not let that set spark* system properties, which is expected behavior for - // Yarn clients. So propagate it through the environment. - // - // Note that to warn the user about the deprecation in cluster mode, some code from - // SparkConf#validateSettings() is duplicated here (to avoid triggering the condition - // described above). - if (isLaunchingDriver) { - sys.env.get("SPARK_JAVA_OPTS").foreach { value => - val warning = - s""" - |SPARK_JAVA_OPTS was detected (set to '$value'). - |This is deprecated in Spark 1.0+. - | - |Please instead use: - | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application - | - ./spark-submit with --driver-java-options to set -X options for a driver - | - spark.executor.extraJavaOptions to set -X options for executors - """.stripMargin - logWarning(warning) - for (proc <- Seq("driver", "executor")) { - val key = s"spark.$proc.extraJavaOptions" - if (sparkConf.contains(key)) { - throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.") - } - } - env("SPARK_JAVA_OPTS") = value - } - } - - env - } - - /** - * Set up a ContainerLaunchContext to launch our ApplicationMaster container. - * This sets up the launch environment, java options, and the command for launching the AM. - */ - protected def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse) - : ContainerLaunchContext = { - logInfo("Setting up container launch context for our AM") - - val appId = newAppResponse.getApplicationId - val appStagingDir = getAppStagingDir(appId) - val localResources = prepareLocalResources(appStagingDir) - val launchEnv = setupLaunchEnv(appStagingDir) - val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) - amContainer.setLocalResources(localResources) - amContainer.setEnvironment(launchEnv) - - val javaOpts = ListBuffer[String]() - - // Set the environment variable through a command prefix - // to append to the existing value of the variable - var prefixEnv: Option[String] = None - - // Add Xmx for AM memory - javaOpts += "-Xmx" + args.amMemory + "m" - - val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) - javaOpts += "-Djava.io.tmpdir=" + tmpDir - - // TODO: Remove once cpuset version is pushed out. - // The context is, default gc for server class machines ends up using all cores to do gc - - // hence if there are multiple containers in same node, Spark GC affects all other containers' - // performance (which can be that of other Spark containers) - // Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in - // multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset - // of cores on a node. - val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean) - if (useConcurrentAndIncrementalGC) { - // In our expts, using (default) throughput collector has severe perf ramifications in - // multi-tenant machines - javaOpts += "-XX:+UseConcMarkSweepGC" - javaOpts += "-XX:+CMSIncrementalMode" - javaOpts += "-XX:+CMSIncrementalPacing" - javaOpts += "-XX:CMSIncrementalDutyCycleMin=0" - javaOpts += "-XX:CMSIncrementalDutyCycle=10" - } - - // Forward the Spark configuration to the application master / executors. - // TODO: it might be nicer to pass these as an internal environment variable rather than - // as Java options, due to complications with string parsing of nested quotes. - for ((k, v) <- sparkConf.getAll) { - javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") - } - - // Include driver-specific java options if we are launching a driver - if (isLaunchingDriver) { - sparkConf.getOption("spark.driver.extraJavaOptions") - .orElse(sys.env.get("SPARK_JAVA_OPTS")) - .foreach(opts => javaOpts += opts) - val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"), - sys.props.get("spark.driver.libraryPath")).flatten - if (libraryPaths.nonEmpty) { - prefixEnv = Some(Utils.libraryPathEnvPrefix(libraryPaths)) - } - } - - // For log4j configuration to reference - javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR) - - val userClass = - if (isLaunchingDriver) { - Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass)) - } else { - Nil - } - val userJar = - if (args.userJar != null) { - Seq("--jar", args.userJar) - } else { - Nil - } - val amClass = - if (isLaunchingDriver) { - Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName - } else { - Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName - } - val userArgs = args.userArgs.flatMap { arg => - Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg)) - } - val amArgs = - Seq(amClass) ++ userClass ++ userJar ++ userArgs ++ - Seq( - "--executor-memory", args.executorMemory.toString + "m", - "--executor-cores", args.executorCores.toString, - "--num-executors ", args.numExecutors.toString) - - // Command for the ApplicationMaster - val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++ - javaOpts ++ amArgs ++ - Seq( - "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", - "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") - - // TODO: it would be nicer to just make sure there are no null commands here - val printableCommands = commands.map(s => if (s == null) "null" else s).toList - amContainer.setCommands(printableCommands) - - logDebug("===============================================================================") - logDebug("Yarn AM launch context:") - logDebug(s" user class: ${Option(args.userClass).getOrElse("N/A")}") - logDebug(" env:") - launchEnv.foreach { case (k, v) => logDebug(s" $k -> $v") } - logDebug(" resources:") - localResources.foreach { case (k, v) => logDebug(s" $k -> $v")} - logDebug(" command:") - logDebug(s" ${printableCommands.mkString(" ")}") - logDebug("===============================================================================") - - // send the acl settings into YARN to control who has access via YARN interfaces - val securityManager = new SecurityManager(sparkConf) - amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager)) - setupSecurityToken(amContainer) - UserGroupInformation.getCurrentUser().addCredentials(credentials) - - amContainer - } - - /** - * Report the state of an application until it has exited, either successfully or - * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED, - * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED, - * or KILLED). - * - * @param appId ID of the application to monitor. - * @param returnOnRunning Whether to also return the application state when it is RUNNING. - * @param logApplicationReport Whether to log details of the application report every iteration. - * @return A pair of the yarn application state and the final application state. - */ - def monitorApplication( - appId: ApplicationId, - returnOnRunning: Boolean = false, - logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = { - val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) - var lastState: YarnApplicationState = null - while (true) { - Thread.sleep(interval) - val report = getApplicationReport(appId) - val state = report.getYarnApplicationState - - if (logApplicationReport) { - logInfo(s"Application report for $appId (state: $state)") - val details = Seq[(String, String)]( - ("client token", getClientToken(report)), - ("diagnostics", report.getDiagnostics), - ("ApplicationMaster host", report.getHost), - ("ApplicationMaster RPC port", report.getRpcPort.toString), - ("queue", report.getQueue), - ("start time", report.getStartTime.toString), - ("final status", report.getFinalApplicationStatus.toString), - ("tracking URL", report.getTrackingUrl), - ("user", report.getUser) - ) - - // Use more loggable format if value is null or empty - val formattedDetails = details - .map { case (k, v) => - val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A") - s"\n\t $k: $newValue" } - .mkString("") - - // If DEBUG is enabled, log report details every iteration - // Otherwise, log them every time the application changes state - if (log.isDebugEnabled) { - logDebug(formattedDetails) - } else if (lastState != state) { - logInfo(formattedDetails) - } - } - - if (state == YarnApplicationState.FINISHED || - state == YarnApplicationState.FAILED || - state == YarnApplicationState.KILLED) { - return (state, report.getFinalApplicationStatus) - } - - if (returnOnRunning && state == YarnApplicationState.RUNNING) { - return (state, report.getFinalApplicationStatus) - } - - lastState = state - } - - // Never reached, but keeps compiler happy - throw new SparkException("While loop is depleted! This should never happen...") - } - - /** - * Submit an application to the ResourceManager and monitor its state. - * This continues until the application has exited for any reason. - * If the application finishes with a failed, killed, or undefined status, - * throw an appropriate SparkException. - */ - def run(): Unit = { - val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication()) - if (yarnApplicationState == YarnApplicationState.FAILED || - finalApplicationStatus == FinalApplicationStatus.FAILED) { - throw new SparkException("Application finished with failed status") - } - if (yarnApplicationState == YarnApplicationState.KILLED || - finalApplicationStatus == FinalApplicationStatus.KILLED) { - throw new SparkException("Application is killed") - } - if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) { - throw new SparkException("The final status of application is undefined") - } - } - - /* --------------------------------------------------------------------------------------- * - | Methods that cannot be implemented here due to API differences across hadoop versions | - * --------------------------------------------------------------------------------------- */ - - /** Submit an application running our ApplicationMaster to the ResourceManager. */ - def submitApplication(): ApplicationId - - /** Set up security tokens for launching our ApplicationMaster container. */ - protected def setupSecurityToken(containerContext: ContainerLaunchContext): Unit - - /** Get the application report from the ResourceManager for an application we have submitted. */ - protected def getApplicationReport(appId: ApplicationId): ApplicationReport - - /** - * Return the security token used by this client to communicate with the ApplicationMaster. - * If no security is enabled, the token returned by the report is null. - */ - protected def getClientToken(report: ApplicationReport): String -} - -private[spark] object ClientBase extends Logging { - - // Alias for the Spark assembly jar and the user jar - val SPARK_JAR: String = "__spark__.jar" - val APP_JAR: String = "__app__.jar" - - // URI scheme that identifies local resources - val LOCAL_SCHEME = "local" - - // Staging directory for any temporary jars or files - val SPARK_STAGING: String = ".sparkStaging" - - // Location of any user-defined Spark jars - val CONF_SPARK_JAR = "spark.yarn.jar" - val ENV_SPARK_JAR = "SPARK_JAR" - - // Internal config to propagate the location of the user's jar to the driver/executors - val CONF_SPARK_USER_JAR = "spark.yarn.user.jar" - - // Internal config to propagate the locations of any extra jars to add to the classpath - // of the executors - val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars" - - // Staging directory is private! -> rwx-------- - val STAGING_DIR_PERMISSION: FsPermission = - FsPermission.createImmutable(Integer.parseInt("700", 8).toShort) - - // App files are world-wide readable and owner writable -> rw-r--r-- - val APP_FILE_PERMISSION: FsPermission = - FsPermission.createImmutable(Integer.parseInt("644", 8).toShort) - - /** - * Find the user-defined Spark jar if configured, or return the jar containing this - * class if not. - * - * This method first looks in the SparkConf object for the CONF_SPARK_JAR key, and in the - * user environment if that is not found (for backwards compatibility). - */ - private def sparkJar(conf: SparkConf): String = { - if (conf.contains(CONF_SPARK_JAR)) { - conf.get(CONF_SPARK_JAR) - } else if (System.getenv(ENV_SPARK_JAR) != null) { - logWarning( - s"$ENV_SPARK_JAR detected in the system environment. This variable has been deprecated " + - s"in favor of the $CONF_SPARK_JAR configuration variable.") - System.getenv(ENV_SPARK_JAR) - } else { - SparkContext.jarOfClass(this.getClass).head - } - } - - /** - * Return the path to the given application's staging directory. - */ - private def getAppStagingDir(appId: ApplicationId): String = { - SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR - } - - /** - * Populate the classpath entry in the given environment map with any application - * classpath specified through the Hadoop and Yarn configurations. - */ - def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]): Unit = { - val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf) - for (c <- classPathElementsToAdd.flatten) { - YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim) - } - } - - private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] = - Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match { - case Some(s) => Some(s.toSeq) - case None => getDefaultYarnApplicationClasspath - } - - private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] = - Option(conf.getStrings("mapreduce.application.classpath")) match { - case Some(s) => Some(s.toSeq) - case None => getDefaultMRApplicationClasspath - } - - def getDefaultYarnApplicationClasspath: Option[Seq[String]] = { - val triedDefault = Try[Seq[String]] { - val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH") - val value = field.get(null).asInstanceOf[Array[String]] - value.toSeq - } recoverWith { - case e: NoSuchFieldException => Success(Seq.empty[String]) - } - - triedDefault match { - case f: Failure[_] => - logError("Unable to obtain the default YARN Application classpath.", f.exception) - case s: Success[_] => - logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}") - } - - triedDefault.toOption - } - - /** - * In Hadoop 0.23, the MR application classpath comes with the YARN application - * classpath. In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String. - * So we need to use reflection to retrieve it. - */ - def getDefaultMRApplicationClasspath: Option[Seq[String]] = { - val triedDefault = Try[Seq[String]] { - val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH") - val value = if (field.getType == classOf[String]) { - StringUtils.getStrings(field.get(null).asInstanceOf[String]).toArray - } else { - field.get(null).asInstanceOf[Array[String]] - } - value.toSeq - } recoverWith { - case e: NoSuchFieldException => Success(Seq.empty[String]) - } - - triedDefault match { - case f: Failure[_] => - logError("Unable to obtain the default MR Application classpath.", f.exception) - case s: Success[_] => - logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}") - } - - triedDefault.toOption - } - - /** - * Populate the classpath entry in the given environment map. - * This includes the user jar, Spark jar, and any extra application jars. - */ - def populateClasspath( - args: ClientArguments, - conf: Configuration, - sparkConf: SparkConf, - env: HashMap[String, String], - extraClassPath: Option[String] = None): Unit = { - extraClassPath.foreach(addClasspathEntry(_, env)) - addClasspathEntry(Environment.PWD.$(), env) - - // Normally the users app.jar is last in case conflicts with spark jars - if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) { - addUserClasspath(args, sparkConf, env) - addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env) - populateHadoopClasspath(conf, env) - } else { - addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env) - populateHadoopClasspath(conf, env) - addUserClasspath(args, sparkConf, env) - } - - // Append all jar files under the working directory to the classpath. - addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env) - } - - /** - * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly - * to the classpath. - */ - private def addUserClasspath( - args: ClientArguments, - conf: SparkConf, - env: HashMap[String, String]): Unit = { - - // If `args` is not null, we are launching an AM container. - // Otherwise, we are launching executor containers. - val (mainJar, secondaryJars) = - if (args != null) { - (args.userJar, args.addJars) - } else { - (conf.get(CONF_SPARK_USER_JAR, null), conf.get(CONF_SPARK_YARN_SECONDARY_JARS, null)) - } - - addFileToClasspath(mainJar, APP_JAR, env) - if (secondaryJars != null) { - secondaryJars.split(",").filter(_.nonEmpty).foreach { jar => - addFileToClasspath(jar, null, env) - } - } - } - - /** - * Adds the given path to the classpath, handling "local:" URIs correctly. - * - * If an alternate name for the file is given, and it's not a "local:" file, the alternate - * name will be added to the classpath (relative to the job's work directory). - * - * If not a "local:" file and no alternate name, the environment is not modified. - * - * @param path Path to add to classpath (optional). - * @param fileName Alternate name for the file (optional). - * @param env Map holding the environment variables. - */ - private def addFileToClasspath( - path: String, - fileName: String, - env: HashMap[String, String]): Unit = { - if (path != null) { - scala.util.control.Exception.ignoring(classOf[URISyntaxException]) { - val uri = new URI(path) - if (uri.getScheme == LOCAL_SCHEME) { - addClasspathEntry(uri.getPath, env) - return - } - } - } - if (fileName != null) { - addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env) - } - } - - /** - * Add the given path to the classpath entry of the given environment map. - * If the classpath is already set, this appends the new path to the existing classpath. - */ - private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit = - YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path) - - /** - * Get the list of namenodes the user may access. - */ - def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = { - sparkConf.get("spark.yarn.access.namenodes", "") - .split(",") - .map(_.trim()) - .filter(!_.isEmpty) - .map(new Path(_)) - .toSet - } - - def getTokenRenewer(conf: Configuration): String = { - val delegTokenRenewer = Master.getMasterPrincipal(conf) - logDebug("delegation token renewer is: " + delegTokenRenewer) - if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { - val errorMessage = "Can't get Master Kerberos principal for use as renewer" - logError(errorMessage) - throw new SparkException(errorMessage) - } - delegTokenRenewer - } - - /** - * Obtains tokens for the namenodes passed in and adds them to the credentials. - */ - def obtainTokensForNamenodes( - paths: Set[Path], - conf: Configuration, - creds: Credentials): Unit = { - if (UserGroupInformation.isSecurityEnabled()) { - val delegTokenRenewer = getTokenRenewer(conf) - paths.foreach { dst => - val dstFs = dst.getFileSystem(conf) - logDebug("getting token for namenode: " + dst) - dstFs.addDelegationTokens(delegTokenRenewer, creds) - } - } - } - - /** - * Return whether the two file systems are the same. - */ - private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = { - val srcUri = srcFs.getUri() - val dstUri = destFs.getUri() - if (srcUri.getScheme() == null || srcUri.getScheme() != dstUri.getScheme()) { - return false - } - - var srcHost = srcUri.getHost() - var dstHost = dstUri.getHost() - - // In HA or when using viewfs, the host part of the URI may not actually be a host, but the - // name of the HDFS namespace. Those names won't resolve, so avoid even trying if they - // match. - if (srcHost != null && dstHost != null && srcHost != dstHost) { - try { - srcHost = InetAddress.getByName(srcHost).getCanonicalHostName() - dstHost = InetAddress.getByName(dstHost).getCanonicalHostName() - } catch { - case e: UnknownHostException => - return false - } - } - - Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort() - } - -} diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala deleted file mode 100644 index c592ecfdfc..0000000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.net.URI - -import scala.collection.mutable.{HashMap, LinkedHashMap, Map} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} -import org.apache.hadoop.fs.permission.FsAction -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.util.{Records, ConverterUtils} - -import org.apache.spark.Logging - -/** Client side methods to setup the Hadoop distributed cache */ -private[spark] class ClientDistributedCacheManager() extends Logging { - - // Mappings from remote URI to (file status, modification time, visibility) - private val distCacheFiles: Map[String, (String, String, String)] = - LinkedHashMap[String, (String, String, String)]() - private val distCacheArchives: Map[String, (String, String, String)] = - LinkedHashMap[String, (String, String, String)]() - - - /** - * Add a resource to the list of distributed cache resources. This list can - * be sent to the ApplicationMaster and possibly the executors so that it can - * be downloaded into the Hadoop distributed cache for use by this application. - * Adds the LocalResource to the localResources HashMap passed in and saves - * the stats of the resources to they can be sent to the executors and verified. - * - * @param fs FileSystem - * @param conf Configuration - * @param destPath path to the resource - * @param localResources localResource hashMap to insert the resource into - * @param resourceType LocalResourceType - * @param link link presented in the distributed cache to the destination - * @param statCache cache to store the file/directory stats - * @param appMasterOnly Whether to only add the resource to the app master - */ - def addResource( - fs: FileSystem, - conf: Configuration, - destPath: Path, - localResources: HashMap[String, LocalResource], - resourceType: LocalResourceType, - link: String, - statCache: Map[URI, FileStatus], - appMasterOnly: Boolean = false): Unit = { - val destStatus = fs.getFileStatus(destPath) - val amJarRsrc = Records.newRecord(classOf[LocalResource]) - amJarRsrc.setType(resourceType) - val visibility = getVisibility(conf, destPath.toUri(), statCache) - amJarRsrc.setVisibility(visibility) - amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath)) - amJarRsrc.setTimestamp(destStatus.getModificationTime()) - amJarRsrc.setSize(destStatus.getLen()) - if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name") - localResources(link) = amJarRsrc - - if (!appMasterOnly) { - val uri = destPath.toUri() - val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link) - if (resourceType == LocalResourceType.FILE) { - distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(), - destStatus.getModificationTime().toString(), visibility.name()) - } else { - distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(), - destStatus.getModificationTime().toString(), visibility.name()) - } - } - } - - /** - * Adds the necessary cache file env variables to the env passed in - */ - def setDistFilesEnv(env: Map[String, String]): Unit = { - val (keys, tupleValues) = distCacheFiles.unzip - val (sizes, timeStamps, visibilities) = tupleValues.unzip3 - if (keys.size > 0) { - env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n } - env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = - timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n } - env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = - sizes.reduceLeft[String] { (acc,n) => acc + "," + n } - env("SPARK_YARN_CACHE_FILES_VISIBILITIES") = - visibilities.reduceLeft[String] { (acc,n) => acc + "," + n } - } - } - - /** - * Adds the necessary cache archive env variables to the env passed in - */ - def setDistArchivesEnv(env: Map[String, String]): Unit = { - val (keys, tupleValues) = distCacheArchives.unzip - val (sizes, timeStamps, visibilities) = tupleValues.unzip3 - if (keys.size > 0) { - env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n } - env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = - timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n } - env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") = - sizes.reduceLeft[String] { (acc,n) => acc + "," + n } - env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") = - visibilities.reduceLeft[String] { (acc,n) => acc + "," + n } - } - } - - /** - * Returns the local resource visibility depending on the cache file permissions - * @return LocalResourceVisibility - */ - def getVisibility( - conf: Configuration, - uri: URI, - statCache: Map[URI, FileStatus]): LocalResourceVisibility = { - if (isPublic(conf, uri, statCache)) { - LocalResourceVisibility.PUBLIC - } else { - LocalResourceVisibility.PRIVATE - } - } - - /** - * Returns a boolean to denote whether a cache file is visible to all (public) - * @return true if the path in the uri is visible to all, false otherwise - */ - def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = { - val fs = FileSystem.get(uri, conf) - val current = new Path(uri.getPath()) - // the leaf level file should be readable by others - if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) { - return false - } - ancestorsHaveExecutePermissions(fs, current.getParent(), statCache) - } - - /** - * Returns true if all ancestors of the specified path have the 'execute' - * permission set for all users (i.e. that other users can traverse - * the directory hierarchy to the given path) - * @return true if all ancestors have the 'execute' permission set for all users - */ - def ancestorsHaveExecutePermissions( - fs: FileSystem, - path: Path, - statCache: Map[URI, FileStatus]): Boolean = { - var current = path - while (current != null) { - // the subdirs in the path should have execute permissions for others - if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) { - return false - } - current = current.getParent() - } - true - } - - /** - * Checks for a given path whether the Other permissions on it - * imply the permission in the passed FsAction - * @return true if the path in the uri is visible to all, false otherwise - */ - def checkPermissionOfOther( - fs: FileSystem, - path: Path, - action: FsAction, - statCache: Map[URI, FileStatus]): Boolean = { - val status = getFileStatus(fs, path.toUri(), statCache) - val perms = status.getPermission() - val otherAction = perms.getOtherAction() - otherAction.implies(action) - } - - /** - * Checks to see if the given uri exists in the cache, if it does it - * returns the existing FileStatus, otherwise it stats the uri, stores - * it in the cache, and returns the FileStatus. - * @return FileStatus - */ - def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = { - val stat = statCache.get(uri) match { - case Some(existstat) => existstat - case None => - val newStat = fs.getFileStatus(new Path(uri)) - statCache.put(uri, newStat) - newStat - } - stat - } -} diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala deleted file mode 100644 index 88dad0febd..0000000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.net.URI - -import scala.collection.JavaConversions._ -import scala.collection.mutable.{HashMap, ListBuffer} - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.{ConverterUtils, Records} - -import org.apache.spark.{Logging, SparkConf} -import org.apache.spark.util.Utils - -trait ExecutorRunnableUtil extends Logging { - - val yarnConf: YarnConfiguration - val sparkConf: SparkConf - lazy val env = prepareEnvironment - - def prepareCommand( - masterAddress: String, - slaveId: String, - hostname: String, - executorMemory: Int, - executorCores: Int, - appId: String, - localResources: HashMap[String, LocalResource]): List[String] = { - // Extra options for the JVM - val javaOpts = ListBuffer[String]() - - // Set the environment variable through a command prefix - // to append to the existing value of the variable - var prefixEnv: Option[String] = None - - // Set the JVM memory - val executorMemoryString = executorMemory + "m" - javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " " - - // Set extra Java options for the executor, if defined - sys.props.get("spark.executor.extraJavaOptions").foreach { opts => - javaOpts += opts - } - sys.env.get("SPARK_JAVA_OPTS").foreach { opts => - javaOpts += opts - } - sys.props.get("spark.executor.extraLibraryPath").foreach { p => - prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p))) - } - - javaOpts += "-Djava.io.tmpdir=" + - new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) - - // Certain configs need to be passed here because they are needed before the Executor - // registers with the Scheduler and transfers the spark configs. Since the Executor backend - // uses Akka to connect to the scheduler, the akka settings are needed as well as the - // authentication settings. - sparkConf.getAll. - filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }. - foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") } - - sparkConf.getAkkaConf. - foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") } - - // Commenting it out for now - so that people can refer to the properties if required. Remove - // it once cpuset version is pushed out. - // The context is, default gc for server class machines end up using all cores to do gc - hence - // if there are multiple containers in same node, spark gc effects all other containers - // performance (which can also be other spark containers) - // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in - // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset - // of cores on a node. - /* - else { - // If no java_opts specified, default to using -XX:+CMSIncrementalMode - // It might be possible that other modes/config is being done in spark.executor.extraJavaOptions, - // so we dont want to mess with it. - // In our expts, using (default) throughput collector has severe perf ramnifications in - // multi-tennent machines - // The options are based on - // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline - javaOpts += " -XX:+UseConcMarkSweepGC " - javaOpts += " -XX:+CMSIncrementalMode " - javaOpts += " -XX:+CMSIncrementalPacing " - javaOpts += " -XX:CMSIncrementalDutyCycleMin=0 " - javaOpts += " -XX:CMSIncrementalDutyCycle=10 " - } - */ - - // For log4j configuration to reference - javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR) - - val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", - "-server", - // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling. - // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in - // an inconsistent state. - // TODO: If the OOM is not recoverable by rescheduling it on different node, then do - // 'something' to fail job ... akin to blacklisting trackers in mapred ? - "-XX:OnOutOfMemoryError='kill %p'") ++ - javaOpts ++ - Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", - masterAddress.toString, - slaveId.toString, - hostname.toString, - executorCores.toString, - appId, - "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", - "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") - - // TODO: it would be nicer to just make sure there are no null commands here - commands.map(s => if (s == null) "null" else s).toList - } - - private def setupDistributedCache( - file: String, - rtype: LocalResourceType, - localResources: HashMap[String, LocalResource], - timestamp: String, - size: String, - vis: String): Unit = { - val uri = new URI(file) - val amJarRsrc = Records.newRecord(classOf[LocalResource]) - amJarRsrc.setType(rtype) - amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis)) - amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri)) - amJarRsrc.setTimestamp(timestamp.toLong) - amJarRsrc.setSize(size.toLong) - localResources(uri.getFragment()) = amJarRsrc - } - - def prepareLocalResources: HashMap[String, LocalResource] = { - logInfo("Preparing Local resources") - val localResources = HashMap[String, LocalResource]() - - if (System.getenv("SPARK_YARN_CACHE_FILES") != null) { - val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',') - val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',') - val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',') - val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',') - for( i <- 0 to distFiles.length - 1) { - setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i), - fileSizes(i), visibilities(i)) - } - } - - if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) { - val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',') - val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',') - val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',') - val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',') - for( i <- 0 to distArchives.length - 1) { - setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, - timeStamps(i), fileSizes(i), visibilities(i)) - } - } - - logInfo("Prepared Local resources " + localResources) - localResources - } - - def prepareEnvironment: HashMap[String, String] = { - val env = new HashMap[String, String]() - val extraCp = sparkConf.getOption("spark.executor.extraClassPath") - ClientBase.populateClasspath(null, yarnConf, sparkConf, env, extraCp) - - sparkConf.getExecutorEnv.foreach { case (key, value) => - // This assumes each executor environment variable set here is a path - // This is kept for backward compatibility and consistency with hadoop - YarnSparkHadoopUtil.addPathToEnvironment(env, key, value) - } - - // Keep this for backwards compatibility but users should move to the config - sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs => - YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) - } - - System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v } - env - } - -} diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala deleted file mode 100644 index b32e15738f..0000000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ /dev/null @@ -1,538 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.util.{List => JList} -import java.util.concurrent._ -import java.util.concurrent.atomic.AtomicInteger -import java.util.regex.Pattern - -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse - -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv} -import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl} -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend - -import com.google.common.util.concurrent.ThreadFactoryBuilder -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ - -object AllocationType extends Enumeration { - type AllocationType = Value - val HOST, RACK, ANY = Value -} - -// TODO: -// Too many params. -// Needs to be mt-safe -// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should -// make it more proactive and decoupled. - -// Note that right now, we assume all node asks as uniform in terms of capabilities and priority -// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for -// more info on how we are requesting for containers. - -/** - * Common code for the Yarn container allocator. Contains all the version-agnostic code to - * manage container allocation for a running Spark application. - */ -private[yarn] abstract class YarnAllocator( - conf: Configuration, - sparkConf: SparkConf, - appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments, - preferredNodes: collection.Map[String, collection.Set[SplitInfo]], - securityMgr: SecurityManager) - extends Logging { - - import YarnAllocator._ - - // These three are locked on allocatedHostToContainersMap. Complementary data structures - // allocatedHostToContainersMap : containers which are running : host, Set - // allocatedContainerToHostMap: container to host mapping. - private val allocatedHostToContainersMap = - new HashMap[String, collection.mutable.Set[ContainerId]]() - - private val allocatedContainerToHostMap = new HashMap[ContainerId, String]() - - // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an - // allocated node) - // As with the two data structures above, tightly coupled with them, and to be locked on - // allocatedHostToContainersMap - private val allocatedRackCount = new HashMap[String, Int]() - - // Containers to be released in next request to RM - private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean] - - // Number of container requests that have been sent to, but not yet allocated by the - // ApplicationMaster. - private val numPendingAllocate = new AtomicInteger() - private val numExecutorsRunning = new AtomicInteger() - // Used to generate a unique id per executor - private val executorIdCounter = new AtomicInteger() - private val numExecutorsFailed = new AtomicInteger() - - private var maxExecutors = args.numExecutors - - // Keep track of which container is running which executor to remove the executors later - private val executorIdToContainer = new HashMap[String, Container] - - protected val executorMemory = args.executorMemory - protected val executorCores = args.executorCores - protected val (preferredHostToCount, preferredRackToCount) = - generateNodeToWeight(conf, preferredNodes) - - // Additional memory overhead - in mb. - protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) - - private val launcherPool = new ThreadPoolExecutor( - // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue - sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE, - 1, TimeUnit.MINUTES, - new LinkedBlockingQueue[Runnable](), - new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build()) - launcherPool.allowCoreThreadTimeOut(true) - - def getNumExecutorsRunning: Int = numExecutorsRunning.intValue - - def getNumExecutorsFailed: Int = numExecutorsFailed.intValue - - /** - * Request as many executors from the ResourceManager as needed to reach the desired total. - * This takes into account executors already running or pending. - */ - def requestTotalExecutors(requestedTotal: Int): Unit = synchronized { - val currentTotal = numPendingAllocate.get + numExecutorsRunning.get - if (requestedTotal > currentTotal) { - maxExecutors += (requestedTotal - currentTotal) - // We need to call `allocateResources` here to avoid the following race condition: - // If we request executors twice before `allocateResources` is called, then we will end up - // double counting the number requested because `numPendingAllocate` is not updated yet. - allocateResources() - } else { - logInfo(s"Not allocating more executors because there are already $currentTotal " + - s"(application requested $requestedTotal total)") - } - } - - /** - * Request that the ResourceManager release the container running the specified executor. - */ - def killExecutor(executorId: String): Unit = synchronized { - if (executorIdToContainer.contains(executorId)) { - val container = executorIdToContainer.remove(executorId).get - internalReleaseContainer(container) - numExecutorsRunning.decrementAndGet() - maxExecutors -= 1 - assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!") - } else { - logWarning(s"Attempted to kill unknown executor $executorId!") - } - } - - /** - * Allocate missing containers based on the number of executors currently pending and running. - * - * This method prioritizes the allocated container responses from the RM based on node and - * rack locality. Additionally, it releases any extra containers allocated for this application - * but are not needed. This must be synchronized because variables read in this block are - * mutated by other methods. - */ - def allocateResources(): Unit = synchronized { - val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get() - - // this is needed by alpha, do it here since we add numPending right after this - val executorsPending = numPendingAllocate.get() - if (missing > 0) { - val totalExecutorMemory = executorMemory + memoryOverhead - numPendingAllocate.addAndGet(missing) - logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " + - s"memory including $memoryOverhead MB overhead") - } else { - logDebug("Empty allocation request ...") - } - - val allocateResponse = allocateContainers(missing, executorsPending) - val allocatedContainers = allocateResponse.getAllocatedContainers() - - if (allocatedContainers.size > 0) { - var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size) - - if (numPendingAllocateNow < 0) { - numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow) - } - - logDebug(""" - Allocated containers: %d - Current executor count: %d - Containers released: %s - Cluster resources: %s - """.format( - allocatedContainers.size, - numExecutorsRunning.get(), - releasedContainers, - allocateResponse.getAvailableResources)) - - val hostToContainers = new HashMap[String, ArrayBuffer[Container]]() - - for (container <- allocatedContainers) { - if (isResourceConstraintSatisfied(container)) { - // Add the accepted `container` to the host's list of already accepted, - // allocated containers - val host = container.getNodeId.getHost - val containersForHost = hostToContainers.getOrElseUpdate(host, - new ArrayBuffer[Container]()) - containersForHost += container - } else { - // Release container, since it doesn't satisfy resource constraints. - internalReleaseContainer(container) - } - } - - // Find the appropriate containers to use. - // TODO: Cleanup this group-by... - val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]() - val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]() - val offRackContainers = new HashMap[String, ArrayBuffer[Container]]() - - for (candidateHost <- hostToContainers.keySet) { - val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0) - val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost) - - val remainingContainersOpt = hostToContainers.get(candidateHost) - assert(remainingContainersOpt.isDefined) - var remainingContainers = remainingContainersOpt.get - - if (requiredHostCount >= remainingContainers.size) { - // Since we have <= required containers, add all remaining containers to - // `dataLocalContainers`. - dataLocalContainers.put(candidateHost, remainingContainers) - // There are no more free containers remaining. - remainingContainers = null - } else if (requiredHostCount > 0) { - // Container list has more containers than we need for data locality. - // Split the list into two: one based on the data local container count, - // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining - // containers. - val (dataLocal, remaining) = remainingContainers.splitAt( - remainingContainers.size - requiredHostCount) - dataLocalContainers.put(candidateHost, dataLocal) - - // Invariant: remainingContainers == remaining - - // YARN has a nasty habit of allocating a ton of containers on a host - discourage this. - // Add each container in `remaining` to list of containers to release. If we have an - // insufficient number of containers, then the next allocation cycle will reallocate - // (but won't treat it as data local). - // TODO(harvey): Rephrase this comment some more. - for (container <- remaining) internalReleaseContainer(container) - remainingContainers = null - } - - // For rack local containers - if (remainingContainers != null) { - val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost) - if (rack != null) { - val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0) - val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) - - rackLocalContainers.getOrElse(rack, List()).size - - if (requiredRackCount >= remainingContainers.size) { - // Add all remaining containers to to `dataLocalContainers`. - dataLocalContainers.put(rack, remainingContainers) - remainingContainers = null - } else if (requiredRackCount > 0) { - // Container list has more containers that we need for data locality. - // Split the list into two: one based on the data local container count, - // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining - // containers. - val (rackLocal, remaining) = remainingContainers.splitAt( - remainingContainers.size - requiredRackCount) - val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack, - new ArrayBuffer[Container]()) - - existingRackLocal ++= rackLocal - - remainingContainers = remaining - } - } - } - - if (remainingContainers != null) { - // Not all containers have been consumed - add them to the list of off-rack containers. - offRackContainers.put(candidateHost, remainingContainers) - } - } - - // Now that we have split the containers into various groups, go through them in order: - // first host-local, then rack-local, and finally off-rack. - // Note that the list we create below tries to ensure that not all containers end up within - // a host if there is a sufficiently large number of hosts/containers. - val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size) - allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers) - allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers) - allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers) - - // Run each of the allocated containers. - for (container <- allocatedContainersToProcess) { - val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet() - val executorHostname = container.getNodeId.getHost - val containerId = container.getId - - val executorMemoryOverhead = (executorMemory + memoryOverhead) - assert(container.getResource.getMemory >= executorMemoryOverhead) - - if (numExecutorsRunningNow > maxExecutors) { - logInfo("""Ignoring container %s at host %s, since we already have the required number of - containers for it.""".format(containerId, executorHostname)) - internalReleaseContainer(container) - numExecutorsRunning.decrementAndGet() - } else { - val executorId = executorIdCounter.incrementAndGet().toString - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( - SparkEnv.driverActorSystemName, - sparkConf.get("spark.driver.host"), - sparkConf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME) - - logInfo("Launching container %s for on host %s".format(containerId, executorHostname)) - executorIdToContainer(executorId) = container - - // To be safe, remove the container from `releasedContainers`. - releasedContainers.remove(containerId) - - val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname) - allocatedHostToContainersMap.synchronized { - val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, - new HashSet[ContainerId]()) - - containerSet += containerId - allocatedContainerToHostMap.put(containerId, executorHostname) - - if (rack != null) { - allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1) - } - } - logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format( - driverUrl, executorHostname)) - val executorRunnable = new ExecutorRunnable( - container, - conf, - sparkConf, - driverUrl, - executorId, - executorHostname, - executorMemory, - executorCores, - appAttemptId.getApplicationId.toString, - securityMgr) - launcherPool.execute(executorRunnable) - } - } - logDebug(""" - Finished allocating %s containers (from %s originally). - Current number of executors running: %d, - Released containers: %s - """.format( - allocatedContainersToProcess, - allocatedContainers, - numExecutorsRunning.get(), - releasedContainers)) - } - - val completedContainers = allocateResponse.getCompletedContainersStatuses() - if (completedContainers.size > 0) { - logDebug("Completed %d containers".format(completedContainers.size)) - - for (completedContainer <- completedContainers) { - val containerId = completedContainer.getContainerId - - if (releasedContainers.containsKey(containerId)) { - // YarnAllocationHandler already marked the container for release, so remove it from - // `releasedContainers`. - releasedContainers.remove(containerId) - } else { - // Decrement the number of executors running. The next iteration of - // the ApplicationMaster's reporting thread will take care of allocating. - numExecutorsRunning.decrementAndGet() - logInfo("Completed container %s (state: %s, exit status: %s)".format( - containerId, - completedContainer.getState, - completedContainer.getExitStatus)) - // Hadoop 2.2.X added a ContainerExitStatus we should switch to use - // there are some exit status' we shouldn't necessarily count against us, but for - // now I think its ok as none of the containers are expected to exit - if (completedContainer.getExitStatus == -103) { // vmem limit exceeded - logWarning(memLimitExceededLogMessage( - completedContainer.getDiagnostics, - VMEM_EXCEEDED_PATTERN)) - } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded - logWarning(memLimitExceededLogMessage( - completedContainer.getDiagnostics, - PMEM_EXCEEDED_PATTERN)) - } else if (completedContainer.getExitStatus != 0) { - logInfo("Container marked as failed: " + containerId + - ". Exit status: " + completedContainer.getExitStatus + - ". Diagnostics: " + completedContainer.getDiagnostics) - numExecutorsFailed.incrementAndGet() - } - } - - allocatedHostToContainersMap.synchronized { - if (allocatedContainerToHostMap.containsKey(containerId)) { - val hostOpt = allocatedContainerToHostMap.get(containerId) - assert(hostOpt.isDefined) - val host = hostOpt.get - - val containerSetOpt = allocatedHostToContainersMap.get(host) - assert(containerSetOpt.isDefined) - val containerSet = containerSetOpt.get - - containerSet.remove(containerId) - if (containerSet.isEmpty) { - allocatedHostToContainersMap.remove(host) - } else { - allocatedHostToContainersMap.update(host, containerSet) - } - - allocatedContainerToHostMap.remove(containerId) - - // TODO: Move this part outside the synchronized block? - val rack = YarnSparkHadoopUtil.lookupRack(conf, host) - if (rack != null) { - val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1 - if (rackCount > 0) { - allocatedRackCount.put(rack, rackCount) - } else { - allocatedRackCount.remove(rack) - } - } - } - } - } - logDebug(""" - Finished processing %d completed containers. - Current number of executors running: %d, - Released containers: %s - """.format( - completedContainers.size, - numExecutorsRunning.get(), - releasedContainers)) - } - } - - protected def allocatedContainersOnHost(host: String): Int = { - var retval = 0 - allocatedHostToContainersMap.synchronized { - retval = allocatedHostToContainersMap.getOrElse(host, Set()).size - } - retval - } - - protected def allocatedContainersOnRack(rack: String): Int = { - var retval = 0 - allocatedHostToContainersMap.synchronized { - retval = allocatedRackCount.getOrElse(rack, 0) - } - retval - } - - private def isResourceConstraintSatisfied(container: Container): Boolean = { - container.getResource.getMemory >= (executorMemory + memoryOverhead) - } - - // A simple method to copy the split info map. - private def generateNodeToWeight( - conf: Configuration, - input: collection.Map[String, collection.Set[SplitInfo]] - ): (Map[String, Int], Map[String, Int]) = { - - if (input == null) { - return (Map[String, Int](), Map[String, Int]()) - } - - val hostToCount = new HashMap[String, Int] - val rackToCount = new HashMap[String, Int] - - for ((host, splits) <- input) { - val hostCount = hostToCount.getOrElse(host, 0) - hostToCount.put(host, hostCount + splits.size) - - val rack = YarnSparkHadoopUtil.lookupRack(conf, host) - if (rack != null) { - val rackCount = rackToCount.getOrElse(host, 0) - rackToCount.put(host, rackCount + splits.size) - } - } - - (hostToCount.toMap, rackToCount.toMap) - } - - private def internalReleaseContainer(container: Container) = { - releasedContainers.put(container.getId(), true) - releaseContainer(container) - } - - /** - * Called to allocate containers in the cluster. - * - * @param count Number of containers to allocate. - * If zero, should still contact RM (as a heartbeat). - * @param pending Number of containers pending allocate. Only used on alpha. - * @return Response to the allocation request. - */ - protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse - - /** Called to release a previously allocated container. */ - protected def releaseContainer(container: Container): Unit - - /** - * Defines the interface for an allocate response from the RM. This is needed since the alpha - * and stable interfaces differ here in ways that cannot be fixed using other routes. - */ - protected trait YarnAllocateResponse { - - def getAllocatedContainers(): JList[Container] - - def getAvailableResources(): Resource - - def getCompletedContainersStatuses(): JList[ContainerStatus] - - } - -} - -private object YarnAllocator { - val MEM_REGEX = "[0-9.]+ [KMG]B" - val PMEM_EXCEEDED_PATTERN = - Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used") - val VMEM_EXCEEDED_PATTERN = - Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used") - - def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = { - val matcher = pattern.matcher(diagnostics) - val diag = if (matcher.find()) " " + matcher.group() + "." else "" - ("Container killed by YARN for exceeding memory limits." + diag - + " Consider boosting spark.yarn.executor.memoryOverhead.") - } -} diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala deleted file mode 100644 index 2510b9c9ce..0000000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import scala.collection.{Map, Set} - -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.api.records._ - -import org.apache.spark.{SecurityManager, SparkConf, SparkContext} -import org.apache.spark.scheduler.SplitInfo - -/** - * Interface that defines a Yarn RM client. Abstracts away Yarn version-specific functionality that - * is used by Spark's AM. - */ -trait YarnRMClient { - - /** - * Registers the application master with the RM. - * - * @param conf The Yarn configuration. - * @param sparkConf The Spark configuration. - * @param preferredNodeLocations Map with hints about where to allocate containers. - * @param uiAddress Address of the SparkUI. - * @param uiHistoryAddress Address of the application on the History Server. - */ - def register( - conf: YarnConfiguration, - sparkConf: SparkConf, - preferredNodeLocations: Map[String, Set[SplitInfo]], - uiAddress: String, - uiHistoryAddress: String, - securityMgr: SecurityManager): YarnAllocator - - /** - * Unregister the AM. Guaranteed to only be called once. - * - * @param status The final status of the AM. - * @param diagnostics Diagnostics message to include in the final status. - */ - def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit - - /** Returns the attempt ID. */ - def getAttemptId(): ApplicationAttemptId - - /** Returns the configuration for the AmIpFilter to add to the Spark UI. */ - def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] - - /** Returns the maximum number of attempts to register the AM. */ - def getMaxRegAttempts(conf: YarnConfiguration): Int - -} diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala deleted file mode 100644 index 7d453ecb79..0000000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.lang.{Boolean => JBoolean} -import java.io.File -import java.util.{Collections, Set => JSet} -import java.util.regex.Matcher -import java.util.regex.Pattern -import java.util.concurrent.ConcurrentHashMap - -import scala.collection.mutable.HashMap - -import org.apache.hadoop.io.Text -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.security.Credentials -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.api.records.ApplicationAccessType -import org.apache.hadoop.yarn.util.RackResolver -import org.apache.hadoop.conf.Configuration - -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.util.Utils - -/** - * Contains util methods to interact with Hadoop from spark. - */ -class YarnSparkHadoopUtil extends SparkHadoopUtil { - - override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) { - dest.addCredentials(source.getCredentials()) - } - - // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true. - override def isYarnMode(): Boolean = { true } - - // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems - // Always create a new config, dont reuse yarnConf. - override def newConfiguration(conf: SparkConf): Configuration = - new YarnConfiguration(super.newConfiguration(conf)) - - // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster - override def addCredentials(conf: JobConf) { - val jobCreds = conf.getCredentials() - jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) - } - - override def getCurrentUserCredentials(): Credentials = { - UserGroupInformation.getCurrentUser().getCredentials() - } - - override def addCurrentUserCredentials(creds: Credentials) { - UserGroupInformation.getCurrentUser().addCredentials(creds) - } - - override def addSecretKeyToUserCredentials(key: String, secret: String) { - val creds = new Credentials() - creds.addSecretKey(new Text(key), secret.getBytes("utf-8")) - addCurrentUserCredentials(creds) - } - - override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { - val credentials = getCurrentUserCredentials() - if (credentials != null) credentials.getSecretKey(new Text(key)) else null - } - -} - -object YarnSparkHadoopUtil { - // Additional memory overhead - // 7% was arrived at experimentally. In the interest of minimizing memory waste while covering - // the common cases. Memory overhead tends to grow with container size. - - val MEMORY_OVERHEAD_FACTOR = 0.07 - val MEMORY_OVERHEAD_MIN = 384 - - val ANY_HOST = "*" - - val DEFAULT_NUMBER_EXECUTORS = 2 - - // All RM requests are issued with same priority : we do not (yet) have any distinction between - // request types (like map/reduce in hadoop for example) - val RM_REQUEST_PRIORITY = 1 - - // Host to rack map - saved from allocation requests. We are expecting this not to change. - // Note that it is possible for this to change : and ResourceManager will indicate that to us via - // update response to allocate. But we are punting on handling that for now. - private val hostToRack = new ConcurrentHashMap[String, String]() - private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]() - - /** - * Add a path variable to the given environment map. - * If the map already contains this key, append the value to the existing value instead. - */ - def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = { - val newValue = if (env.contains(key)) { env(key) + File.pathSeparator + value } else value - env.put(key, newValue) - } - - /** - * Set zero or more environment variables specified by the given input string. - * The input string is expected to take the form "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3". - */ - def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = { - if (inputString != null && inputString.length() > 0) { - val childEnvs = inputString.split(",") - val p = Pattern.compile(environmentVariableRegex) - for (cEnv <- childEnvs) { - val parts = cEnv.split("=") // split on '=' - val m = p.matcher(parts(1)) - val sb = new StringBuffer - while (m.find()) { - val variable = m.group(1) - var replace = "" - if (env.get(variable) != None) { - replace = env.get(variable).get - } else { - // if this key is not configured for the child .. get it from the env - replace = System.getenv(variable) - if (replace == null) { - // the env key is note present anywhere .. simply set it - replace = "" - } - } - m.appendReplacement(sb, Matcher.quoteReplacement(replace)) - } - m.appendTail(sb) - // This treats the environment variable as path variable delimited by `File.pathSeparator` - // This is kept for backward compatibility and consistency with Hadoop's behavior - addPathToEnvironment(env, parts(0), sb.toString) - } - } - } - - private val environmentVariableRegex: String = { - if (Utils.isWindows) { - "%([A-Za-z_][A-Za-z0-9_]*?)%" - } else { - "\\$([A-Za-z_][A-Za-z0-9_]*)" - } - } - - /** - * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands - * using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The - * argument is enclosed in single quotes and some key characters are escaped. - * - * @param arg A single argument. - * @return Argument quoted for execution via Yarn's generated shell script. - */ - def escapeForShell(arg: String): String = { - if (arg != null) { - val escaped = new StringBuilder("'") - for (i <- 0 to arg.length() - 1) { - arg.charAt(i) match { - case '$' => escaped.append("\\$") - case '"' => escaped.append("\\\"") - case '\'' => escaped.append("'\\''") - case c => escaped.append(c) - } - } - escaped.append("'").toString() - } else { - arg - } - } - - def lookupRack(conf: Configuration, host: String): String = { - if (!hostToRack.contains(host)) { - populateRackInfo(conf, host) - } - hostToRack.get(host) - } - - def populateRackInfo(conf: Configuration, hostname: String) { - Utils.checkHost(hostname) - - if (!hostToRack.containsKey(hostname)) { - // If there are repeated failures to resolve, all to an ignore list. - val rackInfo = RackResolver.resolve(conf, hostname) - if (rackInfo != null && rackInfo.getNetworkLocation != null) { - val rack = rackInfo.getNetworkLocation - hostToRack.put(hostname, rack) - if (! rackToHostSet.containsKey(rack)) { - rackToHostSet.putIfAbsent(rack, - Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]())) - } - rackToHostSet.get(rack).add(hostname) - - // TODO(harvey): Figure out what this comment means... - // Since RackResolver caches, we are disabling this for now ... - } /* else { - // right ? Else we will keep calling rack resolver in case we cant resolve rack info ... - hostToRack.put(hostname, null) - } */ - } - } - - def getApplicationAclsForYarn(securityMgr: SecurityManager) - : Map[ApplicationAccessType, String] = { - Map[ApplicationAccessType, String] ( - ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls, - ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls - ) - } - -} diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala deleted file mode 100644 index 254774a6b8..0000000000 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import org.apache.spark._ -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil -import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.util.Utils - -/** - * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM. - */ -private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { - - // By default, rack is unknown - override def getRackForHost(hostPort: String): Option[String] = { - val host = Utils.parseHostPort(hostPort)._1 - Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host)) - } -} diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala deleted file mode 100644 index 2923e6729c..0000000000 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import scala.collection.mutable.ArrayBuffer - -import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} - -import org.apache.spark.{SparkException, Logging, SparkContext} -import org.apache.spark.deploy.yarn.{Client, ClientArguments} -import org.apache.spark.scheduler.TaskSchedulerImpl - -private[spark] class YarnClientSchedulerBackend( - scheduler: TaskSchedulerImpl, - sc: SparkContext) - extends YarnSchedulerBackend(scheduler, sc) - with Logging { - - private var client: Client = null - private var appId: ApplicationId = null - @volatile private var stopping: Boolean = false - - /** - * Create a Yarn client to submit an application to the ResourceManager. - * This waits until the application is running. - */ - override def start() { - super.start() - val driverHost = conf.get("spark.driver.host") - val driverPort = conf.get("spark.driver.port") - val hostport = driverHost + ":" + driverPort - sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIAddress) } - - val argsArrayBuf = new ArrayBuffer[String]() - argsArrayBuf += ("--arg", hostport) - argsArrayBuf ++= getExtraClientArguments - - logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" ")) - val args = new ClientArguments(argsArrayBuf.toArray, conf) - totalExpectedExecutors = args.numExecutors - client = new Client(args, conf) - appId = client.submitApplication() - waitForApplication() - asyncMonitorApplication() - } - - /** - * Return any extra command line arguments to be passed to Client provided in the form of - * environment variables or Spark properties. - */ - private def getExtraClientArguments: Seq[String] = { - val extraArgs = new ArrayBuffer[String] - val optionTuples = // List of (target Client argument, environment variable, Spark property) - List( - ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"), - ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"), - ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"), - ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"), - ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"), - ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"), - ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"), - ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"), - ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"), - ("--name", "SPARK_YARN_APP_NAME", "spark.app.name") - ) - optionTuples.foreach { case (optionName, envVar, sparkProp) => - if (System.getenv(envVar) != null) { - extraArgs += (optionName, System.getenv(envVar)) - } else if (sc.getConf.contains(sparkProp)) { - extraArgs += (optionName, sc.getConf.get(sparkProp)) - } - } - extraArgs - } - - /** - * Report the state of the application until it is running. - * If the application has finished, failed or been killed in the process, throw an exception. - * This assumes both `client` and `appId` have already been set. - */ - private def waitForApplication(): Unit = { - assert(client != null && appId != null, "Application has not been submitted yet!") - val (state, _) = client.monitorApplication(appId, returnOnRunning = true) // blocking - if (state == YarnApplicationState.FINISHED || - state == YarnApplicationState.FAILED || - state == YarnApplicationState.KILLED) { - throw new SparkException("Yarn application has already ended! " + - "It might have been killed or unable to launch application master.") - } - if (state == YarnApplicationState.RUNNING) { - logInfo(s"Application $appId has started running.") - } - } - - /** - * Monitor the application state in a separate thread. - * If the application has exited for any reason, stop the SparkContext. - * This assumes both `client` and `appId` have already been set. - */ - private def asyncMonitorApplication(): Unit = { - assert(client != null && appId != null, "Application has not been submitted yet!") - val t = new Thread { - override def run() { - while (!stopping) { - val report = client.getApplicationReport(appId) - val state = report.getYarnApplicationState() - if (state == YarnApplicationState.FINISHED || - state == YarnApplicationState.KILLED || - state == YarnApplicationState.FAILED) { - logError(s"Yarn application has already exited with state $state!") - sc.stop() - stopping = true - } - Thread.sleep(1000L) - } - Thread.currentThread().interrupt() - } - } - t.setName("Yarn application state monitor") - t.setDaemon(true) - t.start() - } - - /** - * Stop the scheduler. This assumes `start()` has already been called. - */ - override def stop() { - assert(client != null, "Attempted to stop this scheduler before starting it!") - stopping = true - super.stop() - client.stop() - logInfo("Stopped") - } - - override def applicationId(): String = { - Option(appId).map(_.toString).getOrElse { - logWarning("Application ID is not initialized yet.") - super.applicationId - } - } - -} diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala deleted file mode 100644 index 4157ff95c2..0000000000 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import org.apache.spark._ -import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil} -import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.util.Utils - -/** - * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of - * ApplicationMaster, etc is done - */ -private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { - - logInfo("Created YarnClusterScheduler") - - // Nothing else for now ... initialize application master : which needs a SparkContext to - // determine how to allocate. - // Note that only the first creation of a SparkContext influences (and ideally, there must be - // only one SparkContext, right ?). Subsequent creations are ignored since executors are already - // allocated by then. - - // By default, rack is unknown - override def getRackForHost(hostPort: String): Option[String] = { - val host = Utils.parseHostPort(hostPort)._1 - Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host)) - } - - override def postStartHook() { - ApplicationMaster.sparkContextInitialized(sc) - super.postStartHook() - logInfo("YarnClusterScheduler.postStartHook done") - } - - override def stop() { - super.stop() - ApplicationMaster.sparkContextStopped(sc) - } - -} diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala deleted file mode 100644 index b1de81e6a8..0000000000 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import org.apache.spark.SparkContext -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ -import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.util.IntParam - -private[spark] class YarnClusterSchedulerBackend( - scheduler: TaskSchedulerImpl, - sc: SparkContext) - extends YarnSchedulerBackend(scheduler, sc) { - - override def start() { - super.start() - totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS - if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) { - totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")) - .getOrElse(totalExpectedExecutors) - } - // System property can override environment variable. - totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors) - } - - override def applicationId(): String = - // In YARN Cluster mode, spark.yarn.app.id is expect to be set - // before user application is launched. - // So, if spark.yarn.app.id is not set, it is something wrong. - sc.getConf.getOption("spark.yarn.app.id").getOrElse { - logError("Application ID is not set.") - super.applicationId - } - -} diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala deleted file mode 100644 index 17b79ae1d8..0000000000 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.io.File -import java.net.URI - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce.MRJobConfig -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.mockito.Matchers._ -import org.mockito.Mockito._ - - -import org.scalatest.FunSuite -import org.scalatest.Matchers - -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ HashMap => MutableHashMap } -import scala.reflect.ClassTag -import scala.util.Try - -import org.apache.spark.{SparkException, SparkConf} -import org.apache.spark.util.Utils - -class ClientBaseSuite extends FunSuite with Matchers { - - test("default Yarn application classpath") { - ClientBase.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) - } - - test("default MR application classpath") { - ClientBase.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP)) - } - - test("resultant classpath for an application that defines a classpath for YARN") { - withAppConf(Fixtures.mapYARNAppConf) { conf => - val env = newEnv - ClientBase.populateHadoopClasspath(conf, env) - classpath(env) should be( - flatten(Fixtures.knownYARNAppCP, ClientBase.getDefaultMRApplicationClasspath)) - } - } - - test("resultant classpath for an application that defines a classpath for MR") { - withAppConf(Fixtures.mapMRAppConf) { conf => - val env = newEnv - ClientBase.populateHadoopClasspath(conf, env) - classpath(env) should be( - flatten(ClientBase.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP)) - } - } - - test("resultant classpath for an application that defines both classpaths, YARN and MR") { - withAppConf(Fixtures.mapAppConf) { conf => - val env = newEnv - ClientBase.populateHadoopClasspath(conf, env) - classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP)) - } - } - - private val SPARK = "local:/sparkJar" - private val USER = "local:/userJar" - private val ADDED = "local:/addJar1,local:/addJar2,/addJar3" - - test("Local jar URIs") { - val conf = new Configuration() - val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK) - val env = new MutableHashMap[String, String]() - val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) - - ClientBase.populateClasspath(args, conf, sparkConf, env) - - val cp = env("CLASSPATH").split(File.pathSeparator) - s"$SPARK,$USER,$ADDED".split(",").foreach({ entry => - val uri = new URI(entry) - if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) { - cp should contain (uri.getPath()) - } else { - cp should not contain (uri.getPath()) - } - }) - cp should contain (Environment.PWD.$()) - cp should contain (s"${Environment.PWD.$()}${File.separator}*") - cp should not contain (ClientBase.SPARK_JAR) - cp should not contain (ClientBase.APP_JAR) - } - - test("Jar path propagation through SparkConf") { - val conf = new Configuration() - val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK) - val yarnConf = new YarnConfiguration() - val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) - - val client = spy(new DummyClient(args, conf, sparkConf, yarnConf)) - doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), - any(classOf[Path]), anyShort(), anyBoolean()) - - val tempDir = Utils.createTempDir() - try { - client.prepareLocalResources(tempDir.getAbsolutePath()) - sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER)) - - // The non-local path should be propagated by name only, since it will end up in the app's - // staging dir. - val expected = ADDED.split(",") - .map(p => { - val uri = new URI(p) - if (ClientBase.LOCAL_SCHEME == uri.getScheme()) { - p - } else { - Option(uri.getFragment()).getOrElse(new File(p).getName()) - } - }) - .mkString(",") - - sparkConf.getOption(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS) should be (Some(expected)) - } finally { - Utils.deleteRecursively(tempDir) - } - } - - test("check access nns empty") { - val sparkConf = new SparkConf() - sparkConf.set("spark.yarn.access.namenodes", "") - val nns = ClientBase.getNameNodesToAccess(sparkConf) - nns should be(Set()) - } - - test("check access nns unset") { - val sparkConf = new SparkConf() - val nns = ClientBase.getNameNodesToAccess(sparkConf) - nns should be(Set()) - } - - test("check access nns") { - val sparkConf = new SparkConf() - sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032") - val nns = ClientBase.getNameNodesToAccess(sparkConf) - nns should be(Set(new Path("hdfs://nn1:8032"))) - } - - test("check access nns space") { - val sparkConf = new SparkConf() - sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ") - val nns = ClientBase.getNameNodesToAccess(sparkConf) - nns should be(Set(new Path("hdfs://nn1:8032"))) - } - - test("check access two nns") { - val sparkConf = new SparkConf() - sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032") - val nns = ClientBase.getNameNodesToAccess(sparkConf) - nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032"))) - } - - test("check token renewer") { - val hadoopConf = new Configuration() - hadoopConf.set("yarn.resourcemanager.address", "myrm:8033") - hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM") - val renewer = ClientBase.getTokenRenewer(hadoopConf) - renewer should be ("yarn/myrm:8032@SPARKTEST.COM") - } - - test("check token renewer default") { - val hadoopConf = new Configuration() - val caught = - intercept[SparkException] { - ClientBase.getTokenRenewer(hadoopConf) - } - assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer") - } - - object Fixtures { - - val knownDefYarnAppCP: Seq[String] = - getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration], - "DEFAULT_YARN_APPLICATION_CLASSPATH", - Seq[String]())(a => a.toSeq) - - - val knownDefMRAppCP: Seq[String] = - getFieldValue2[String, Array[String], Seq[String]]( - classOf[MRJobConfig], - "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH", - Seq[String]())(a => a.split(","))(a => a.toSeq) - - val knownYARNAppCP = Some(Seq("/known/yarn/path")) - - val knownMRAppCP = Some(Seq("/known/mr/path")) - - val mapMRAppConf = - Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get) - - val mapYARNAppConf = - Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get) - - val mapAppConf = mapYARNAppConf ++ mapMRAppConf - } - - def withAppConf(m: Map[String, String] = Map())(testCode: (Configuration) => Any) { - val conf = new Configuration - m.foreach { case (k, v) => conf.set(k, v, "ClientBaseSpec") } - testCode(conf) - } - - def newEnv = MutableHashMap[String, String]() - - def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;") - - def flatten(a: Option[Seq[String]], b: Option[Seq[String]]) = (a ++ b).flatten.toArray - - def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = - Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption.map(mapTo).getOrElse(defaults) - - def getFieldValue2[A: ClassTag, A1: ClassTag, B]( - clazz: Class[_], - field: String, - defaults: => B)(mapTo: A => B)(mapTo1: A1 => B) : B = { - Try(clazz.getField(field)).map(_.get(null)).map { - case v: A => mapTo(v) - case v1: A1 => mapTo1(v1) - case _ => defaults - }.toOption.getOrElse(defaults) - } - - private class DummyClient( - val args: ClientArguments, - val hadoopConf: Configuration, - val sparkConf: SparkConf, - val yarnConf: YarnConfiguration) extends ClientBase { - override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = ??? - override def submitApplication(): ApplicationId = ??? - override def getApplicationReport(appId: ApplicationId): ApplicationReport = ??? - override def getClientToken(report: ApplicationReport): String = ??? - } - -} diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala deleted file mode 100644 index 80b57d1355..0000000000 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.net.URI - -import org.scalatest.FunSuite -import org.scalatest.mock.MockitoSugar -import org.mockito.Mockito.when - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileStatus -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.fs.Path -import org.apache.hadoop.fs.permission.FsAction -import org.apache.hadoop.yarn.api.records.LocalResource -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility -import org.apache.hadoop.yarn.api.records.LocalResourceType -import org.apache.hadoop.yarn.util.{Records, ConverterUtils} - -import scala.collection.mutable.HashMap -import scala.collection.mutable.Map - - -class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar { - - class MockClientDistributedCacheManager extends ClientDistributedCacheManager { - override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): - LocalResourceVisibility = { - LocalResourceVisibility.PRIVATE - } - } - - test("test getFileStatus empty") { - val distMgr = new ClientDistributedCacheManager() - val fs = mock[FileSystem] - val uri = new URI("/tmp/testing") - when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus()) - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - val stat = distMgr.getFileStatus(fs, uri, statCache) - assert(stat.getPath() === null) - } - - test("test getFileStatus cached") { - val distMgr = new ClientDistributedCacheManager() - val fs = mock[FileSystem] - val uri = new URI("/tmp/testing") - val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner", - null, new Path("/tmp/testing")) - when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus()) - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus) - val stat = distMgr.getFileStatus(fs, uri, statCache) - assert(stat.getPath().toString() === "/tmp/testing") - } - - test("test addResource") { - val distMgr = new MockClientDistributedCacheManager() - val fs = mock[FileSystem] - val conf = new Configuration() - val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") - val localResources = HashMap[String, LocalResource]() - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - when(fs.getFileStatus(destPath)).thenReturn(new FileStatus()) - - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link", - statCache, false) - val resource = localResources("link") - assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) - assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath) - assert(resource.getTimestamp() === 0) - assert(resource.getSize() === 0) - assert(resource.getType() === LocalResourceType.FILE) - - val env = new HashMap[String, String]() - distMgr.setDistFilesEnv(env) - assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link") - assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0") - assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0") - assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name()) - - distMgr.setDistArchivesEnv(env) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None) - - // add another one and verify both there and order correct - val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", - null, new Path("/tmp/testing2")) - val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2") - when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus) - distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2", - statCache, false) - val resource2 = localResources("link2") - assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE) - assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2) - assert(resource2.getTimestamp() === 10) - assert(resource2.getSize() === 20) - assert(resource2.getType() === LocalResourceType.FILE) - - val env2 = new HashMap[String, String]() - distMgr.setDistFilesEnv(env2) - val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',') - val files = env2("SPARK_YARN_CACHE_FILES").split(',') - val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',') - val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',') - assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link") - assert(timestamps(0) === "0") - assert(sizes(0) === "0") - assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name()) - - assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2") - assert(timestamps(1) === "10") - assert(sizes(1) === "20") - assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name()) - } - - test("test addResource link null") { - val distMgr = new MockClientDistributedCacheManager() - val fs = mock[FileSystem] - val conf = new Configuration() - val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") - val localResources = HashMap[String, LocalResource]() - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - when(fs.getFileStatus(destPath)).thenReturn(new FileStatus()) - - intercept[Exception] { - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null, - statCache, false) - } - assert(localResources.get("link") === None) - assert(localResources.size === 0) - } - - test("test addResource appmaster only") { - val distMgr = new MockClientDistributedCacheManager() - val fs = mock[FileSystem] - val conf = new Configuration() - val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") - val localResources = HashMap[String, LocalResource]() - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", - null, new Path("/tmp/testing")) - when(fs.getFileStatus(destPath)).thenReturn(realFileStatus) - - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", - statCache, true) - val resource = localResources("link") - assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) - assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath) - assert(resource.getTimestamp() === 10) - assert(resource.getSize() === 20) - assert(resource.getType() === LocalResourceType.ARCHIVE) - - val env = new HashMap[String, String]() - distMgr.setDistFilesEnv(env) - assert(env.get("SPARK_YARN_CACHE_FILES") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None) - - distMgr.setDistArchivesEnv(env) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None) - } - - test("test addResource archive") { - val distMgr = new MockClientDistributedCacheManager() - val fs = mock[FileSystem] - val conf = new Configuration() - val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") - val localResources = HashMap[String, LocalResource]() - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", - null, new Path("/tmp/testing")) - when(fs.getFileStatus(destPath)).thenReturn(realFileStatus) - - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", - statCache, false) - val resource = localResources("link") - assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) - assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath) - assert(resource.getTimestamp() === 10) - assert(resource.getSize() === 20) - assert(resource.getType() === LocalResourceType.ARCHIVE) - - val env = new HashMap[String, String]() - - distMgr.setDistArchivesEnv(env) - assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link") - assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10") - assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20") - assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name()) - - distMgr.setDistFilesEnv(env) - assert(env.get("SPARK_YARN_CACHE_FILES") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None) - } - - -} diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala deleted file mode 100644 index 8d184a09d6..0000000000 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import org.apache.spark.deploy.yarn.YarnAllocator._ -import org.scalatest.FunSuite - -class YarnAllocatorSuite extends FunSuite { - test("memory exceeded diagnostic regexes") { - val diagnostics = - "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " + - "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " + - "5.8 GB of 4.2 GB virtual memory used. Killing container." - val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN) - val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN) - assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used.")) - assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used.")) - } -} diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala deleted file mode 100644 index 2cc5abb3a8..0000000000 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.io.{File, IOException} - -import com.google.common.io.{ByteStreams, Files} -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.scalatest.{FunSuite, Matchers} - -import org.apache.hadoop.yarn.api.records.ApplicationAccessType - -import org.apache.spark.{Logging, SecurityManager, SparkConf} - - -class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { - - val hasBash = - try { - val exitCode = Runtime.getRuntime().exec(Array("bash", "--version")).waitFor() - exitCode == 0 - } catch { - case e: IOException => - false - } - - if (!hasBash) { - logWarning("Cannot execute bash, skipping bash tests.") - } - - def bashTest(name: String)(fn: => Unit) = - if (hasBash) test(name)(fn) else ignore(name)(fn) - - bashTest("shell script escaping") { - val scriptFile = File.createTempFile("script.", ".sh") - val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5", "\\arg6") - try { - val argLine = args.map(a => YarnSparkHadoopUtil.escapeForShell(a)).mkString(" ") - Files.write(("bash -c \"echo " + argLine + "\"").getBytes(), scriptFile) - scriptFile.setExecutable(true) - - val proc = Runtime.getRuntime().exec(Array(scriptFile.getAbsolutePath())) - val out = new String(ByteStreams.toByteArray(proc.getInputStream())).trim() - val err = new String(ByteStreams.toByteArray(proc.getErrorStream())) - val exitCode = proc.waitFor() - exitCode should be (0) - out should be (args.mkString(" ")) - } finally { - scriptFile.delete() - } - } - - test("Yarn configuration override") { - val key = "yarn.nodemanager.hostname" - val default = new YarnConfiguration() - - val sparkConf = new SparkConf() - .set("spark.hadoop." + key, "someHostName") - val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf) - - yarnConf.getClass() should be (classOf[YarnConfiguration]) - yarnConf.get(key) should not be default.get(key) - } - - - test("test getApplicationAclsForYarn acls on") { - - // spark acls on, just pick up default user - val sparkConf = new SparkConf() - sparkConf.set("spark.acls.enable", "true") - - val securityMgr = new SecurityManager(sparkConf) - val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr) - - val viewAcls = acls.get(ApplicationAccessType.VIEW_APP) - val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP) - - viewAcls match { - case Some(vacls) => { - val aclSet = vacls.split(',').map(_.trim).toSet - assert(aclSet.contains(System.getProperty("user.name", "invalid"))) - } - case None => { - fail() - } - } - modifyAcls match { - case Some(macls) => { - val aclSet = macls.split(',').map(_.trim).toSet - assert(aclSet.contains(System.getProperty("user.name", "invalid"))) - } - case None => { - fail() - } - } - } - - test("test getApplicationAclsForYarn acls on and specify users") { - - // default spark acls are on and specify acls - val sparkConf = new SparkConf() - sparkConf.set("spark.acls.enable", "true") - sparkConf.set("spark.ui.view.acls", "user1,user2") - sparkConf.set("spark.modify.acls", "user3,user4") - - val securityMgr = new SecurityManager(sparkConf) - val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr) - - val viewAcls = acls.get(ApplicationAccessType.VIEW_APP) - val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP) - - viewAcls match { - case Some(vacls) => { - val aclSet = vacls.split(',').map(_.trim).toSet - assert(aclSet.contains("user1")) - assert(aclSet.contains("user2")) - assert(aclSet.contains(System.getProperty("user.name", "invalid"))) - } - case None => { - fail() - } - } - modifyAcls match { - case Some(macls) => { - val aclSet = macls.split(',').map(_.trim).toSet - assert(aclSet.contains("user3")) - assert(aclSet.contains("user4")) - assert(aclSet.contains(System.getProperty("user.name", "invalid"))) - } - case None => { - fail() - } - } - - } -} -- cgit v1.2.3