aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
Diffstat (limited to 'yarn')
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala14
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala16
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala26
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala14
4 files changed, 44 insertions, 26 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8226207de4..4ccddc214c 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -85,7 +85,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
def run() {
val appId = runApp()
monitorApplication(appId)
- System.exit(0)
}
def logClusterResourceDetails() {
@@ -179,8 +178,17 @@ object Client {
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf
- val args = new ClientArguments(argStrings, sparkConf)
- new Client(args, sparkConf).run
+ try {
+ val args = new ClientArguments(argStrings, sparkConf)
+ new Client(args, sparkConf).run()
+ } catch {
+ case e: Exception => {
+ Console.err.println(e.getMessage)
+ System.exit(1)
+ }
+ }
+
+ System.exit(0)
}
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index b2c413b6d2..fd3ef9e1fa 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -125,11 +125,11 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
case Nil =>
if (userClass == null) {
- printUsageAndExit(1)
+ throw new IllegalArgumentException(getUsageMessage())
}
case _ =>
- printUsageAndExit(1, args)
+ throw new IllegalArgumentException(getUsageMessage(args))
}
}
@@ -138,11 +138,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
}
- def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
- if (unknownParam != null) {
- System.err.println("Unknown/unsupported param " + unknownParam)
- }
- System.err.println(
+ def getUsageMessage(unknownParam: Any = null): String = {
+ val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
+
+ message +
"Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
"Options:\n" +
" --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster mode)\n" +
@@ -158,8 +157,5 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
" --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
" --files files Comma separated list of files to be distributed with the job.\n" +
" --archives archives Comma separated list of archives to be distributed with the job."
- )
- System.exit(exitCode)
}
-
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 29a35680c0..6861b50300 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{SparkException, Logging, SparkConf, SparkContext}
/**
* The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
@@ -79,7 +79,7 @@ trait ClientBase extends Logging {
).foreach { case(cond, errStr) =>
if (cond) {
logError(errStr)
- args.printUsageAndExit(1)
+ throw new IllegalArgumentException(args.getUsageMessage())
}
}
}
@@ -94,15 +94,20 @@ trait ClientBase extends Logging {
// If we have requested more then the clusters max for a single resource then exit.
if (args.executorMemory > maxMem) {
- logError("Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster.".
- format(args.executorMemory, maxMem))
- System.exit(1)
+ val errorMessage =
+ "Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster."
+ .format(args.executorMemory, maxMem)
+
+ logError(errorMessage)
+ throw new IllegalArgumentException(errorMessage)
}
val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
if (amMem > maxMem) {
- logError("Required AM memory (%d) is above the max threshold (%d) of this cluster".
- format(args.amMemory, maxMem))
- System.exit(1)
+
+ val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster."
+ .format(args.amMemory, maxMem)
+ logError(errorMessage)
+ throw new IllegalArgumentException(errorMessage)
}
// We could add checks to make sure the entire cluster has enough resources but that involves
@@ -186,8 +191,9 @@ trait ClientBase extends Logging {
val delegTokenRenewer = Master.getMasterPrincipal(conf)
if (UserGroupInformation.isSecurityEnabled()) {
if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
- logError("Can't get Master Kerberos principal for use as renewer")
- System.exit(1)
+ val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+ logError(errorMessage)
+ throw new SparkException(errorMessage)
}
}
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 24027618c1..80a8bceb17 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -95,7 +95,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
def run() {
val appId = runApp()
monitorApplication(appId)
- System.exit(0)
}
def logClusterResourceDetails() {
@@ -186,9 +185,18 @@ object Client {
// see Client#setupLaunchEnv().
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf()
- val args = new ClientArguments(argStrings, sparkConf)
- new Client(args, sparkConf).run()
+ try {
+ val args = new ClientArguments(argStrings, sparkConf)
+ new Client(args, sparkConf).run()
+ } catch {
+ case e: Exception => {
+ Console.err.println(e.getMessage)
+ System.exit(1)
+ }
+ }
+
+ System.exit(0)
}
}