author    jerryshao <sshao@hortonworks.com>  2016-09-21 17:57:21 -0400
committer Andrew Or <andrewor14@gmail.com>   2016-09-21 17:57:21 -0400
commit    8c3ee2bc42e6320b9341cebdba51a00162c897ea (patch)
tree      16ed50761cb45cd5eb4ca7b31e6b53ee88d98621
parent    9fcf1c51d518847eda7f5ea71337cfa7def3c45c (diff)
[SPARK-17512][CORE] Avoid formatting python path for yarn and mesos cluster mode
## What changes were proposed in this pull request?

Yarn and mesos cluster modes support remote python paths (HDFS/S3 schemes) through their own mechanisms, so there is no need to check and format the python path when running in these modes. This is a potential regression compared to 1.6, so this patch fixes it.

## How was this patch tested?

Unit test to verify the SparkSubmit arguments, plus verification on a local cluster. Because Spark's unit tests lack `MiniDFSCluster` support, no integration test was added.

Author: jerryshao <sshao@hortonworks.com>

Closes #15137 from jerryshao/SPARK-17512.
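To make the local/remote distinction concrete, here is a minimal, self-contained Scala sketch of how a path's URI scheme separates local files from remote ones. It illustrates the idea behind `Utils.nonLocalPaths`; it is not Spark's actual implementation:

```scala
// Hedged illustration (not Spark's Utils.nonLocalPaths): a path counts as
// "non-local" when its URI scheme is neither empty, "file", nor "local".
// Yarn/mesos cluster mode accepts such paths; client mode rejects them.
import java.net.URI

object PathSchemeCheck {
  def isNonLocal(path: String): Boolean = {
    val scheme = Option(new URI(path).getScheme).getOrElse("file")
    scheme != "file" && scheme != "local"
  }

  def main(args: Array[String]): Unit = {
    println(isNonLocal("hdfs:///tmp/file1.py")) // true  -> remote (HDFS)
    println(isNonLocal("s3a://bucket/job.py"))  // true  -> remote (S3)
    println(isNonLocal("/tmp/file2.py"))        // false -> plain local path
    println(isNonLocal("file:/tmp/file3.py"))   // false -> local file URI
  }
}
```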
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala      | 13
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 19
2 files changed, 29 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 7b6d5a394b..80611658a1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -311,7 +311,7 @@ object SparkSubmit {
// In Mesos cluster mode, non-local python files are automatically downloaded by Mesos.
if (args.isPython && !isYarnCluster && !isMesosCluster) {
if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
- printErrorAndExit(s"Only local python files are supported: $args.primaryResource")
+ printErrorAndExit(s"Only local python files are supported: ${args.primaryResource}")
}
val nonLocalPyFiles = Utils.nonLocalPaths(args.pyFiles).mkString(",")
if (nonLocalPyFiles.nonEmpty) {
@@ -322,7 +322,7 @@ object SparkSubmit {
// Require all R files to be local
if (args.isR && !isYarnCluster) {
if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
- printErrorAndExit(s"Only local R files are supported: $args.primaryResource")
+ printErrorAndExit(s"Only local R files are supported: ${args.primaryResource}")
}
}
@@ -633,7 +633,14 @@ object SparkSubmit {
// explicitly sets `spark.submit.pyFiles` in his/her default properties file.
sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
val resolvedPyFiles = Utils.resolveURIs(pyFiles)
- val formattedPyFiles = PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
+ val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
+ PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
+ } else {
+ // Skip formatting the python path in yarn and mesos cluster mode: these two modes
+ // handle remote python files themselves, distributing them and adding them to the
+ // python path locally.
+ resolvedPyFiles
+ }
sysProps("spark.submit.pyFiles") = formattedPyFiles
}
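A small runnable sketch of the branch above, tracing both paths with sample inputs. `formatPath` here is a hypothetical stand-in whose behavior (strip local `file:` URIs to plain paths, reject everything else) is assumed to mirror `PythonRunner.formatPaths`; it is not quoted from Spark:

```scala
import java.net.URI

object PyFilesBranchSketch {
  // Hypothetical stand-in for PythonRunner.formatPaths on a single path
  // (assumed behavior: local URIs become plain paths, remote schemes are rejected).
  def formatPath(p: String): String = {
    val uri = new URI(p)
    Option(uri.getScheme) match {
      case None | Some("file") | Some("local") => uri.getPath
      case Some(_) =>
        throw new IllegalArgumentException(s"Only local python files are supported: $p")
    }
  }

  // Mirrors the patched logic: format only when in neither yarn nor mesos cluster mode.
  def resolvePyFiles(resolved: String, isYarnCluster: Boolean, isMesosCluster: Boolean): String =
    if (!isYarnCluster && !isMesosCluster) {
      resolved.split(",").map(formatPath).mkString(",")
    } else {
      resolved // cluster managers download and distribute remote files themselves
    }

  def main(args: Array[String]): Unit = {
    val remote = "hdfs:///tmp/file1.py,hdfs:///tmp/file2.py"
    // Yarn cluster mode: remote paths pass through untouched.
    println(resolvePyFiles(remote, isYarnCluster = true, isMesosCluster = false))
    // Client mode would throw on the same input:
    // resolvePyFiles(remote, isYarnCluster = false, isMesosCluster = false)
  }
}
```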
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 961ece3e00..31c8fb2646 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -582,6 +582,25 @@ class SparkSubmitSuite
val sysProps3 = SparkSubmit.prepareSubmitEnvironment(appArgs3)._3
sysProps3("spark.submit.pyFiles") should be(
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+
+ // Test remote python files
+ val f4 = File.createTempFile("test-submit-remote-python-files", "", tmpDir)
+ val writer4 = new PrintWriter(f4)
+ val remotePyFiles = "hdfs:///tmp/file1.py,hdfs:///tmp/file2.py"
+ writer4.println("spark.submit.pyFiles " + remotePyFiles)
+ writer4.close()
+ val clArgs4 = Seq(
+ "--master", "yarn",
+ "--deploy-mode", "cluster",
+ "--properties-file", f4.getPath,
+ "hdfs:///tmp/mister.py"
+ )
+ val appArgs4 = new SparkSubmitArguments(clArgs4)
+ val sysProps4 = SparkSubmit.prepareSubmitEnvironment(appArgs4)._3
+ // Should not format the python path in yarn cluster mode
+ sysProps4("spark.submit.pyFiles") should be(
+ Utils.resolveURIs(remotePyFiles)
+ )
}
test("user classpath first in driver") {