aboutsummaryrefslogtreecommitdiff
path: root/yarn/alpha
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2014-09-23 11:20:52 -0500
committerThomas Graves <tgraves@apache.org>2014-09-23 11:20:52 -0500
commitc4022dd52b4827323ff956632dc7623f546da937 (patch)
treeb0f12fc18540d86e958700440e1f454fd433863d /yarn/alpha
parent14f8c340402366cb998c563b3f7d9ff7d9940271 (diff)
downloadspark-c4022dd52b4827323ff956632dc7623f546da937.tar.gz
spark-c4022dd52b4827323ff956632dc7623f546da937.tar.bz2
spark-c4022dd52b4827323ff956632dc7623f546da937.zip
[SPARK-3477] Clean up code in Yarn Client / ClientBase
This is part of a broader effort to clean up the Yarn integration code after #2020. The high-level changes in this PR include: - Removing duplicate code, especially across the alpha and stable APIs - Simplify unnecessarily complex method signatures and hierarchies - Rename unclear variable and method names - Organize logging output produced when the user runs Spark on Yarn - Extensively add documentation - Privatize classes where possible I have tested the stable API on a Hadoop 2.4 cluster. I tested submitting a jar that references classes in other jars in both client and cluster mode. I also made changes in the alpha API, though I do not have access to an alpha cluster. I have verified that it compiles, but it would be ideal if others can help test it. For those interested in some examples in detail, please read on. -------------------------------------------------------------------------------------------------------- ***Appendix*** - The loop to `getApplicationReport` from the RM is duplicated in 4 places: in the stable `Client`, alpha `Client`, and twice in `YarnClientSchedulerBackend`. We should not have different loops for client and cluster deploy modes. - There are many fragmented small helper methods that are only used once and should just be inlined. For instance, `ClientBase#getLocalPath` returns `null` on certain conditions, and its only caller `ClientBase#addFileToClasspath` checks whether the value returned is `null`. We could just have the caller check on that same condition to avoid passing `null`s around. - In `YarnSparkHadoopUtil#addToEnvironment`, we take in an argument `classpathSeparator` that always has the same value upstream (i.e. `File.pathSeparator`). This argument is now removed from the signature and all callers of this method upstream. - `ClientBase#copyRemoteFile` is now renamed to `copyFileToRemote`. It was unclear whether we are copying a remote file to our local file system, or copying a locally visible file to a remote file system. Also, even the content of the method has inaccurately named variables. We use `val remoteFs` to signify the file system of the locally visible file and `val fs` to signify the remote, destination file system. These are now renamed `srcFs` and `destFs` respectively. - We currently log the AM container's environment and resource mappings directly as Scala collections. This is incredibly hard to read and probably too verbose for the average Spark user. In other modes (e.g. standalone), we also don't log the launch commands by default, so the logging level of these information is now set to `DEBUG`. - None of these classes (`Client`, `ClientBase`, `YarnSparkHadoopUtil` etc.) is intended to be used by a Spark application (the user should go through Spark submit instead). At the very least they should be `private[spark]`. Author: Andrew Or <andrewor14@gmail.com> Closes #2350 from andrewor14/yarn-cleanup and squashes the following commits: 39e8c7b [Andrew Or] Address review comments 6619f9b [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-cleanup 2ca6d64 [Andrew Or] Improve logging in application monitor a3b9693 [Andrew Or] Minor changes 7dd6298 [Andrew Or] Simplify ClientBase#monitorApplication 547487c [Andrew Or] Provide default values for null application report entries a0ad1e9 [Andrew Or] Fix class not found error 1590141 [Andrew Or] Address review comments 45ccdea [Andrew Or] Remove usages of getAMMemory d8e33b6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-cleanup ed0b42d [Andrew Or] Fix alpha compilation error c0587b4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-cleanup 6d74888 [Andrew Or] Minor comment changes 6573c1d [Andrew Or] Clean up, simplify and document code for setting classpaths e4779b6 [Andrew Or] Clean up log messages + variable naming in ClientBase 8766d37 [Andrew Or] Heavily add documentation to Client* classes + various clean-ups 6c94d79 [Andrew Or] Various cleanups in ClientBase and ClientArguments ef7069a [Andrew Or] Clean up YarnClientSchedulerBackend more 6de9072 [Andrew Or] Guard against potential NPE in debug logging mode fabe4c4 [Andrew Or] Reuse more code in YarnClientSchedulerBackend 3f941dc [Andrew Or] First cut at simplifying the Client (stable and alpha)
Diffstat (limited to 'yarn/alpha')
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala145
1 files changed, 59 insertions, 86 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index aff9ab71f0..5a20532315 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -23,13 +23,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.YarnClientImpl
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, Records}
+import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -37,7 +35,10 @@ import org.apache.spark.deploy.SparkHadoopUtil
/**
* Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's alpha API.
*/
-class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
+private[spark] class Client(
+ val args: ClientArguments,
+ val hadoopConf: Configuration,
+ val sparkConf: SparkConf)
extends YarnClientImpl with ClientBase with Logging {
def this(clientArgs: ClientArguments, spConf: SparkConf) =
@@ -45,112 +46,86 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
- val args = clientArgs
- val conf = hadoopConf
- val sparkConf = spConf
- var rpc: YarnRPC = YarnRPC.create(conf)
- val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+ val yarnConf: YarnConfiguration = new YarnConfiguration(hadoopConf)
+ /* ------------------------------------------------------------------------------------- *
+ | The following methods have much in common in the stable and alpha versions of Client, |
+ | but cannot be implemented in the parent trait due to subtle API differences across |
+ | hadoop versions. |
+ * ------------------------------------------------------------------------------------- */
- // for client user who want to monitor app status by itself.
- def runApp() = {
- validateArgs()
-
+ /** Submit an application running our ApplicationMaster to the ResourceManager. */
+ override def submitApplication(): ApplicationId = {
init(yarnConf)
start()
- logClusterResourceDetails()
- val newApp = super.getNewApplication()
- val appId = newApp.getApplicationId()
+ logInfo("Requesting a new application from cluster with %d NodeManagers"
+ .format(getYarnClusterMetrics.getNumNodeManagers))
- verifyClusterResources(newApp)
- val appContext = createApplicationSubmissionContext(appId)
- val appStagingDir = getAppStagingDir(appId)
- val localResources = prepareLocalResources(appStagingDir)
- val env = setupLaunchEnv(localResources, appStagingDir)
- val amContainer = createContainerLaunchContext(newApp, localResources, env)
+ // Get a new application from our RM
+ val newAppResponse = getNewApplication()
+ val appId = newAppResponse.getApplicationId()
- val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
- // Memory for the ApplicationMaster.
- capability.setMemory(args.amMemory + memoryOverhead)
- amContainer.setResource(capability)
+ // Verify whether the cluster has enough resources for our AM
+ verifyClusterResources(newAppResponse)
- appContext.setQueue(args.amQueue)
- appContext.setAMContainerSpec(amContainer)
- appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+ // Set up the appropriate contexts to launch our AM
+ val containerContext = createContainerLaunchContext(newAppResponse)
+ val appContext = createApplicationSubmissionContext(appId, containerContext)
- submitApp(appContext)
+ // Finally, submit and monitor the application
+ logInfo(s"Submitting application ${appId.getId} to ResourceManager")
+ submitApplication(appContext)
appId
}
- def run() {
- val appId = runApp()
- monitorApplication(appId)
- }
-
- def logClusterResourceDetails() {
- val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
- logInfo("Got cluster metric info from ASM, numNodeManagers = " +
- clusterMetrics.getNumNodeManagers)
+ /**
+ * Set up a context for launching our ApplicationMaster container.
+ * In the Yarn alpha API, the memory requirements of this container must be set in
+ * the ContainerLaunchContext instead of the ApplicationSubmissionContext.
+ */
+ override def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
+ : ContainerLaunchContext = {
+ val containerContext = super.createContainerLaunchContext(newAppResponse)
+ val capability = Records.newRecord(classOf[Resource])
+ capability.setMemory(args.amMemory + amMemoryOverhead)
+ containerContext.setResource(capability)
+ containerContext
}
-
- def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
- logInfo("Setting up application submission context for ASM")
+ /** Set up the context for submitting our ApplicationMaster. */
+ def createApplicationSubmissionContext(
+ appId: ApplicationId,
+ containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
appContext.setApplicationId(appId)
appContext.setApplicationName(args.appName)
+ appContext.setQueue(args.amQueue)
+ appContext.setAMContainerSpec(containerContext)
+ appContext.setUser(UserGroupInformation.getCurrentUser.getShortUserName)
appContext
}
- def setupSecurityToken(amContainer: ContainerLaunchContext) = {
- // Setup security tokens.
+ /**
+ * Set up security tokens for launching our ApplicationMaster container.
+ * ContainerLaunchContext#setContainerTokens is renamed `setTokens` in the stable API.
+ */
+ override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
}
- def submitApp(appContext: ApplicationSubmissionContext) = {
- // Submit the application to the applications manager.
- logInfo("Submitting application to ASM")
- super.submitApplication(appContext)
- }
-
- def monitorApplication(appId: ApplicationId): Boolean = {
- val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
-
- while (true) {
- Thread.sleep(interval)
- val report = super.getApplicationReport(appId)
-
- logInfo("Application report from ASM: \n" +
- "\t application identifier: " + appId.toString() + "\n" +
- "\t appId: " + appId.getId() + "\n" +
- "\t clientToken: " + report.getClientToken() + "\n" +
- "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
- "\t appMasterHost: " + report.getHost() + "\n" +
- "\t appQueue: " + report.getQueue() + "\n" +
- "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
- "\t appStartTime: " + report.getStartTime() + "\n" +
- "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
- "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
- "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
- "\t appUser: " + report.getUser()
- )
-
- val state = report.getYarnApplicationState()
- if (state == YarnApplicationState.FINISHED ||
- state == YarnApplicationState.FAILED ||
- state == YarnApplicationState.KILLED) {
- return true
- }
- }
- true
- }
+ /**
+ * Return the security token used by this client to communicate with the ApplicationMaster.
+ * If no security is enabled, the token returned by the report is null.
+ * ApplicationReport#getClientToken is renamed `getClientToAMToken` in the stable API.
+ */
+ override def getClientToken(report: ApplicationReport): String =
+ Option(report.getClientToken).getOrElse("")
}
object Client {
-
def main(argStrings: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
println("WARNING: This client is deprecated and will be removed in a " +
@@ -158,19 +133,17 @@ object Client {
}
// Set an env variable indicating we are running in YARN mode.
- // Note that anything with SPARK prefix gets propagated to all (remote) processes
+ // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
System.setProperty("SPARK_YARN_MODE", "true")
-
val sparkConf = new SparkConf
try {
val args = new ClientArguments(argStrings, sparkConf)
new Client(args, sparkConf).run()
} catch {
- case e: Exception => {
+ case e: Exception =>
Console.err.println(e.getMessage)
System.exit(1)
- }
}
System.exit(0)