aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala145
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala67
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala682
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala97
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala16
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala63
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala145
-rw-r--r--yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala18
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala167
9 files changed, 738 insertions, 662 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index aff9ab71f0..5a20532315 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -23,13 +23,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.YarnClientImpl
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, Records}
+import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -37,7 +35,10 @@ import org.apache.spark.deploy.SparkHadoopUtil
/**
* Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's alpha API.
*/
-class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
+private[spark] class Client(
+ val args: ClientArguments,
+ val hadoopConf: Configuration,
+ val sparkConf: SparkConf)
extends YarnClientImpl with ClientBase with Logging {
def this(clientArgs: ClientArguments, spConf: SparkConf) =
@@ -45,112 +46,86 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
- val args = clientArgs
- val conf = hadoopConf
- val sparkConf = spConf
- var rpc: YarnRPC = YarnRPC.create(conf)
- val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+ val yarnConf: YarnConfiguration = new YarnConfiguration(hadoopConf)
+ /* ------------------------------------------------------------------------------------- *
+ | The following methods have much in common in the stable and alpha versions of Client, |
+ | but cannot be implemented in the parent trait due to subtle API differences across |
+ | hadoop versions. |
+ * ------------------------------------------------------------------------------------- */
- // for client user who want to monitor app status by itself.
- def runApp() = {
- validateArgs()
-
+ /** Submit an application running our ApplicationMaster to the ResourceManager. */
+ override def submitApplication(): ApplicationId = {
init(yarnConf)
start()
- logClusterResourceDetails()
- val newApp = super.getNewApplication()
- val appId = newApp.getApplicationId()
+ logInfo("Requesting a new application from cluster with %d NodeManagers"
+ .format(getYarnClusterMetrics.getNumNodeManagers))
- verifyClusterResources(newApp)
- val appContext = createApplicationSubmissionContext(appId)
- val appStagingDir = getAppStagingDir(appId)
- val localResources = prepareLocalResources(appStagingDir)
- val env = setupLaunchEnv(localResources, appStagingDir)
- val amContainer = createContainerLaunchContext(newApp, localResources, env)
+ // Get a new application from our RM
+ val newAppResponse = getNewApplication()
+ val appId = newAppResponse.getApplicationId()
- val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
- // Memory for the ApplicationMaster.
- capability.setMemory(args.amMemory + memoryOverhead)
- amContainer.setResource(capability)
+ // Verify whether the cluster has enough resources for our AM
+ verifyClusterResources(newAppResponse)
- appContext.setQueue(args.amQueue)
- appContext.setAMContainerSpec(amContainer)
- appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+ // Set up the appropriate contexts to launch our AM
+ val containerContext = createContainerLaunchContext(newAppResponse)
+ val appContext = createApplicationSubmissionContext(appId, containerContext)
- submitApp(appContext)
+ // Finally, submit and monitor the application
+ logInfo(s"Submitting application ${appId.getId} to ResourceManager")
+ submitApplication(appContext)
appId
}
- def run() {
- val appId = runApp()
- monitorApplication(appId)
- }
-
- def logClusterResourceDetails() {
- val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
- logInfo("Got cluster metric info from ASM, numNodeManagers = " +
- clusterMetrics.getNumNodeManagers)
+ /**
+ * Set up a context for launching our ApplicationMaster container.
+ * In the Yarn alpha API, the memory requirements of this container must be set in
+ * the ContainerLaunchContext instead of the ApplicationSubmissionContext.
+ */
+ override def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
+ : ContainerLaunchContext = {
+ val containerContext = super.createContainerLaunchContext(newAppResponse)
+ val capability = Records.newRecord(classOf[Resource])
+ capability.setMemory(args.amMemory + amMemoryOverhead)
+ containerContext.setResource(capability)
+ containerContext
}
-
- def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
- logInfo("Setting up application submission context for ASM")
+ /** Set up the context for submitting our ApplicationMaster. */
+ def createApplicationSubmissionContext(
+ appId: ApplicationId,
+ containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
appContext.setApplicationId(appId)
appContext.setApplicationName(args.appName)
+ appContext.setQueue(args.amQueue)
+ appContext.setAMContainerSpec(containerContext)
+ appContext.setUser(UserGroupInformation.getCurrentUser.getShortUserName)
appContext
}
- def setupSecurityToken(amContainer: ContainerLaunchContext) = {
- // Setup security tokens.
+ /**
+ * Set up security tokens for launching our ApplicationMaster container.
+ * ContainerLaunchContext#setContainerTokens is renamed `setTokens` in the stable API.
+ */
+ override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
}
- def submitApp(appContext: ApplicationSubmissionContext) = {
- // Submit the application to the applications manager.
- logInfo("Submitting application to ASM")
- super.submitApplication(appContext)
- }
-
- def monitorApplication(appId: ApplicationId): Boolean = {
- val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
-
- while (true) {
- Thread.sleep(interval)
- val report = super.getApplicationReport(appId)
-
- logInfo("Application report from ASM: \n" +
- "\t application identifier: " + appId.toString() + "\n" +
- "\t appId: " + appId.getId() + "\n" +
- "\t clientToken: " + report.getClientToken() + "\n" +
- "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
- "\t appMasterHost: " + report.getHost() + "\n" +
- "\t appQueue: " + report.getQueue() + "\n" +
- "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
- "\t appStartTime: " + report.getStartTime() + "\n" +
- "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
- "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
- "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
- "\t appUser: " + report.getUser()
- )
-
- val state = report.getYarnApplicationState()
- if (state == YarnApplicationState.FINISHED ||
- state == YarnApplicationState.FAILED ||
- state == YarnApplicationState.KILLED) {
- return true
- }
- }
- true
- }
+ /**
+ * Return the security token used by this client to communicate with the ApplicationMaster.
+ * If no security is enabled, the token returned by the report is null.
+ * ApplicationReport#getClientToken is renamed `getClientToAMToken` in the stable API.
+ */
+ override def getClientToken(report: ApplicationReport): String =
+ Option(report.getClientToken).getOrElse("")
}
object Client {
-
def main(argStrings: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
println("WARNING: This client is deprecated and will be removed in a " +
@@ -158,19 +133,17 @@ object Client {
}
// Set an env variable indicating we are running in YARN mode.
- // Note that anything with SPARK prefix gets propagated to all (remote) processes
+ // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
System.setProperty("SPARK_YARN_MODE", "true")
-
val sparkConf = new SparkConf
try {
val args = new ClientArguments(argStrings, sparkConf)
new Client(args, sparkConf).run()
} catch {
- case e: Exception => {
+ case e: Exception =>
Console.err.println(e.getMessage)
System.exit(1)
- }
}
System.exit(0)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 40d8d6d6e6..201b742736 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -17,15 +17,14 @@
package org.apache.spark.deploy.yarn
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.InputFormatInfo
import org.apache.spark.util.{Utils, IntParam, MemoryParam}
// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
-class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
+private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
var addJars: String = null
var files: String = null
var archives: String = null
@@ -35,28 +34,56 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
var executorMemory = 1024 // MB
var executorCores = 1
var numExecutors = 2
- var amQueue = sparkConf.get("QUEUE", "default")
+ var amQueue = sparkConf.get("spark.yarn.queue", "default")
var amMemory: Int = 512 // MB
var appName: String = "Spark"
var priority = 0
- parseArgs(args.toList)
+ // Additional memory to allocate to containers
+ // For now, use driver's memory overhead as our AM container's memory overhead
+ val amMemoryOverhead = sparkConf.getInt(
+ "spark.yarn.driver.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
+ val executorMemoryOverhead = sparkConf.getInt(
+ "spark.yarn.executor.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
- // env variable SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES set in yarn-client then
- // it should default to hdfs://
- files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
- archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull)
+ parseArgs(args.toList)
+ loadEnvironmentArgs()
+ validateArgs()
+
+ /** Load any default arguments provided through environment variables and Spark properties. */
+ private def loadEnvironmentArgs(): Unit = {
+ // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
+ // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051).
+ files = Option(files)
+ .orElse(sys.env.get("SPARK_YARN_DIST_FILES"))
+ .orElse(sparkConf.getOption("spark.yarn.dist.files").map(p => Utils.resolveURIs(p)))
+ .orNull
+ archives = Option(archives)
+ .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
+ .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
+ .orNull
+ }
- // spark.yarn.dist.archives/spark.yarn.dist.files defaults to use file:// if not specified,
- // for both yarn-client and yarn-cluster
- files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").
- map(p => Utils.resolveURIs(p)).orNull)
- archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").
- map(p => Utils.resolveURIs(p)).orNull)
+ /**
+ * Fail fast if any arguments provided are invalid.
+ * This is intended to be called only after the provided arguments have been parsed.
+ */
+ private def validateArgs(): Unit = {
+ // TODO: memory checks are outdated (SPARK-3476)
+ Map[Boolean, String](
+ (numExecutors <= 0) -> "You must specify at least 1 executor!",
+ (amMemory <= amMemoryOverhead) -> s"AM memory must be > $amMemoryOverhead MB",
+ (executorMemory <= executorMemoryOverhead) ->
+ s"Executor memory must be > $executorMemoryOverhead MB"
+ ).foreach { case (errorCondition, errorMessage) =>
+ if (errorCondition) {
+ throw new IllegalArgumentException(errorMessage + "\n" + getUsageMessage())
+ }
+ }
+ }
private def parseArgs(inputArgs: List[String]): Unit = {
- val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
-
+ val userArgsBuffer = new ArrayBuffer[String]()
var args = inputArgs
while (!args.isEmpty) {
@@ -138,16 +165,14 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
userArgs = userArgsBuffer.readOnly
}
-
- def getUsageMessage(unknownParam: Any = null): String = {
+ private def getUsageMessage(unknownParam: List[String] = null): String = {
val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
-
message +
"Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
"Options:\n" +
" --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster mode)\n" +
" --class CLASS_NAME Name of your application's main class (required)\n" +
- " --arg ARGS Argument to be passed to your application's main class.\n" +
+ " --arg ARG Argument to be passed to your application's main class.\n" +
" Multiple invocations are possible, each will be passed in order.\n" +
" --num-executors NUM Number of executors to start (Default: 2)\n" +
" --executor-cores NUM Number of cores for the executors (Default: 1).\n" +
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 6ae4d49622..4870b0cb3d 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -17,7 +17,6 @@
package org.apache.spark.deploy.yarn
-import java.io.File
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import scala.collection.JavaConversions._
@@ -37,154 +36,107 @@ import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records
+
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
/**
- * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
- * Client submits an application to the YARN ResourceManager.
+ * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN.
+ * The Client submits an application to the YARN ResourceManager.
*/
-trait ClientBase extends Logging {
- val args: ClientArguments
- val conf: Configuration
- val sparkConf: SparkConf
- val yarnConf: YarnConfiguration
- val credentials = UserGroupInformation.getCurrentUser().getCredentials()
- private val SPARK_STAGING: String = ".sparkStaging"
+private[spark] trait ClientBase extends Logging {
+ import ClientBase._
+
+ protected val args: ClientArguments
+ protected val hadoopConf: Configuration
+ protected val sparkConf: SparkConf
+ protected val yarnConf: YarnConfiguration
+ protected val credentials = UserGroupInformation.getCurrentUser.getCredentials
+ protected val amMemoryOverhead = args.amMemoryOverhead // MB
+ protected val executorMemoryOverhead = args.executorMemoryOverhead // MB
private val distCacheMgr = new ClientDistributedCacheManager()
- // Staging directory is private! -> rwx--------
- val STAGING_DIR_PERMISSION: FsPermission =
- FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
- // App files are world-wide readable and owner writable -> rw-r--r--
- val APP_FILE_PERMISSION: FsPermission =
- FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
-
- // Additional memory overhead - in mb.
- protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
- YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
-
- // TODO(harvey): This could just go in ClientArguments.
- def validateArgs() = {
- Map(
- (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
- (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" +
- "greater than: " + memoryOverhead),
- (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory size" +
- "must be greater than: " + memoryOverhead.toString)
- ).foreach { case(cond, errStr) =>
- if (cond) {
- logError(errStr)
- throw new IllegalArgumentException(args.getUsageMessage())
- }
- }
- }
-
- def getAppStagingDir(appId: ApplicationId): String = {
- SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
- }
-
- def verifyClusterResources(app: GetNewApplicationResponse) = {
- val maxMem = app.getMaximumResourceCapability().getMemory()
- logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
-
- // If we have requested more then the clusters max for a single resource then exit.
- if (args.executorMemory > maxMem) {
- val errorMessage =
- "Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster."
- .format(args.executorMemory, maxMem)
-
- logError(errorMessage)
- throw new IllegalArgumentException(errorMessage)
- }
- val amMem = args.amMemory + memoryOverhead
+ /**
+ * Fail fast if we have requested more resources per container than is available in the cluster.
+ */
+ protected def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
+ val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
+ logInfo("Verifying our application has not requested more than the maximum " +
+ s"memory capability of the cluster ($maxMem MB per container)")
+ val executorMem = args.executorMemory + executorMemoryOverhead
+ if (executorMem > maxMem) {
+ throw new IllegalArgumentException(s"Required executor memory ($executorMem MB) " +
+ s"is above the max threshold ($maxMem MB) of this cluster!")
+ }
+ val amMem = args.amMemory + amMemoryOverhead
if (amMem > maxMem) {
-
- val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster."
- .format(amMem, maxMem)
- logError(errorMessage)
- throw new IllegalArgumentException(errorMessage)
+ throw new IllegalArgumentException(s"Required AM memory ($amMem MB) " +
+ s"is above the max threshold ($maxMem MB) of this cluster!")
}
-
// We could add checks to make sure the entire cluster has enough resources but that involves
// getting all the node reports and computing ourselves.
}
- /** See if two file systems are the same or not. */
- private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
- val srcUri = srcFs.getUri()
- val dstUri = destFs.getUri()
- if (srcUri.getScheme() == null) {
- return false
- }
- if (!srcUri.getScheme().equals(dstUri.getScheme())) {
- return false
- }
- var srcHost = srcUri.getHost()
- var dstHost = dstUri.getHost()
- if ((srcHost != null) && (dstHost != null)) {
- try {
- srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
- dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
- } catch {
- case e: UnknownHostException =>
- return false
- }
- if (!srcHost.equals(dstHost)) {
- return false
- }
- } else if (srcHost == null && dstHost != null) {
- return false
- } else if (srcHost != null && dstHost == null) {
- return false
- }
- if (srcUri.getPort() != dstUri.getPort()) {
- false
- } else {
- true
- }
- }
-
- /** Copy the file into HDFS if needed. */
- private[yarn] def copyRemoteFile(
- dstDir: Path,
- originalPath: Path,
+ /**
+ * Copy the given file to a remote file system (e.g. HDFS) if needed.
+ * The file is only copied if the source and destination file systems are different. This is used
+ * for preparing resources for launching the ApplicationMaster container. Exposed for testing.
+ */
+ def copyFileToRemote(
+ destDir: Path,
+ srcPath: Path,
replication: Short,
setPerms: Boolean = false): Path = {
- val fs = FileSystem.get(conf)
- val remoteFs = originalPath.getFileSystem(conf)
- var newPath = originalPath
- if (!compareFs(remoteFs, fs)) {
- newPath = new Path(dstDir, originalPath.getName())
- logInfo("Uploading " + originalPath + " to " + newPath)
- FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
- fs.setReplication(newPath, replication)
- if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
+ val destFs = destDir.getFileSystem(hadoopConf)
+ val srcFs = srcPath.getFileSystem(hadoopConf)
+ var destPath = srcPath
+ if (!compareFs(srcFs, destFs)) {
+ destPath = new Path(destDir, srcPath.getName())
+ logInfo(s"Uploading resource $srcPath -> $destPath")
+ FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
+ destFs.setReplication(destPath, replication)
+ if (setPerms) {
+ destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
+ }
+ } else {
+ logInfo(s"Source and destination file systems are the same. Not copying $srcPath")
}
// Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
// version shows the specific version in the distributed cache configuration
- val qualPath = fs.makeQualified(newPath)
- val fc = FileContext.getFileContext(qualPath.toUri(), conf)
- val destPath = fc.resolvePath(qualPath)
- destPath
+ val qualifiedDestPath = destFs.makeQualified(destPath)
+ val fc = FileContext.getFileContext(qualifiedDestPath.toUri(), hadoopConf)
+ fc.resolvePath(qualifiedDestPath)
}
- private def qualifyForLocal(localURI: URI): Path = {
- var qualifiedURI = localURI
- // If not specified, assume these are in the local filesystem to keep behavior like Hadoop
- if (qualifiedURI.getScheme() == null) {
- qualifiedURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(qualifiedURI)).toString)
- }
+ /**
+ * Given a local URI, resolve it and return a qualified local path that corresponds to the URI.
+ * This is used for preparing local resources to be included in the container launch context.
+ */
+ private def getQualifiedLocalPath(localURI: URI): Path = {
+ val qualifiedURI =
+ if (localURI.getScheme == null) {
+ // If not specified, assume this is in the local filesystem to keep the behavior
+ // consistent with that of Hadoop
+ new URI(FileSystem.getLocal(hadoopConf).makeQualified(new Path(localURI)).toString)
+ } else {
+ localURI
+ }
new Path(qualifiedURI)
}
+ /**
+ * Upload any resources to the distributed cache if needed. If a resource is intended to be
+ * consumed locally, set up the appropriate config for downstream code to handle it properly.
+ * This is used for setting up a container launch context for our ApplicationMaster.
+ * Exposed for testing.
+ */
def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
- logInfo("Preparing Local resources")
- // Upload Spark and the application JAR to the remote file system if necessary. Add them as
- // local resources to the application master.
- val fs = FileSystem.get(conf)
+ logInfo("Preparing resources for our AM container")
+ // Upload Spark and the application JAR to the remote file system if necessary,
+ // and add them as local resources to the application master.
+ val fs = FileSystem.get(hadoopConf)
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
- val nns = ClientBase.getNameNodesToAccess(sparkConf) + dst
- ClientBase.obtainTokensForNamenodes(nns, conf, credentials)
+ val nns = getNameNodesToAccess(sparkConf) + dst
+ obtainTokensForNamenodes(nns, hadoopConf, credentials)
val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
val localResources = HashMap[String, LocalResource]()
@@ -200,73 +152,84 @@ trait ClientBase extends Logging {
"for alternatives.")
}
+ /**
+ * Copy the given main resource to the distributed cache if the scheme is not "local".
+ * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
+ * Each resource is represented by a 4-tuple of:
+ * (1) destination resource name,
+ * (2) local path to the resource,
+ * (3) Spark property key to set if the scheme is not local, and
+ * (4) whether to set permissions for this resource
+ */
List(
- (ClientBase.SPARK_JAR, ClientBase.sparkJar(sparkConf), ClientBase.CONF_SPARK_JAR),
- (ClientBase.APP_JAR, args.userJar, ClientBase.CONF_SPARK_USER_JAR),
- ("log4j.properties", oldLog4jConf.getOrElse(null), null)
- ).foreach { case(destName, _localPath, confKey) =>
+ (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR, false),
+ (APP_JAR, args.userJar, CONF_SPARK_USER_JAR, true),
+ ("log4j.properties", oldLog4jConf.orNull, null, false)
+ ).foreach { case (destName, _localPath, confKey, setPermissions) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
- if (! localPath.isEmpty()) {
+ if (!localPath.isEmpty()) {
val localURI = new URI(localPath)
- if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
- val setPermissions = destName.equals(ClientBase.APP_JAR)
- val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
- val destFs = FileSystem.get(destPath.toUri(), conf)
- distCacheMgr.addResource(destFs, conf, destPath, localResources, LocalResourceType.FILE,
- destName, statCache)
+ if (localURI.getScheme != LOCAL_SCHEME) {
+ val src = getQualifiedLocalPath(localURI)
+ val destPath = copyFileToRemote(dst, src, replication, setPermissions)
+ val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
+ distCacheMgr.addResource(destFs, hadoopConf, destPath,
+ localResources, LocalResourceType.FILE, destName, statCache)
} else if (confKey != null) {
+ // If the resource is intended for local use only, handle this downstream
+ // by setting the appropriate property
sparkConf.set(confKey, localPath)
}
}
}
+ /**
+ * Do the same for any additional resources passed in through ClientArguments.
+ * Each resource category is represented by a 3-tuple of:
+ * (1) comma separated list of resources in this category,
+ * (2) resource type, and
+ * (3) whether to add these resources to the classpath
+ */
val cachedSecondaryJarLinks = ListBuffer.empty[String]
- val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
+ List(
+ (args.addJars, LocalResourceType.FILE, true),
(args.files, LocalResourceType.FILE, false),
- (args.archives, LocalResourceType.ARCHIVE, false) )
- fileLists.foreach { case (flist, resType, addToClasspath) =>
+ (args.archives, LocalResourceType.ARCHIVE, false)
+ ).foreach { case (flist, resType, addToClasspath) =>
if (flist != null && !flist.isEmpty()) {
- flist.split(',').foreach { case file: String =>
+ flist.split(',').foreach { file =>
val localURI = new URI(file.trim())
- if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+ if (localURI.getScheme != LOCAL_SCHEME) {
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
- val destPath = copyRemoteFile(dst, localPath, replication)
- distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
- linkname, statCache)
+ val destPath = copyFileToRemote(dst, localPath, replication)
+ distCacheMgr.addResource(
+ fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
if (addToClasspath) {
cachedSecondaryJarLinks += linkname
}
} else if (addToClasspath) {
+ // Resource is intended for local use only and should be added to the class path
cachedSecondaryJarLinks += file.trim()
}
}
}
}
- logInfo("Prepared Local resources " + localResources)
- sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
+ if (cachedSecondaryJarLinks.nonEmpty) {
+ sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
+ }
- UserGroupInformation.getCurrentUser().addCredentials(credentials)
localResources
}
- /** Get all application master environment variables set on this SparkConf */
- def getAppMasterEnv: Seq[(String, String)] = {
- val prefix = "spark.yarn.appMasterEnv."
- sparkConf.getAll.filter{case (k, v) => k.startsWith(prefix)}
- .map{case (k, v) => (k.substring(prefix.length), v)}
- }
-
-
- def setupLaunchEnv(
- localResources: HashMap[String, LocalResource],
- stagingDir: String): HashMap[String, String] = {
- logInfo("Setting up the launch environment")
-
+ /**
+ * Set up the environment for launching our ApplicationMaster container.
+ */
+ private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
+ logInfo("Setting up the launch environment for our AM container")
val env = new HashMap[String, String]()
-
val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
- ClientBase.populateClasspath(args, yarnConf, sparkConf, env, extraCp)
+ populateClasspath(args, yarnConf, sparkConf, env, extraCp)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -275,42 +238,20 @@ trait ClientBase extends Logging {
distCacheMgr.setDistFilesEnv(env)
distCacheMgr.setDistArchivesEnv(env)
- getAppMasterEnv.foreach { case (key, value) =>
- YarnSparkHadoopUtil.addToEnvironment(env, key, value, File.pathSeparator)
- }
+ // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
+ val amEnvPrefix = "spark.yarn.appMasterEnv."
+ sparkConf.getAll
+ .filter { case (k, v) => k.startsWith(amEnvPrefix) }
+ .map { case (k, v) => (k.substring(amEnvPrefix.length), v) }
+ .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) }
// Keep this for backwards compatibility but users should move to the config
sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
// Allow users to specify some environment variables.
- YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs, File.pathSeparator)
-
+ YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
// Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments.
env("SPARK_YARN_USER_ENV") = userEnvs
}
- env
- }
-
- def userArgsToString(clientArgs: ClientArguments): String = {
- val prefix = " --arg "
- val args = clientArgs.userArgs
- val retval = new StringBuilder()
- for (arg <- args) {
- retval.append(prefix).append(" ").append(YarnSparkHadoopUtil.escapeForShell(arg))
- }
- retval.toString
- }
-
- def setupSecurityToken(amContainer: ContainerLaunchContext)
-
- def createContainerLaunchContext(
- newApp: GetNewApplicationResponse,
- localResources: HashMap[String, LocalResource],
- env: HashMap[String, String]): ContainerLaunchContext = {
- logInfo("Setting up container launch context")
- val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
- amContainer.setLocalResources(localResources)
-
- val isLaunchingDriver = args.userClass != null
// In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
// executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
@@ -320,6 +261,7 @@ trait ClientBase extends Logging {
// Note that to warn the user about the deprecation in cluster mode, some code from
// SparkConf#validateSettings() is duplicated here (to avoid triggering the condition
// described above).
+ val isLaunchingDriver = args.userClass != null
if (isLaunchingDriver) {
sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
val warning =
@@ -342,14 +284,30 @@ trait ClientBase extends Logging {
env("SPARK_JAVA_OPTS") = value
}
}
- amContainer.setEnvironment(env)
- val amMemory = args.amMemory
+ env
+ }
+
+ /**
+ * Set up a ContainerLaunchContext to launch our ApplicationMaster container.
+ * This sets up the launch environment, java options, and the command for launching the AM.
+ */
+ protected def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
+ : ContainerLaunchContext = {
+ logInfo("Setting up container launch context for our AM")
+
+ val appId = newAppResponse.getApplicationId
+ val appStagingDir = getAppStagingDir(appId)
+ val localResources = prepareLocalResources(appStagingDir)
+ val launchEnv = setupLaunchEnv(appStagingDir)
+ val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
+ amContainer.setLocalResources(localResources)
+ amContainer.setEnvironment(launchEnv)
val javaOpts = ListBuffer[String]()
// Add Xmx for AM memory
- javaOpts += "-Xmx" + amMemory + "m"
+ javaOpts += "-Xmx" + args.amMemory + "m"
val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
javaOpts += "-Djava.io.tmpdir=" + tmpDir
@@ -361,8 +319,7 @@ trait ClientBase extends Logging {
// Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in
// multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset
// of cores on a node.
- val useConcurrentAndIncrementalGC = env.isDefinedAt("SPARK_USE_CONC_INCR_GC") &&
- java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))
+ val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
if (useConcurrentAndIncrementalGC) {
// In our expts, using (default) throughput collector has severe perf ramifications in
// multi-tenant machines
@@ -380,6 +337,8 @@ trait ClientBase extends Logging {
javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
}
+ // Include driver-specific java options if we are launching a driver
+ val isLaunchingDriver = args.userClass != null
if (isLaunchingDriver) {
sparkConf.getOption("spark.driver.extraJavaOptions")
.orElse(sys.env.get("SPARK_JAVA_OPTS"))
@@ -397,19 +356,27 @@ trait ClientBase extends Logging {
} else {
Nil
}
+ val userJar =
+ if (args.userJar != null) {
+ Seq("--jar", args.userJar)
+ } else {
+ Nil
+ }
val amClass =
if (isLaunchingDriver) {
- classOf[ApplicationMaster].getName()
+ Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
- classOf[ApplicationMaster].getName().replace("ApplicationMaster", "ExecutorLauncher")
+ Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
+ val userArgs = args.userArgs.flatMap { arg =>
+ Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
+ }
val amArgs =
- Seq(amClass) ++ userClass ++
- (if (args.userJar != null) Seq("--jar", args.userJar) else Nil) ++
- Seq("--executor-memory", args.executorMemory.toString,
+ Seq(amClass) ++ userClass ++ userJar ++ userArgs ++
+ Seq(
+ "--executor-memory", args.executorMemory.toString,
"--executor-cores", args.executorCores.toString,
- "--num-executors ", args.numExecutors.toString,
- userArgsToString(args))
+ "--num-executors ", args.numExecutors.toString)
// Command for the ApplicationMaster
val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
@@ -418,41 +385,153 @@ trait ClientBase extends Logging {
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
- logInfo("Yarn AM launch context:")
- logInfo(s" user class: ${args.userClass}")
- logInfo(s" env: $env")
- logInfo(s" command: ${commands.mkString(" ")}")
-
// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands)
- setupSecurityToken(amContainer)
+ logDebug("===============================================================================")
+ logDebug("Yarn AM launch context:")
+ logDebug(s" user class: ${Option(args.userClass).getOrElse("N/A")}")
+ logDebug(" env:")
+ launchEnv.foreach { case (k, v) => logDebug(s" $k -> $v") }
+ logDebug(" resources:")
+ localResources.foreach { case (k, v) => logDebug(s" $k -> $v")}
+ logDebug(" command:")
+ logDebug(s" ${printableCommands.mkString(" ")}")
+ logDebug("===============================================================================")
// send the acl settings into YARN to control who has access via YARN interfaces
val securityManager = new SecurityManager(sparkConf)
amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
+ setupSecurityToken(amContainer)
+ UserGroupInformation.getCurrentUser().addCredentials(credentials)
amContainer
}
+
+ /**
+ * Report the state of an application until it has exited, either successfully or
+ * due to some failure, then return the application state.
+ *
+ * @param appId ID of the application to monitor.
+ * @param returnOnRunning Whether to also return the application state when it is RUNNING.
+ * @param logApplicationReport Whether to log details of the application report every iteration.
+ * @return state of the application, one of FINISHED, FAILED, KILLED, and RUNNING.
+ */
+ def monitorApplication(
+ appId: ApplicationId,
+ returnOnRunning: Boolean = false,
+ logApplicationReport: Boolean = true): YarnApplicationState = {
+ val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
+ var lastState: YarnApplicationState = null
+ while (true) {
+ Thread.sleep(interval)
+ val report = getApplicationReport(appId)
+ val state = report.getYarnApplicationState
+
+ if (logApplicationReport) {
+ logInfo(s"Application report for $appId (state: $state)")
+ val details = Seq[(String, String)](
+ ("client token", getClientToken(report)),
+ ("diagnostics", report.getDiagnostics),
+ ("ApplicationMaster host", report.getHost),
+ ("ApplicationMaster RPC port", report.getRpcPort.toString),
+ ("queue", report.getQueue),
+ ("start time", report.getStartTime.toString),
+ ("final status", report.getFinalApplicationStatus.toString),
+ ("tracking URL", report.getTrackingUrl),
+ ("user", report.getUser)
+ )
+
+ // Use more loggable format if value is null or empty
+ val formattedDetails = details
+ .map { case (k, v) =>
+ val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
+ s"\n\t $k: $newValue" }
+ .mkString("")
+
+ // If DEBUG is enabled, log report details every iteration
+ // Otherwise, log them every time the application changes state
+ if (log.isDebugEnabled) {
+ logDebug(formattedDetails)
+ } else if (lastState != state) {
+ logInfo(formattedDetails)
+ }
+ }
+
+ if (state == YarnApplicationState.FINISHED ||
+ state == YarnApplicationState.FAILED ||
+ state == YarnApplicationState.KILLED) {
+ return state
+ }
+
+ if (returnOnRunning && state == YarnApplicationState.RUNNING) {
+ return state
+ }
+
+ lastState = state
+ }
+
+ // Never reached, but keeps compiler happy
+ throw new SparkException("While loop is depleted! This should never happen...")
+ }
+
+ /**
+ * Submit an application to the ResourceManager and monitor its state.
+ * This continues until the application has exited for any reason.
+ */
+ def run(): Unit = monitorApplication(submitApplication())
+
+ /* --------------------------------------------------------------------------------------- *
+ | Methods that cannot be implemented here due to API differences across hadoop versions |
+ * --------------------------------------------------------------------------------------- */
+
+ /** Submit an application running our ApplicationMaster to the ResourceManager. */
+ def submitApplication(): ApplicationId
+
+ /** Set up security tokens for launching our ApplicationMaster container. */
+ protected def setupSecurityToken(containerContext: ContainerLaunchContext): Unit
+
+ /** Get the application report from the ResourceManager for an application we have submitted. */
+ protected def getApplicationReport(appId: ApplicationId): ApplicationReport
+
+ /**
+ * Return the security token used by this client to communicate with the ApplicationMaster.
+ * If no security is enabled, the token returned by the report is null.
+ */
+ protected def getClientToken(report: ApplicationReport): String
}
-object ClientBase extends Logging {
+private[spark] object ClientBase extends Logging {
+
+ // Alias for the Spark assembly jar and the user jar
val SPARK_JAR: String = "__spark__.jar"
val APP_JAR: String = "__app__.jar"
+
+ // URI scheme that identifies local resources
val LOCAL_SCHEME = "local"
+
+ // Staging directory for any temporary jars or files
+ val SPARK_STAGING: String = ".sparkStaging"
+
+ // Location of any user-defined Spark jars
val CONF_SPARK_JAR = "spark.yarn.jar"
- /**
- * This is an internal config used to propagate the location of the user's jar file to the
- * driver/executors.
- */
+ val ENV_SPARK_JAR = "SPARK_JAR"
+
+ // Internal config to propagate the location of the user's jar to the driver/executors
val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"
- /**
- * This is an internal config used to propagate the list of extra jars to add to the classpath
- * of executors.
- */
+
+ // Internal config to propagate the locations of any extra jars to add to the classpath
+ // of the executors
val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
- val ENV_SPARK_JAR = "SPARK_JAR"
+
+ // Staging directory is private! -> rwx--------
+ val STAGING_DIR_PERMISSION: FsPermission =
+ FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
+
+ // App files are world-wide readable and owner writable -> rw-r--r--
+ val APP_FILE_PERMISSION: FsPermission =
+ FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
/**
* Find the user-defined Spark jar if configured, or return the jar containing this
@@ -461,7 +540,7 @@ object ClientBase extends Logging {
* This method first looks in the SparkConf object for the CONF_SPARK_JAR key, and in the
* user environment if that is not found (for backwards compatibility).
*/
- def sparkJar(conf: SparkConf) = {
+ private def sparkJar(conf: SparkConf): String = {
if (conf.contains(CONF_SPARK_JAR)) {
conf.get(CONF_SPARK_JAR)
} else if (System.getenv(ENV_SPARK_JAR) != null) {
@@ -474,16 +553,22 @@ object ClientBase extends Logging {
}
}
- def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) = {
+ /**
+ * Return the path to the given application's staging directory.
+ */
+ private def getAppStagingDir(appId: ApplicationId): String = {
+ SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
+ }
+
+ /**
+ * Populate the classpath entry in the given environment map with any application
+ * classpath specified through the Hadoop and Yarn configurations.
+ */
+ def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]): Unit = {
val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
for (c <- classPathElementsToAdd.flatten) {
- YarnSparkHadoopUtil.addToEnvironment(
- env,
- Environment.CLASSPATH.name,
- c.trim,
- File.pathSeparator)
+ YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim)
}
- classPathElementsToAdd
}
private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] =
@@ -519,7 +604,7 @@ object ClientBase extends Logging {
/**
* In Hadoop 0.23, the MR application classpath comes with the YARN application
- * classpath. In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String.
+ * classpath. In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String.
* So we need to use reflection to retrieve it.
*/
def getDefaultMRApplicationClasspath: Option[Seq[String]] = {
@@ -545,8 +630,16 @@ object ClientBase extends Logging {
triedDefault.toOption
}
- def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf,
- env: HashMap[String, String], extraClassPath: Option[String] = None) {
+ /**
+ * Populate the classpath entry in the given environment map.
+ * This includes the user jar, Spark jar, and any extra application jars.
+ */
+ def populateClasspath(
+ args: ClientArguments,
+ conf: Configuration,
+ sparkConf: SparkConf,
+ env: HashMap[String, String],
+ extraClassPath: Option[String] = None): Unit = {
extraClassPath.foreach(addClasspathEntry(_, env))
addClasspathEntry(Environment.PWD.$(), env)
@@ -554,36 +647,40 @@ object ClientBase extends Logging {
if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
addUserClasspath(args, sparkConf, env)
addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
- ClientBase.populateHadoopClasspath(conf, env)
+ populateHadoopClasspath(conf, env)
} else {
addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
- ClientBase.populateHadoopClasspath(conf, env)
+ populateHadoopClasspath(conf, env)
addUserClasspath(args, sparkConf, env)
}
// Append all jar files under the working directory to the classpath.
- addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env);
+ addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env)
}
/**
* Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
* to the classpath.
*/
- private def addUserClasspath(args: ClientArguments, conf: SparkConf,
- env: HashMap[String, String]) = {
- if (args != null) {
- addFileToClasspath(args.userJar, APP_JAR, env)
- if (args.addJars != null) {
- args.addJars.split(",").foreach { case file: String =>
- addFileToClasspath(file, null, env)
- }
+ private def addUserClasspath(
+ args: ClientArguments,
+ conf: SparkConf,
+ env: HashMap[String, String]): Unit = {
+
+ // If `args` is not null, we are launching an AM container.
+ // Otherwise, we are launching executor containers.
+ val (mainJar, secondaryJars) =
+ if (args != null) {
+ (args.userJar, args.addJars)
+ } else {
+ (conf.get(CONF_SPARK_USER_JAR, null), conf.get(CONF_SPARK_YARN_SECONDARY_JARS, null))
}
- } else {
- val userJar = conf.get(CONF_SPARK_USER_JAR, null)
- addFileToClasspath(userJar, APP_JAR, env)
- val cachedSecondaryJarLinks = conf.get(CONF_SPARK_YARN_SECONDARY_JARS, "").split(",")
- cachedSecondaryJarLinks.foreach(jar => addFileToClasspath(jar, null, env))
+ addFileToClasspath(mainJar, APP_JAR, env)
+ if (secondaryJars != null) {
+ secondaryJars.split(",").filter(_.nonEmpty).foreach { jar =>
+ addFileToClasspath(jar, null, env)
+ }
}
}
@@ -599,46 +696,44 @@ object ClientBase extends Logging {
* @param fileName Alternate name for the file (optional).
* @param env Map holding the environment variables.
*/
- private def addFileToClasspath(path: String, fileName: String,
- env: HashMap[String, String]) : Unit = {
+ private def addFileToClasspath(
+ path: String,
+ fileName: String,
+ env: HashMap[String, String]): Unit = {
if (path != null) {
scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
- val localPath = getLocalPath(path)
- if (localPath != null) {
- addClasspathEntry(localPath, env)
+ val uri = new URI(path)
+ if (uri.getScheme == LOCAL_SCHEME) {
+ addClasspathEntry(uri.getPath, env)
return
}
}
}
if (fileName != null) {
- addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env);
+ addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env)
}
}
/**
- * Returns the local path if the URI is a "local:" URI, or null otherwise.
+ * Add the given path to the classpath entry of the given environment map.
+ * If the classpath is already set, this appends the new path to the existing classpath.
*/
- private def getLocalPath(resource: String): String = {
- val uri = new URI(resource)
- if (LOCAL_SCHEME.equals(uri.getScheme())) {
- return uri.getPath()
- }
- null
- }
-
- private def addClasspathEntry(path: String, env: HashMap[String, String]) =
- YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path,
- File.pathSeparator)
+ private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit =
+ YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)
/**
* Get the list of namenodes the user may access.
*/
- private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
- sparkConf.get("spark.yarn.access.namenodes", "").split(",").map(_.trim()).filter(!_.isEmpty)
- .map(new Path(_)).toSet
+ def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
+ sparkConf.get("spark.yarn.access.namenodes", "")
+ .split(",")
+ .map(_.trim())
+ .filter(!_.isEmpty)
+ .map(new Path(_))
+ .toSet
}
- private[yarn] def getTokenRenewer(conf: Configuration): String = {
+ def getTokenRenewer(conf: Configuration): String = {
val delegTokenRenewer = Master.getMasterPrincipal(conf)
logDebug("delegation token renewer is: " + delegTokenRenewer)
if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
@@ -652,17 +747,54 @@ object ClientBase extends Logging {
/**
* Obtains tokens for the namenodes passed in and adds them to the credentials.
*/
- private[yarn] def obtainTokensForNamenodes(paths: Set[Path], conf: Configuration,
- creds: Credentials) {
+ def obtainTokensForNamenodes(
+ paths: Set[Path],
+ conf: Configuration,
+ creds: Credentials): Unit = {
if (UserGroupInformation.isSecurityEnabled()) {
val delegTokenRenewer = getTokenRenewer(conf)
+ paths.foreach { dst =>
+ val dstFs = dst.getFileSystem(conf)
+ logDebug("getting token for namenode: " + dst)
+ dstFs.addDelegationTokens(delegTokenRenewer, creds)
+ }
+ }
+ }
- paths.foreach {
- dst =>
- val dstFs = dst.getFileSystem(conf)
- logDebug("getting token for namenode: " + dst)
- dstFs.addDelegationTokens(delegTokenRenewer, creds)
+ /**
+ * Return whether the two file systems are the same.
+ */
+ private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+ val srcUri = srcFs.getUri()
+ val dstUri = destFs.getUri()
+ if (srcUri.getScheme() == null) {
+ return false
+ }
+ if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+ return false
+ }
+ var srcHost = srcUri.getHost()
+ var dstHost = dstUri.getHost()
+ if ((srcHost != null) && (dstHost != null)) {
+ try {
+ srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
+ dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
+ } catch {
+ case e: UnknownHostException =>
+ return false
}
+ if (!srcHost.equals(dstHost)) {
+ return false
+ }
+ } else if (srcHost == null && dstHost != null) {
+ return false
+ } else if (srcHost != null && dstHost == null) {
+ return false
+ }
+ if (srcUri.getPort() != dstUri.getPort()) {
+ false
+ } else {
+ true
}
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 9b7f1fca96..c592ecfdfc 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -19,29 +19,24 @@ package org.apache.spark.deploy.yarn
import java.net.URI
+import scala.collection.mutable.{HashMap, LinkedHashMap, Map}
+
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.permission.FsAction
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
-import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
-import org.apache.spark.Logging
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.LinkedHashMap
-import scala.collection.mutable.Map
-
+import org.apache.spark.Logging
/** Client side methods to setup the Hadoop distributed cache */
-class ClientDistributedCacheManager() extends Logging {
- private val distCacheFiles: Map[String, Tuple3[String, String, String]] =
- LinkedHashMap[String, Tuple3[String, String, String]]()
- private val distCacheArchives: Map[String, Tuple3[String, String, String]] =
- LinkedHashMap[String, Tuple3[String, String, String]]()
+private[spark] class ClientDistributedCacheManager() extends Logging {
+
+ // Mappings from remote URI to (file status, modification time, visibility)
+ private val distCacheFiles: Map[String, (String, String, String)] =
+ LinkedHashMap[String, (String, String, String)]()
+ private val distCacheArchives: Map[String, (String, String, String)] =
+ LinkedHashMap[String, (String, String, String)]()
/**
@@ -68,9 +63,9 @@ class ClientDistributedCacheManager() extends Logging {
resourceType: LocalResourceType,
link: String,
statCache: Map[URI, FileStatus],
- appMasterOnly: Boolean = false) = {
+ appMasterOnly: Boolean = false): Unit = {
val destStatus = fs.getFileStatus(destPath)
- val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+ val amJarRsrc = Records.newRecord(classOf[LocalResource])
amJarRsrc.setType(resourceType)
val visibility = getVisibility(conf, destPath.toUri(), statCache)
amJarRsrc.setVisibility(visibility)
@@ -80,7 +75,7 @@ class ClientDistributedCacheManager() extends Logging {
if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
localResources(link) = amJarRsrc
- if (appMasterOnly == false) {
+ if (!appMasterOnly) {
val uri = destPath.toUri()
val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
if (resourceType == LocalResourceType.FILE) {
@@ -95,12 +90,10 @@ class ClientDistributedCacheManager() extends Logging {
/**
* Adds the necessary cache file env variables to the env passed in
- * @param env
*/
- def setDistFilesEnv(env: Map[String, String]) = {
+ def setDistFilesEnv(env: Map[String, String]): Unit = {
val (keys, tupleValues) = distCacheFiles.unzip
val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-
if (keys.size > 0) {
env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") =
@@ -114,12 +107,10 @@ class ClientDistributedCacheManager() extends Logging {
/**
* Adds the necessary cache archive env variables to the env passed in
- * @param env
*/
- def setDistArchivesEnv(env: Map[String, String]) = {
+ def setDistArchivesEnv(env: Map[String, String]): Unit = {
val (keys, tupleValues) = distCacheArchives.unzip
val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-
if (keys.size > 0) {
env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") =
@@ -133,25 +124,21 @@ class ClientDistributedCacheManager() extends Logging {
/**
* Returns the local resource visibility depending on the cache file permissions
- * @param conf
- * @param uri
- * @param statCache
* @return LocalResourceVisibility
*/
- def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
- LocalResourceVisibility = {
+ def getVisibility(
+ conf: Configuration,
+ uri: URI,
+ statCache: Map[URI, FileStatus]): LocalResourceVisibility = {
if (isPublic(conf, uri, statCache)) {
- return LocalResourceVisibility.PUBLIC
- }
- LocalResourceVisibility.PRIVATE
+ LocalResourceVisibility.PUBLIC
+ } else {
+ LocalResourceVisibility.PRIVATE
+ }
}
/**
- * Returns a boolean to denote whether a cache file is visible to all(public)
- * or not
- * @param conf
- * @param uri
- * @param statCache
+ * Returns a boolean to denote whether a cache file is visible to all (public)
* @return true if the path in the uri is visible to all, false otherwise
*/
def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
@@ -167,13 +154,12 @@ class ClientDistributedCacheManager() extends Logging {
/**
* Returns true if all ancestors of the specified path have the 'execute'
* permission set for all users (i.e. that other users can traverse
- * the directory heirarchy to the given path)
- * @param fs
- * @param path
- * @param statCache
+ * the directory hierarchy to the given path)
* @return true if all ancestors have the 'execute' permission set for all users
*/
- def ancestorsHaveExecutePermissions(fs: FileSystem, path: Path,
+ def ancestorsHaveExecutePermissions(
+ fs: FileSystem,
+ path: Path,
statCache: Map[URI, FileStatus]): Boolean = {
var current = path
while (current != null) {
@@ -187,32 +173,25 @@ class ClientDistributedCacheManager() extends Logging {
}
/**
- * Checks for a given path whether the Other permissions on it
+ * Checks for a given path whether the Other permissions on it
* imply the permission in the passed FsAction
- * @param fs
- * @param path
- * @param action
- * @param statCache
* @return true if the path in the uri is visible to all, false otherwise
*/
- def checkPermissionOfOther(fs: FileSystem, path: Path,
- action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
+ def checkPermissionOfOther(
+ fs: FileSystem,
+ path: Path,
+ action: FsAction,
+ statCache: Map[URI, FileStatus]): Boolean = {
val status = getFileStatus(fs, path.toUri(), statCache)
val perms = status.getPermission()
val otherAction = perms.getOtherAction()
- if (otherAction.implies(action)) {
- return true
- }
- false
+ otherAction.implies(action)
}
/**
- * Checks to see if the given uri exists in the cache, if it does it
+ * Checks to see if the given uri exists in the cache, if it does it
* returns the existing FileStatus, otherwise it stats the uri, stores
* it in the cache, and returns the FileStatus.
- * @param fs
- * @param uri
- * @param statCache
* @return FileStatus
*/
def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index f56f72cafe..bbbf615510 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -17,7 +17,6 @@
package org.apache.spark.deploy.yarn
-import java.io.File
import java.net.URI
import scala.collection.JavaConversions._
@@ -128,9 +127,9 @@ trait ExecutorRunnableUtil extends Logging {
localResources: HashMap[String, LocalResource],
timestamp: String,
size: String,
- vis: String) = {
+ vis: String): Unit = {
val uri = new URI(file)
- val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+ val amJarRsrc = Records.newRecord(classOf[LocalResource])
amJarRsrc.setType(rtype)
amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
@@ -175,14 +174,17 @@ trait ExecutorRunnableUtil extends Logging {
ClientBase.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
sparkConf.getExecutorEnv.foreach { case (key, value) =>
- YarnSparkHadoopUtil.addToEnvironment(env, key, value, File.pathSeparator)
+ // This assumes each executor environment variable set here is a path
+ // This is kept for backward compatibility and consistency with hadoop
+ YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
}
// Keep this for backwards compatibility but users should move to the config
- YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
- File.pathSeparator)
+ sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
+ YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
+ }
- System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
+ System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }
env
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4a33e34c3b..0b712c2019 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.yarn
import java.lang.{Boolean => JBoolean}
+import java.io.File
import java.util.{Collections, Set => JSet}
import java.util.regex.Matcher
import java.util.regex.Pattern
@@ -29,14 +30,12 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.util.StringInterner
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -100,30 +99,26 @@ object YarnSparkHadoopUtil {
private val hostToRack = new ConcurrentHashMap[String, String]()
private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
- def addToEnvironment(
- env: HashMap[String, String],
- variable: String,
- value: String,
- classPathSeparator: String) = {
- var envVariable = ""
- if (env.get(variable) == None) {
- envVariable = value
- } else {
- envVariable = env.get(variable).get + classPathSeparator + value
- }
- env put (StringInterner.weakIntern(variable), StringInterner.weakIntern(envVariable))
+ /**
+ * Add a path variable to the given environment map.
+ * If the map already contains this key, append the value to the existing value instead.
+ */
+ def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
+ val newValue = if (env.contains(key)) { env(key) + File.pathSeparator + value } else value
+ env.put(key, newValue)
}
- def setEnvFromInputString(
- env: HashMap[String, String],
- envString: String,
- classPathSeparator: String) = {
- if (envString != null && envString.length() > 0) {
- var childEnvs = envString.split(",")
- var p = Pattern.compile(getEnvironmentVariableRegex())
+ /**
+ * Set zero or more environment variables specified by the given input string.
+ * The input string is expected to take the form "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3".
+ */
+ def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = {
+ if (inputString != null && inputString.length() > 0) {
+ val childEnvs = inputString.split(",")
+ val p = Pattern.compile(environmentVariableRegex)
for (cEnv <- childEnvs) {
- var parts = cEnv.split("=") // split on '='
- var m = p.matcher(parts(1))
+ val parts = cEnv.split("=") // split on '='
+ val m = p.matcher(parts(1))
val sb = new StringBuffer
while (m.find()) {
val variable = m.group(1)
@@ -131,8 +126,7 @@ object YarnSparkHadoopUtil {
if (env.get(variable) != None) {
replace = env.get(variable).get
} else {
- // if this key is not configured for the child .. get it
- // from the env
+ // if this key is not configured for the child .. get it from the env
replace = System.getenv(variable)
if (replace == null) {
// the env key is note present anywhere .. simply set it
@@ -142,14 +136,15 @@ object YarnSparkHadoopUtil {
m.appendReplacement(sb, Matcher.quoteReplacement(replace))
}
m.appendTail(sb)
- addToEnvironment(env, parts(0), sb.toString(), classPathSeparator)
+ // This treats the environment variable as path variable delimited by `File.pathSeparator`
+ // This is kept for backward compatibility and consistency with Hadoop's behavior
+ addPathToEnvironment(env, parts(0), sb.toString)
}
}
}
- private def getEnvironmentVariableRegex() : String = {
- val osName = System.getProperty("os.name")
- if (osName startsWith "Windows") {
+ private val environmentVariableRegex: String = {
+ if (Utils.isWindows) {
"%([A-Za-z_][A-Za-z0-9_]*?)%"
} else {
"\\$([A-Za-z_][A-Za-z0-9_]*)"
@@ -181,14 +176,14 @@ object YarnSparkHadoopUtil {
}
}
- private[spark] def lookupRack(conf: Configuration, host: String): String = {
+ def lookupRack(conf: Configuration, host: String): String = {
if (!hostToRack.contains(host)) {
populateRackInfo(conf, host)
}
hostToRack.get(host)
}
- private[spark] def populateRackInfo(conf: Configuration, hostname: String) {
+ def populateRackInfo(conf: Configuration, hostname: String) {
Utils.checkHost(hostname)
if (!hostToRack.containsKey(hostname)) {
@@ -212,8 +207,8 @@ object YarnSparkHadoopUtil {
}
}
- private[spark] def getApplicationAclsForYarn(securityMgr: SecurityManager):
- Map[ApplicationAccessType, String] = {
+ def getApplicationAclsForYarn(securityMgr: SecurityManager)
+ : Map[ApplicationAccessType, String] = {
Map[ApplicationAccessType, String] (
ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls,
ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 6aa6475fe4..200a308992 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.scheduler.TaskSchedulerImpl
import scala.collection.mutable.ArrayBuffer
@@ -34,115 +34,120 @@ private[spark] class YarnClientSchedulerBackend(
minRegisteredRatio = 0.8
}
- var client: Client = null
- var appId: ApplicationId = null
- var checkerThread: Thread = null
- var stopping: Boolean = false
- var totalExpectedExecutors = 0
-
- private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
- arrayBuf: ArrayBuffer[String]) {
- if (System.getenv(envVar) != null) {
- arrayBuf += (optionName, System.getenv(envVar))
- } else if (sc.getConf.contains(sysProp)) {
- arrayBuf += (optionName, sc.getConf.get(sysProp))
- }
- }
+ private var client: Client = null
+ private var appId: ApplicationId = null
+ private var stopping: Boolean = false
+ private var totalExpectedExecutors = 0
+ /**
+ * Create a Yarn client to submit an application to the ResourceManager.
+ * This waits until the application is running.
+ */
override def start() {
super.start()
-
val driverHost = conf.get("spark.driver.host")
val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIHostPort) }
val argsArrayBuf = new ArrayBuffer[String]()
- argsArrayBuf += (
- "--args", hostport
- )
-
- // process any optional arguments, given either as environment variables
- // or system properties. use the defaults already defined in ClientArguments
- // if things aren't specified. system properties override environment
- // variables.
- List(("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
- ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
- ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
- ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
- ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
- ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
- ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
- ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
- ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
- ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"))
- .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
-
- logDebug("ClientArguments called with: " + argsArrayBuf)
+ argsArrayBuf += ("--arg", hostport)
+ argsArrayBuf ++= getExtraClientArguments
+
+ logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
val args = new ClientArguments(argsArrayBuf.toArray, conf)
totalExpectedExecutors = args.numExecutors
client = new Client(args, conf)
- appId = client.runApp()
- waitForApp()
- checkerThread = yarnApplicationStateCheckerThread()
+ appId = client.submitApplication()
+ waitForApplication()
+ asyncMonitorApplication()
}
- def waitForApp() {
-
- // TODO : need a better way to find out whether the executors are ready or not
- // maybe by resource usage report?
- while(true) {
- val report = client.getApplicationReport(appId)
-
- logInfo("Application report from ASM: \n" +
- "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
- "\t appStartTime: " + report.getStartTime() + "\n" +
- "\t yarnAppState: " + report.getYarnApplicationState() + "\n"
+ /**
+ * Return any extra command line arguments to be passed to Client provided in the form of
+ * environment variables or Spark properties.
+ */
+ private def getExtraClientArguments: Seq[String] = {
+ val extraArgs = new ArrayBuffer[String]
+ val optionTuples = // List of (target Client argument, environment variable, Spark property)
+ List(
+ ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
+ ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
+ ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
+ ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
+ ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
+ ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
+ ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
+ ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
+ ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+ ("--name", "SPARK_YARN_APP_NAME", "spark.app.name")
)
-
- // Ready to go, or already gone.
- val state = report.getYarnApplicationState()
- if (state == YarnApplicationState.RUNNING) {
- return
- } else if (state == YarnApplicationState.FINISHED ||
- state == YarnApplicationState.FAILED ||
- state == YarnApplicationState.KILLED) {
- throw new SparkException("Yarn application already ended," +
- "might be killed or not able to launch application master.")
+ optionTuples.foreach { case (optionName, envVar, sparkProp) =>
+ if (System.getenv(envVar) != null) {
+ extraArgs += (optionName, System.getenv(envVar))
+ } else if (sc.getConf.contains(sparkProp)) {
+ extraArgs += (optionName, sc.getConf.get(sparkProp))
}
+ }
+ extraArgs
+ }
- Thread.sleep(1000)
+ /**
+ * Report the state of the application until it is running.
+ * If the application has finished, failed or been killed in the process, throw an exception.
+ * This assumes both `client` and `appId` have already been set.
+ */
+ private def waitForApplication(): Unit = {
+ assert(client != null && appId != null, "Application has not been submitted yet!")
+ val state = client.monitorApplication(appId, returnOnRunning = true) // blocking
+ if (state == YarnApplicationState.FINISHED ||
+ state == YarnApplicationState.FAILED ||
+ state == YarnApplicationState.KILLED) {
+ throw new SparkException("Yarn application has already ended! " +
+ "It might have been killed or unable to launch application master.")
+ }
+ if (state == YarnApplicationState.RUNNING) {
+ logInfo(s"Application $appId has started running.")
}
}
- private def yarnApplicationStateCheckerThread(): Thread = {
+ /**
+ * Monitor the application state in a separate thread.
+ * If the application has exited for any reason, stop the SparkContext.
+ * This assumes both `client` and `appId` have already been set.
+ */
+ private def asyncMonitorApplication(): Unit = {
+ assert(client != null && appId != null, "Application has not been submitted yet!")
val t = new Thread {
override def run() {
while (!stopping) {
val report = client.getApplicationReport(appId)
val state = report.getYarnApplicationState()
- if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED
- || state == YarnApplicationState.FAILED) {
- logError(s"Yarn application already ended: $state")
+ if (state == YarnApplicationState.FINISHED ||
+ state == YarnApplicationState.KILLED ||
+ state == YarnApplicationState.FAILED) {
+ logError(s"Yarn application has already exited with state $state!")
sc.stop()
stopping = true
}
Thread.sleep(1000L)
}
- checkerThread = null
Thread.currentThread().interrupt()
}
}
- t.setName("Yarn Application State Checker")
+ t.setName("Yarn application state monitor")
t.setDaemon(true)
t.start()
- t
}
+ /**
+ * Stop the scheduler. This assumes `start()` has already been called.
+ */
override def stop() {
+ assert(client != null, "Attempted to stop this scheduler before starting it!")
stopping = true
super.stop()
- client.stop
+ client.stop()
logInfo("Stopped")
}
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
index c3b7a2c8f0..9bd916100d 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
+import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.mockito.Matchers._
import org.mockito.Mockito._
@@ -90,7 +90,7 @@ class ClientBaseSuite extends FunSuite with Matchers {
val env = new MutableHashMap[String, String]()
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
- ClientBase.populateClasspath(args, conf, sparkConf, env, None)
+ ClientBase.populateClasspath(args, conf, sparkConf, env)
val cp = env("CLASSPATH").split(File.pathSeparator)
s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -114,10 +114,10 @@ class ClientBaseSuite extends FunSuite with Matchers {
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
- doReturn(new Path("/")).when(client).copyRemoteFile(any(classOf[Path]),
+ doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
any(classOf[Path]), anyShort(), anyBoolean())
- var tempDir = Files.createTempDir();
+ val tempDir = Files.createTempDir()
try {
client.prepareLocalResources(tempDir.getAbsolutePath())
sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER))
@@ -247,13 +247,13 @@ class ClientBaseSuite extends FunSuite with Matchers {
private class DummyClient(
val args: ClientArguments,
- val conf: Configuration,
+ val hadoopConf: Configuration,
val sparkConf: SparkConf,
val yarnConf: YarnConfiguration) extends ClientBase {
-
- override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit =
- throw new UnsupportedOperationException()
-
+ override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = ???
+ override def submitApplication(): ApplicationId = ???
+ override def getApplicationReport(appId: ApplicationId): ApplicationReport = ???
+ override def getClientToken(report: ApplicationReport): String = ???
}
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 82e45e3e7a..0b43e6ee20 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,11 +21,9 @@ import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SparkConf}
@@ -34,128 +32,98 @@ import org.apache.spark.deploy.SparkHadoopUtil
/**
* Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
*/
-class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
+private[spark] class Client(
+ val args: ClientArguments,
+ val hadoopConf: Configuration,
+ val sparkConf: SparkConf)
extends ClientBase with Logging {
- val yarnClient = YarnClient.createYarnClient
-
def this(clientArgs: ClientArguments, spConf: SparkConf) =
this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
- val args = clientArgs
- val conf = hadoopConf
- val sparkConf = spConf
- var rpc: YarnRPC = YarnRPC.create(conf)
- val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
- def runApp(): ApplicationId = {
- validateArgs()
- // Initialize and start the client service.
+ val yarnClient = YarnClient.createYarnClient
+ val yarnConf = new YarnConfiguration(hadoopConf)
+
+ def stop(): Unit = yarnClient.stop()
+
+ /* ------------------------------------------------------------------------------------- *
+ | The following methods have much in common in the stable and alpha versions of Client, |
+ | but cannot be implemented in the parent trait due to subtle API differences across |
+ | hadoop versions. |
+ * ------------------------------------------------------------------------------------- */
+
+ /**
+ * Submit an application running our ApplicationMaster to the ResourceManager.
+ *
+ * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
+ * creating applications and setting up the application submission context. This was not
+ * available in the alpha API.
+ */
+ override def submitApplication(): ApplicationId = {
yarnClient.init(yarnConf)
yarnClient.start()
- // Log details about this YARN cluster (e.g, the number of slave machines/NodeManagers).
- logClusterResourceDetails()
-
- // Prepare to submit a request to the ResourcManager (specifically its ApplicationsManager (ASM)
- // interface).
+ logInfo("Requesting a new application from cluster with %d NodeManagers"
+ .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
- // Get a new client application.
+ // Get a new application from our RM
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
val appId = newAppResponse.getApplicationId()
+ // Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)
- // Set up resource and environment variables.
- val appStagingDir = getAppStagingDir(appId)
- val localResources = prepareLocalResources(appStagingDir)
- val launchEnv = setupLaunchEnv(localResources, appStagingDir)
- val amContainer = createContainerLaunchContext(newAppResponse, localResources, launchEnv)
+ // Set up the appropriate contexts to launch our AM
+ val containerContext = createContainerLaunchContext(newAppResponse)
+ val appContext = createApplicationSubmissionContext(newApp, containerContext)
- // Set up an application submission context.
- val appContext = newApp.getApplicationSubmissionContext()
- appContext.setApplicationName(args.appName)
- appContext.setQueue(args.amQueue)
- appContext.setAMContainerSpec(amContainer)
- appContext.setApplicationType("SPARK")
-
- // Memory for the ApplicationMaster.
- val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
- memoryResource.setMemory(args.amMemory + memoryOverhead)
- appContext.setResource(memoryResource)
-
- // Finally, submit and monitor the application.
- submitApp(appContext)
+ // Finally, submit and monitor the application
+ logInfo(s"Submitting application ${appId.getId} to ResourceManager")
+ yarnClient.submitApplication(appContext)
appId
}
- def run() {
- val appId = runApp()
- monitorApplication(appId)
- }
-
- def logClusterResourceDetails() {
- val clusterMetrics: YarnClusterMetrics = yarnClient.getYarnClusterMetrics
- logInfo("Got cluster metric info from ResourceManager, number of NodeManagers: " +
- clusterMetrics.getNumNodeManagers)
+ /**
+ * Set up the context for submitting our ApplicationMaster.
+ * This uses the YarnClientApplication not available in the Yarn alpha API.
+ */
+ def createApplicationSubmissionContext(
+ newApp: YarnClientApplication,
+ containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
+ val appContext = newApp.getApplicationSubmissionContext
+ appContext.setApplicationName(args.appName)
+ appContext.setQueue(args.amQueue)
+ appContext.setAMContainerSpec(containerContext)
+ appContext.setApplicationType("SPARK")
+ val capability = Records.newRecord(classOf[Resource])
+ capability.setMemory(args.amMemory + amMemoryOverhead)
+ appContext.setResource(capability)
+ appContext
}
- def setupSecurityToken(amContainer: ContainerLaunchContext) = {
- // Setup security tokens.
- val dob = new DataOutputBuffer()
+ /** Set up security tokens for launching our ApplicationMaster container. */
+ override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
+ val dob = new DataOutputBuffer
credentials.writeTokenStorageToStream(dob)
- amContainer.setTokens(ByteBuffer.wrap(dob.getData()))
+ amContainer.setTokens(ByteBuffer.wrap(dob.getData))
}
- def submitApp(appContext: ApplicationSubmissionContext) = {
- // Submit the application to the applications manager.
- logInfo("Submitting application to ResourceManager")
- yarnClient.submitApplication(appContext)
- }
+ /** Get the application report from the ResourceManager for an application we have submitted. */
+ override def getApplicationReport(appId: ApplicationId): ApplicationReport =
+ yarnClient.getApplicationReport(appId)
- def getApplicationReport(appId: ApplicationId) =
- yarnClient.getApplicationReport(appId)
-
- def stop = yarnClient.stop
-
- def monitorApplication(appId: ApplicationId): Boolean = {
- val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
-
- while (true) {
- Thread.sleep(interval)
- val report = yarnClient.getApplicationReport(appId)
-
- logInfo("Application report from ResourceManager: \n" +
- "\t application identifier: " + appId.toString() + "\n" +
- "\t appId: " + appId.getId() + "\n" +
- "\t clientToAMToken: " + report.getClientToAMToken() + "\n" +
- "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
- "\t appMasterHost: " + report.getHost() + "\n" +
- "\t appQueue: " + report.getQueue() + "\n" +
- "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
- "\t appStartTime: " + report.getStartTime() + "\n" +
- "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
- "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
- "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
- "\t appUser: " + report.getUser()
- )
-
- val state = report.getYarnApplicationState()
- if (state == YarnApplicationState.FINISHED ||
- state == YarnApplicationState.FAILED ||
- state == YarnApplicationState.KILLED) {
- return true
- }
- }
- true
- }
+ /**
+ * Return the security token used by this client to communicate with the ApplicationMaster.
+ * If no security is enabled, the token returned by the report is null.
+ */
+ override def getClientToken(report: ApplicationReport): String =
+ Option(report.getClientToAMToken).map(_.toString).getOrElse("")
}
object Client {
-
def main(argStrings: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
println("WARNING: This client is deprecated and will be removed in a " +
@@ -163,22 +131,19 @@ object Client {
}
// Set an env variable indicating we are running in YARN mode.
- // Note: anything env variable with SPARK_ prefix gets propagated to all (remote) processes -
- // see Client#setupLaunchEnv().
+ // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
System.setProperty("SPARK_YARN_MODE", "true")
- val sparkConf = new SparkConf()
+ val sparkConf = new SparkConf
try {
val args = new ClientArguments(argStrings, sparkConf)
new Client(args, sparkConf).run()
} catch {
- case e: Exception => {
+ case e: Exception =>
Console.err.println(e.getMessage)
System.exit(1)
- }
}
System.exit(0)
}
-
}