aboutsummaryrefslogtreecommitdiff
path: root/yarn/stable
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2014-09-23 11:20:52 -0500
committerThomas Graves <tgraves@apache.org>2014-09-23 11:20:52 -0500
commitc4022dd52b4827323ff956632dc7623f546da937 (patch)
treeb0f12fc18540d86e958700440e1f454fd433863d /yarn/stable
parent14f8c340402366cb998c563b3f7d9ff7d9940271 (diff)
downloadspark-c4022dd52b4827323ff956632dc7623f546da937.tar.gz
spark-c4022dd52b4827323ff956632dc7623f546da937.tar.bz2
spark-c4022dd52b4827323ff956632dc7623f546da937.zip
[SPARK-3477] Clean up code in Yarn Client / ClientBase
This is part of a broader effort to clean up the Yarn integration code after #2020. The high-level changes in this PR include: - Removing duplicate code, especially across the alpha and stable APIs - Simplify unnecessarily complex method signatures and hierarchies - Rename unclear variable and method names - Organize logging output produced when the user runs Spark on Yarn - Extensively add documentation - Privatize classes where possible I have tested the stable API on a Hadoop 2.4 cluster. I tested submitting a jar that references classes in other jars in both client and cluster mode. I also made changes in the alpha API, though I do not have access to an alpha cluster. I have verified that it compiles, but it would be ideal if others can help test it. For those interested in some examples in detail, please read on. -------------------------------------------------------------------------------------------------------- ***Appendix*** - The loop to `getApplicationReport` from the RM is duplicated in 4 places: in the stable `Client`, alpha `Client`, and twice in `YarnClientSchedulerBackend`. We should not have different loops for client and cluster deploy modes. - There are many fragmented small helper methods that are only used once and should just be inlined. For instance, `ClientBase#getLocalPath` returns `null` on certain conditions, and its only caller `ClientBase#addFileToClasspath` checks whether the value returned is `null`. We could just have the caller check on that same condition to avoid passing `null`s around. - In `YarnSparkHadoopUtil#addToEnvironment`, we take in an argument `classpathSeparator` that always has the same value upstream (i.e. `File.pathSeparator`). This argument is now removed from the signature and all callers of this method upstream. - `ClientBase#copyRemoteFile` is now renamed to `copyFileToRemote`. It was unclear whether we are copying a remote file to our local file system, or copying a locally visible file to a remote file system. Also, even the content of the method has inaccurately named variables. We use `val remoteFs` to signify the file system of the locally visible file and `val fs` to signify the remote, destination file system. These are now renamed `srcFs` and `destFs` respectively. - We currently log the AM container's environment and resource mappings directly as Scala collections. This is incredibly hard to read and probably too verbose for the average Spark user. In other modes (e.g. standalone), we also don't log the launch commands by default, so the logging level of these information is now set to `DEBUG`. - None of these classes (`Client`, `ClientBase`, `YarnSparkHadoopUtil` etc.) is intended to be used by a Spark application (the user should go through Spark submit instead). At the very least they should be `private[spark]`. Author: Andrew Or <andrewor14@gmail.com> Closes #2350 from andrewor14/yarn-cleanup and squashes the following commits: 39e8c7b [Andrew Or] Address review comments 6619f9b [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-cleanup 2ca6d64 [Andrew Or] Improve logging in application monitor a3b9693 [Andrew Or] Minor changes 7dd6298 [Andrew Or] Simplify ClientBase#monitorApplication 547487c [Andrew Or] Provide default values for null application report entries a0ad1e9 [Andrew Or] Fix class not found error 1590141 [Andrew Or] Address review comments 45ccdea [Andrew Or] Remove usages of getAMMemory d8e33b6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-cleanup ed0b42d [Andrew Or] Fix alpha compilation error c0587b4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-cleanup 6d74888 [Andrew Or] Minor comment changes 6573c1d [Andrew Or] Clean up, simplify and document code for setting classpaths e4779b6 [Andrew Or] Clean up log messages + variable naming in ClientBase 8766d37 [Andrew Or] Heavily add documentation to Client* classes + various clean-ups 6c94d79 [Andrew Or] Various cleanups in ClientBase and ClientArguments ef7069a [Andrew Or] Clean up YarnClientSchedulerBackend more 6de9072 [Andrew Or] Guard against potential NPE in debug logging mode fabe4c4 [Andrew Or] Reuse more code in YarnClientSchedulerBackend 3f941dc [Andrew Or] First cut at simplifying the Client (stable and alpha)
Diffstat (limited to 'yarn/stable')
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala167
1 files changed, 66 insertions, 101 deletions
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 82e45e3e7a..0b43e6ee20 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,11 +21,9 @@ import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SparkConf}
@@ -34,128 +32,98 @@ import org.apache.spark.deploy.SparkHadoopUtil
/**
* Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
*/
-class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
+private[spark] class Client(
+ val args: ClientArguments,
+ val hadoopConf: Configuration,
+ val sparkConf: SparkConf)
extends ClientBase with Logging {
- val yarnClient = YarnClient.createYarnClient
-
def this(clientArgs: ClientArguments, spConf: SparkConf) =
this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
- val args = clientArgs
- val conf = hadoopConf
- val sparkConf = spConf
- var rpc: YarnRPC = YarnRPC.create(conf)
- val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
- def runApp(): ApplicationId = {
- validateArgs()
- // Initialize and start the client service.
+ val yarnClient = YarnClient.createYarnClient
+ val yarnConf = new YarnConfiguration(hadoopConf)
+
+ def stop(): Unit = yarnClient.stop()
+
+ /* ------------------------------------------------------------------------------------- *
+ | The following methods have much in common in the stable and alpha versions of Client, |
+ | but cannot be implemented in the parent trait due to subtle API differences across |
+ | hadoop versions. |
+ * ------------------------------------------------------------------------------------- */
+
+ /**
+ * Submit an application running our ApplicationMaster to the ResourceManager.
+ *
+ * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
+ * creating applications and setting up the application submission context. This was not
+ * available in the alpha API.
+ */
+ override def submitApplication(): ApplicationId = {
yarnClient.init(yarnConf)
yarnClient.start()
- // Log details about this YARN cluster (e.g, the number of slave machines/NodeManagers).
- logClusterResourceDetails()
-
- // Prepare to submit a request to the ResourcManager (specifically its ApplicationsManager (ASM)
- // interface).
+ logInfo("Requesting a new application from cluster with %d NodeManagers"
+ .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
- // Get a new client application.
+ // Get a new application from our RM
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
val appId = newAppResponse.getApplicationId()
+ // Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)
- // Set up resource and environment variables.
- val appStagingDir = getAppStagingDir(appId)
- val localResources = prepareLocalResources(appStagingDir)
- val launchEnv = setupLaunchEnv(localResources, appStagingDir)
- val amContainer = createContainerLaunchContext(newAppResponse, localResources, launchEnv)
+ // Set up the appropriate contexts to launch our AM
+ val containerContext = createContainerLaunchContext(newAppResponse)
+ val appContext = createApplicationSubmissionContext(newApp, containerContext)
- // Set up an application submission context.
- val appContext = newApp.getApplicationSubmissionContext()
- appContext.setApplicationName(args.appName)
- appContext.setQueue(args.amQueue)
- appContext.setAMContainerSpec(amContainer)
- appContext.setApplicationType("SPARK")
-
- // Memory for the ApplicationMaster.
- val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
- memoryResource.setMemory(args.amMemory + memoryOverhead)
- appContext.setResource(memoryResource)
-
- // Finally, submit and monitor the application.
- submitApp(appContext)
+ // Finally, submit and monitor the application
+ logInfo(s"Submitting application ${appId.getId} to ResourceManager")
+ yarnClient.submitApplication(appContext)
appId
}
- def run() {
- val appId = runApp()
- monitorApplication(appId)
- }
-
- def logClusterResourceDetails() {
- val clusterMetrics: YarnClusterMetrics = yarnClient.getYarnClusterMetrics
- logInfo("Got cluster metric info from ResourceManager, number of NodeManagers: " +
- clusterMetrics.getNumNodeManagers)
+ /**
+ * Set up the context for submitting our ApplicationMaster.
+ * This uses the YarnClientApplication not available in the Yarn alpha API.
+ */
+ def createApplicationSubmissionContext(
+ newApp: YarnClientApplication,
+ containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
+ val appContext = newApp.getApplicationSubmissionContext
+ appContext.setApplicationName(args.appName)
+ appContext.setQueue(args.amQueue)
+ appContext.setAMContainerSpec(containerContext)
+ appContext.setApplicationType("SPARK")
+ val capability = Records.newRecord(classOf[Resource])
+ capability.setMemory(args.amMemory + amMemoryOverhead)
+ appContext.setResource(capability)
+ appContext
}
- def setupSecurityToken(amContainer: ContainerLaunchContext) = {
- // Setup security tokens.
- val dob = new DataOutputBuffer()
+ /** Set up security tokens for launching our ApplicationMaster container. */
+ override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
+ val dob = new DataOutputBuffer
credentials.writeTokenStorageToStream(dob)
- amContainer.setTokens(ByteBuffer.wrap(dob.getData()))
+ amContainer.setTokens(ByteBuffer.wrap(dob.getData))
}
- def submitApp(appContext: ApplicationSubmissionContext) = {
- // Submit the application to the applications manager.
- logInfo("Submitting application to ResourceManager")
- yarnClient.submitApplication(appContext)
- }
+ /** Get the application report from the ResourceManager for an application we have submitted. */
+ override def getApplicationReport(appId: ApplicationId): ApplicationReport =
+ yarnClient.getApplicationReport(appId)
- def getApplicationReport(appId: ApplicationId) =
- yarnClient.getApplicationReport(appId)
-
- def stop = yarnClient.stop
-
- def monitorApplication(appId: ApplicationId): Boolean = {
- val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
-
- while (true) {
- Thread.sleep(interval)
- val report = yarnClient.getApplicationReport(appId)
-
- logInfo("Application report from ResourceManager: \n" +
- "\t application identifier: " + appId.toString() + "\n" +
- "\t appId: " + appId.getId() + "\n" +
- "\t clientToAMToken: " + report.getClientToAMToken() + "\n" +
- "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
- "\t appMasterHost: " + report.getHost() + "\n" +
- "\t appQueue: " + report.getQueue() + "\n" +
- "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
- "\t appStartTime: " + report.getStartTime() + "\n" +
- "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
- "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
- "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
- "\t appUser: " + report.getUser()
- )
-
- val state = report.getYarnApplicationState()
- if (state == YarnApplicationState.FINISHED ||
- state == YarnApplicationState.FAILED ||
- state == YarnApplicationState.KILLED) {
- return true
- }
- }
- true
- }
+ /**
+ * Return the security token used by this client to communicate with the ApplicationMaster.
+ * If no security is enabled, the token returned by the report is null.
+ */
+ override def getClientToken(report: ApplicationReport): String =
+ Option(report.getClientToAMToken).map(_.toString).getOrElse("")
}
object Client {
-
def main(argStrings: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
println("WARNING: This client is deprecated and will be removed in a " +
@@ -163,22 +131,19 @@ object Client {
}
// Set an env variable indicating we are running in YARN mode.
- // Note: anything env variable with SPARK_ prefix gets propagated to all (remote) processes -
- // see Client#setupLaunchEnv().
+ // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
System.setProperty("SPARK_YARN_MODE", "true")
- val sparkConf = new SparkConf()
+ val sparkConf = new SparkConf
try {
val args = new ClientArguments(argStrings, sparkConf)
new Client(args, sparkConf).run()
} catch {
- case e: Exception => {
+ case e: Exception =>
Console.err.println(e.getMessage)
System.exit(1)
- }
}
System.exit(0)
}
-
}