-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala                  |  9
-rw-r--r-- core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java           |  2
-rw-r--r-- core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala       |  2
-rw-r--r-- launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java            |  7
-rw-r--r-- launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java | 29
-rw-r--r-- launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java | 11
-rw-r--r-- yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala        |  2
7 files changed, 38 insertions(+), 24 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 755c4b6ec1..9075e3eb3f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -43,6 +43,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}
import org.apache.spark.{SPARK_VERSION, SparkException, SparkUserAppException}
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
+import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -75,10 +76,6 @@ object SparkSubmit {
private val CLUSTER = 2
private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
- // A special jar name that indicates the class being run is inside of Spark itself, and therefore
- // no user jar is needed.
- private val SPARK_INTERNAL = "spark-internal"
-
// Special primary resource names that represent shells rather than application jars.
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"
@@ -575,7 +572,7 @@ object SparkSubmit {
childArgs += ("--primary-r-file", mainFile)
childArgs += ("--class", "org.apache.spark.deploy.RRunner")
} else {
- if (args.primaryResource != SPARK_INTERNAL) {
+ if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
@@ -795,7 +792,7 @@ object SparkSubmit {
}
private[deploy] def isInternal(res: String): Boolean = {
- res == SPARK_INTERNAL
+ res == SparkLauncher.NO_RESOURCE
}
/**
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 3e47bfc274..8ca54b24d8 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -94,7 +94,7 @@ public class SparkLauncherSuite {
SparkLauncher launcher = new SparkLauncher(env)
.setSparkHome(System.getProperty("spark.test.home"))
.setMaster("local")
- .setAppResource("spark-internal")
+ .setAppResource(SparkLauncher.NO_RESOURCE)
.addSparkArg(opts.CONF,
String.format("%s=-Dfoo=ShouldBeOverriddenBelow", SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS))
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
diff --git a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
index 713560d3dd..cac15a1dc4 100644
--- a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
@@ -48,7 +48,7 @@ class LauncherBackendSuite extends SparkFunSuite with Matchers {
.setConf("spark.ui.enabled", "false")
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, s"-Dtest.appender=console")
.setMaster(master)
- .setAppResource("spark-internal")
+ .setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(TestApp.getClass.getName().stripSuffix("$"))
.startApplication()
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index a083f05a2a..08873f5811 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -67,6 +67,13 @@ public class SparkLauncher {
public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";
/**
+ * A special value for the resource that tells Spark not to try to process the app resource as a
+ * file. This is useful when the class being executed is added to the application by other
+ * means, for example by adding jars through the package download feature.
+ */
+ public static final String NO_RESOURCE = "spark-internal";
+
+ /**
* Maximum time (in ms) to wait for a child process to connect back to the launcher server
* when using {@link #startApplication()}.
*/
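With NO_RESOURCE public, callers of the launcher library can start a class that ships with Spark (or arrives via downloaded packages) without naming a user jar. Below is a minimal sketch of that usage, assuming a hypothetical driver class com.example.MyDriver; the API calls mirror the tests in this commit:

    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;

    public class NoResourceLaunchExample {
      public static void main(String[] args) throws Exception {
        // NO_RESOURCE replaces the literal "spark-internal": no user jar is
        // passed, and spark-submit will not treat the resource as a file.
        SparkAppHandle handle = new SparkLauncher()
            .setMaster("local")
            .setAppResource(SparkLauncher.NO_RESOURCE)
            .setMainClass("com.example.MyDriver") // hypothetical main class
            .startApplication();

        // Block until the application reaches a terminal state.
        while (!handle.getState().isFinal()) {
          Thread.sleep(1000);
        }
      }
    }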
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 6941ca903c..76897c4f75 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -83,9 +83,9 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
static {
specialClasses.put("org.apache.spark.repl.Main", "spark-shell");
specialClasses.put("org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver",
- "spark-internal");
+ SparkLauncher.NO_RESOURCE);
specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
- "spark-internal");
+ SparkLauncher.NO_RESOURCE);
}
final List<String> sparkArgs;
@@ -112,11 +112,11 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
List<String> submitArgs = args;
if (args.size() > 0 && args.get(0).equals(PYSPARK_SHELL)) {
this.allowsMixedArguments = true;
- appResource = PYSPARK_SHELL_RESOURCE;
+ appResource = PYSPARK_SHELL;
submitArgs = args.subList(1, args.size());
} else if (args.size() > 0 && args.get(0).equals(SPARKR_SHELL)) {
this.allowsMixedArguments = true;
- appResource = SPARKR_SHELL_RESOURCE;
+ appResource = SPARKR_SHELL;
submitArgs = args.subList(1, args.size());
} else if (args.size() > 0 && args.get(0).equals(RUN_EXAMPLE)) {
isExample = true;
@@ -134,9 +134,9 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
@Override
public List<String> buildCommand(Map<String, String> env)
throws IOException, IllegalArgumentException {
- if (PYSPARK_SHELL_RESOURCE.equals(appResource) && !printInfo) {
+ if (PYSPARK_SHELL.equals(appResource) && !printInfo) {
return buildPySparkShellCommand(env);
- } else if (SPARKR_SHELL_RESOURCE.equals(appResource) && !printInfo) {
+ } else if (SPARKR_SHELL.equals(appResource) && !printInfo) {
return buildSparkRCommand(env);
} else {
return buildSparkSubmitCommand(env);
@@ -147,6 +147,10 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
List<String> args = new ArrayList<>();
SparkSubmitOptionParser parser = new SparkSubmitOptionParser();
+ if (!allowsMixedArguments) {
+ checkArgument(appResource != null, "Missing application resource.");
+ }
+
if (verbose) {
args.add(parser.VERBOSE);
}
@@ -278,6 +282,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
// When launching the pyspark shell, the spark-submit arguments should be stored in the
// PYSPARK_SUBMIT_ARGS env variable.
+ appResource = PYSPARK_SHELL_RESOURCE;
constructEnvVarArgs(env, "PYSPARK_SUBMIT_ARGS");
// The executable is the PYSPARK_DRIVER_PYTHON env variable set by the pyspark script,
@@ -301,6 +306,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
}
// When launching the SparkR shell, store the spark-submit arguments in the SPARKR_SUBMIT_ARGS
// env variable.
+ appResource = SPARKR_SHELL_RESOURCE;
constructEnvVarArgs(env, "SPARKR_SUBMIT_ARGS");
// Set shell.R as R_PROFILE_USER to load the SparkR package when the shell comes up.
@@ -435,22 +441,19 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
className = EXAMPLE_CLASS_PREFIX + className;
}
mainClass = className;
- appResource = "spark-internal";
+ appResource = SparkLauncher.NO_RESOURCE;
return false;
} else {
checkArgument(!opt.startsWith("-"), "Unrecognized option: %s", opt);
- sparkArgs.add(opt);
+ checkState(appResource == null, "Found unrecognized argument but resource is already set.");
+ appResource = opt;
return false;
}
}
@Override
protected void handleExtraArgs(List<String> extra) {
- if (isExample) {
- appArgs.addAll(extra);
- } else {
- sparkArgs.addAll(extra);
- }
+ appArgs.addAll(extra);
}
}
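The handle() and handleExtraArgs() changes above tighten positional parsing: the first unrecognized argument becomes the application resource (instead of being passed through in sparkArgs), and everything after it goes to the application. A standalone sketch of that rule, not the package-private Spark class itself:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class PositionalArgsSketch {
      public static void main(String[] args) {
        String appResource = null;
        List<String> appArgs = new ArrayList<>();
        // Option parsing elided: only positional arguments are shown here.
        for (String arg : Arrays.asList("my-app.jar", "42", "foo")) {
          if (appResource == null) {
            appResource = arg;   // first positional argument: the resource
          } else {
            appArgs.add(arg);    // everything after it: application arguments
          }
        }
        System.out.println(appResource + " " + appArgs); // my-app.jar [42, foo]
      }
    }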
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
index c7e8b2e03a..c16f46a360 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
@@ -72,7 +72,8 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {
parser.CONF,
"spark.randomOption=foo",
parser.CONF,
- SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH + "=/driverLibPath");
+ SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH + "=/driverLibPath",
+ SparkLauncher.NO_RESOURCE);
Map<String, String> env = new HashMap<>();
List<String> cmd = buildCommand(sparkSubmitArgs, env);
@@ -109,7 +110,8 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {
List<String> sparkSubmitArgs = Arrays.asList(
parser.CLASS + "=org.my.Class",
parser.MASTER + "=foo",
- parser.DEPLOY_MODE + "=bar");
+ parser.DEPLOY_MODE + "=bar",
+ SparkLauncher.NO_RESOURCE);
List<String> cmd = newCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
assertEquals("org.my.Class", findArgValue(cmd, parser.CLASS));
@@ -168,6 +170,11 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {
assertEquals("42", cmd.get(cmd.size() - 1));
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testMissingAppResource() {
+ new SparkSubmitCommandBuilder().buildSparkSubmitArgs();
+ }
+
private void testCmdBuilder(boolean isDriver, boolean useDefaultPropertyFile) throws Exception {
String deployMode = isDriver ? "client" : "cluster";
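The new testMissingAppResource case pins down the checkArgument added above: building submit arguments without a resource now fails fast instead of emitting a malformed spark-submit command line. Seen from the public API, a launch without setAppResource(...) should surface the same error; a hedged sketch, with placeholder paths and class names:

    import org.apache.spark.launcher.SparkLauncher;

    public class MissingResourceExample {
      public static void main(String[] args) {
        try {
          new SparkLauncher()
              .setSparkHome("/path/to/spark")        // placeholder
              .setMaster("local")
              .setMainClass("com.example.MyDriver")  // hypothetical class
              .launch();                             // no setAppResource(...)
        } catch (Exception e) {
          // Expected: IllegalArgumentException("Missing application resource.")
          System.out.println("Rejected: " + e.getMessage());
        }
      }
    }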
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index b2b4d84f53..7df11ca760 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -147,7 +147,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
.setPropertiesFile(propsFile)
.setMaster("yarn")
.setDeployMode("client")
- .setAppResource("spark-internal")
+ .setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(mainClassName(YarnLauncherTestApp.getClass))
.startApplication()