-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala      |  28
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 106
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala         |  38
3 files changed, 158 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index f97bf67fa5..0379adeb07 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -158,8 +158,9 @@ object SparkSubmit {
args.files = mergeFileLists(args.files, args.primaryResource)
}
args.files = mergeFileLists(args.files, args.pyFiles)
- // Format python file paths properly before adding them to the PYTHONPATH
- sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
+ if (args.pyFiles != null) {
+ sysProps("spark.submit.pyFiles") = args.pyFiles
+ }
}
// Special flag to avoid deprecation warnings at the client
@@ -284,6 +285,29 @@ object SparkSubmit {
sysProps.getOrElseUpdate(k, v)
}
+ // Resolve paths in certain spark properties
+ val pathConfigs = Seq(
+ "spark.jars",
+ "spark.files",
+ "spark.yarn.jar",
+ "spark.yarn.dist.files",
+ "spark.yarn.dist.archives")
+ pathConfigs.foreach { config =>
+ // Replace old URIs with resolved URIs, if they exist
+ sysProps.get(config).foreach { oldValue =>
+ sysProps(config) = Utils.resolveURIs(oldValue)
+ }
+ }
+
+ // Resolve and format Python file paths before adding them to the PYTHONPATH.
+ // Resolving is redundant in the --py-files case, but necessary if the user
+ // explicitly sets `spark.submit.pyFiles` in their default properties file.
+ sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
+ val resolvedPyFiles = Utils.resolveURIs(pyFiles)
+ val formattedPyFiles = PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
+ sysProps("spark.submit.pyFiles") = formattedPyFiles
+ }
+
(childArgs, childClasspath, sysProps, childMainClass)
}
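
Note: for readers unfamiliar with Utils.resolveURIs, the following is a minimal, self-contained sketch of the resolution rule the hunk above depends on. It is a mimic for illustration, NOT Spark's actual Utils code, and it omits the Windows-specific handling (e.g. C:\ paths): entries that already carry a scheme pass through untouched, and bare paths are anchored to the current working directory as file: URIs.

import java.io.File
import java.net.URI

object ResolveSketch {
  def resolveURI(path: String): URI = {
    val uri = new URI(path)
    if (uri.getScheme != null) {
      uri // already qualified (file:, hdfs:, ...): a second pass is a no-op
    } else {
      // Bare path: make it absolute, preserving any #fragment, which Spark
      // uses as a local alias for the file
      val resolved = new File(uri.getPath).getAbsoluteFile.toURI
      if (uri.getFragment == null) resolved
      else new URI(resolved.getScheme, resolved.getPath, uri.getFragment)
    }
  }

  // Comma-delimited variant, mirroring how the spark.* path properties are stored
  def resolveURIs(paths: String): String =
    paths.split(",").filter(_.nonEmpty).map(p => resolveURI(p).toString).mkString(",")

  def main(args: Array[String]): Unit = {
    // e.g. prints: file:/<cwd>/jar1,hdfs:/jar2,file:/<cwd>/jar3#alias
    println(resolveURIs("jar1,hdfs:/jar2,jar3#alias"))
  }
}

Because already-qualified URIs are passed through unchanged, resolution is idempotent, which is exactly what allows createLaunchEnv to re-resolve values that SparkSubmitArguments may have resolved already.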
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1cdf50d5c0..d8cd0ff2c9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -292,7 +292,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
runSparkSubmit(args)
}
- test("spark submit includes jars passed in through --jar") {
+ test("includes jars passed in through --jars") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
@@ -306,6 +306,110 @@ class SparkSubmitSuite extends FunSuite with Matchers {
runSparkSubmit(args)
}
+ test("resolves command line argument paths correctly") {
+ val jars = "/jar1,/jar2" // --jars
+ val files = "hdfs:/file1,file2" // --files
+ val archives = "file:/archive1,archive2" // --archives
+ val pyFiles = "py-file1,py-file2" // --py-files
+
+ // Test jars and files
+ val clArgs = Seq(
+ "--master", "local",
+ "--class", "org.SomeClass",
+ "--jars", jars,
+ "--files", files,
+ "thejar.jar")
+ val appArgs = new SparkSubmitArguments(clArgs)
+ val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
+ appArgs.jars should be (Utils.resolveURIs(jars))
+ appArgs.files should be (Utils.resolveURIs(files))
+ sysProps("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
+ sysProps("spark.files") should be (Utils.resolveURIs(files))
+
+ // Test files and archives (Yarn)
+ val clArgs2 = Seq(
+ "--master", "yarn-client",
+ "--class", "org.SomeClass",
+ "--files", files,
+ "--archives", archives,
+ "thejar.jar"
+ )
+ val appArgs2 = new SparkSubmitArguments(clArgs2)
+ val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
+ appArgs2.files should be (Utils.resolveURIs(files))
+ appArgs2.archives should be (Utils.resolveURIs(archives))
+ sysProps2("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
+ sysProps2("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives))
+
+ // Test python files
+ val clArgs3 = Seq(
+ "--master", "local",
+ "--py-files", pyFiles,
+ "mister.py"
+ )
+ val appArgs3 = new SparkSubmitArguments(clArgs3)
+ val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
+ appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
+ sysProps3("spark.submit.pyFiles") should be (
+ PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+ }
+
+ test("resolves config paths correctly") {
+ val jars = "/jar1,/jar2" // spark.jars
+ val files = "hdfs:/file1,file2" // spark.files / spark.yarn.dist.files
+ val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
+ val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles
+
+ // Test jars and files
+ val f1 = File.createTempFile("test-submit-jars-files", "")
+ val writer1 = new PrintWriter(f1)
+ writer1.println("spark.jars " + jars)
+ writer1.println("spark.files " + files)
+ writer1.close()
+ val clArgs = Seq(
+ "--master", "local",
+ "--class", "org.SomeClass",
+ "--properties-file", f1.getPath,
+ "thejar.jar"
+ )
+ val appArgs = new SparkSubmitArguments(clArgs)
+ val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
+ sysProps("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
+ sysProps("spark.files") should be(Utils.resolveURIs(files))
+
+ // Test files and archives (Yarn)
+ val f2 = File.createTempFile("test-submit-files-archives", "")
+ val writer2 = new PrintWriter(f2)
+ writer2.println("spark.yarn.dist.files " + files)
+ writer2.println("spark.yarn.dist.archives " + archives)
+ writer2.close()
+ val clArgs2 = Seq(
+ "--master", "yarn-client",
+ "--class", "org.SomeClass",
+ "--properties-file", f2.getPath,
+ "thejar.jar"
+ )
+ val appArgs2 = new SparkSubmitArguments(clArgs2)
+ val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
+ sysProps2("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
+ sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))
+
+ // Test python files
+ val f3 = File.createTempFile("test-submit-python-files", "")
+ val writer3 = new PrintWriter(f3)
+ writer3.println("spark.submit.pyFiles " + pyFiles)
+ writer3.close()
+ val clArgs3 = Seq(
+ "--master", "local",
+ "--properties-file", f3.getPath,
+ "mister.py"
+ )
+ val appArgs3 = new SparkSubmitArguments(clArgs3)
+ val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
+ sysProps3("spark.submit.pyFiles") should be(
+ PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+ }
+
test("SPARK_CONF_DIR overrides spark-defaults.conf") {
forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
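
Note: the pyFiles assertions above compare against PythonRunner.formatPaths output. The following is a hedged mimic (NOT PythonRunner's actual implementation) of the behavior those assertions rely on: PYTHONPATH entries must be plain local paths, so formatPaths is expected to strip the file: scheme and reject remote schemes, since Python cannot import straight from HDFS.

import java.net.URI

object FormatPathsSketch {
  def formatPath(path: String): String = {
    val uri = new URI(path)
    uri.getScheme match {
      case null | "file" | "local" => uri.getPath // keep only the bare path
      case other => throw new IllegalArgumentException(
        s"Python files must be local, got scheme: $other ($path)")
    }
  }

  def formatPaths(paths: String): Seq[String] =
    paths.split(",").filter(_.nonEmpty).map(formatPath).toSeq

  def main(args: Array[String]): Unit = {
    // e.g. prints: List(/opt/py-file1, /opt/py-file2)
    println(formatPaths("file:/opt/py-file1,file:/opt/py-file2"))
  }
}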
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 1c112334cc..8ffe3e2b13 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -217,9 +217,14 @@ class UtilsSuite extends FunSuite {
test("resolveURI") {
def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
- assume(before.split(",").length == 1)
- assert(Utils.resolveURI(before, testWindows) === new URI(after))
- assert(Utils.resolveURI(after, testWindows) === new URI(after))
+ // This should test only single paths
+ assume(before.split(",").length === 1)
+ // Repeated invocations of resolveURI should yield the same result
+ def resolve(uri: String): String = Utils.resolveURI(uri, testWindows).toString
+ assert(resolve(after) === after)
+ assert(resolve(resolve(after)) === after)
+ assert(resolve(resolve(resolve(after))) === after)
+ // Also test resolveURIs with single paths
assert(new URI(Utils.resolveURIs(before, testWindows)) === new URI(after))
assert(new URI(Utils.resolveURIs(after, testWindows)) === new URI(after))
}
@@ -235,16 +240,27 @@ class UtilsSuite extends FunSuite {
assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt", testWindows = true)
intercept[IllegalArgumentException] { Utils.resolveURI("file:foo") }
intercept[IllegalArgumentException] { Utils.resolveURI("file:foo:baby") }
+ }
- // Test resolving comma-delimited paths
- assert(Utils.resolveURIs("jar1,jar2") === s"file:$cwd/jar1,file:$cwd/jar2")
- assert(Utils.resolveURIs("file:/jar1,file:/jar2") === "file:/jar1,file:/jar2")
- assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3") ===
- s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
- assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,jar4#jar5") ===
+ test("resolveURIs with multiple paths") {
+ def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
+ assume(before.split(",").length > 1)
+ assert(Utils.resolveURIs(before, testWindows) === after)
+ assert(Utils.resolveURIs(after, testWindows) === after)
+ // Repeated invocations of resolveURIs should yield the same result
+ def resolve(uri: String): String = Utils.resolveURIs(uri, testWindows)
+ assert(resolve(after) === after)
+ assert(resolve(resolve(after)) === after)
+ assert(resolve(resolve(resolve(after))) === after)
+ }
+ val cwd = System.getProperty("user.dir")
+ assertResolves("jar1,jar2", s"file:$cwd/jar1,file:$cwd/jar2")
+ assertResolves("file:/jar1,file:/jar2", "file:/jar1,file:/jar2")
+ assertResolves("hdfs:/jar1,file:/jar2,jar3", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
+ assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5",
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5")
- assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi", testWindows = true) ===
- s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi")
+ assertResolves("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi",
+ s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi", testWindows = true)
}
test("nonLocalPaths") {