path: root/yarn/src/test
author     Marcelo Vanzin <vanzin@cloudera.com>   2015-06-10 13:17:29 -0700
committer  Andrew Or <andrew@databricks.com>      2015-06-10 13:17:29 -0700
commit     38112905bc3b33f2ae75274afba1c30e116f6e46 (patch)
tree       a207e54fe8f1efe3d6c9c8455e3bc55453b47564 /yarn/src/test
parent     8f7308f9c49805b9486aaae5f60e4481e8ba24e8 (diff)
download   spark-38112905bc3b33f2ae75274afba1c30e116f6e46.tar.gz
           spark-38112905bc3b33f2ae75274afba1c30e116f6e46.tar.bz2
           spark-38112905bc3b33f2ae75274afba1c30e116f6e46.zip
[SPARK-5479] [YARN] Handle --py-files correctly in YARN.
The bug description is a little misleading: the actual issue is that .py files are not handled correctly when distributed by YARN. They're added to "spark.submit.pyFiles", which, when processed by context.py, explicitly whitelists certain extensions (see PACKAGE_EXTENSIONS), and that list does not include .py files.

On top of that, archives were not handled at all! They made it to the driver's python path, but never made it to executors, since the mechanism used to propagate their location (spark.submit.pyFiles) only works on the driver side.

So, instead, ignore "spark.submit.pyFiles" and just build PYTHONPATH correctly for both the driver and executors. Individual .py files are placed in a subdirectory of the container's local dir in the cluster, which is then added to the python path. Archives are added directly.

As a side effect, the change ends up solving the symptom described in the bug. The issue was not that the files were not being distributed, but that they were never made visible to the python application running under Spark.

Also included is a proper unit test for running python on YARN, which broke in several different ways with the previous code.

A short walkthrough of the changes:

- SparkSubmit no longer tries to be smart about how YARN handles python files. It just passes the configs down to the YARN client code.

- The YARN client distributes python files and archives differently, placing the plain .py files in a subdirectory.

- The YARN client now sets PYTHONPATH for the processes it launches; to properly handle different locations, it uses YARN's support for embedding env variables (sketched below). To avoid YARN expanding those variables at the wrong time, SparkConf is now propagated to the AM using a conf file instead of command line options.

- Because the Client initialization code is a maze of implicit dependencies, some code needed to be moved around to make sure all needed state was available when it ran.

- The pyspark tests in YarnClusterSuite now actually distribute and try to use both a python file and an archive containing a different python module. Also added a yarn-client test for completeness.

- I cleaned up some of the code around distributing files to YARN, to avoid adding more copied-and-pasted code to handle the new files being distributed.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #6360 from vanzin/SPARK-5479 and squashes the following commits:

bcaf7e6 [Marcelo Vanzin] Feedback.
c47501f [Marcelo Vanzin] Fix yarn-client mode.
46b1d0c [Marcelo Vanzin] Merge branch 'master' into SPARK-5479
c743778 [Marcelo Vanzin] Only pyspark cares about python archives.
c8e5a82 [Marcelo Vanzin] Actually run pyspark in client mode.
705571d [Marcelo Vanzin] Move some code to the YARN module.
1dd4d0c [Marcelo Vanzin] Review feedback.
71ee736 [Marcelo Vanzin] Merge branch 'master' into SPARK-5479
220358b [Marcelo Vanzin] Scalastyle.
cdbb990 [Marcelo Vanzin] Merge branch 'master' into SPARK-5479
7fe3cd4 [Marcelo Vanzin] No need to distribute primary file to executors.
09045f1 [Marcelo Vanzin] Style.
943cbf4 [Marcelo Vanzin] [SPARK-5479] [yarn] Handle --py-files correctly in YARN.
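The env-variable point in the walkthrough above is the heart of the PYTHONPATH change: the client emits references such as Environment.PWD that each NodeManager expands only at container launch, so a single string works for the AM and every executor. Below is a minimal sketch of the idea in Scala; LOCALIZED_PYTHON_DIR and buildPythonPath are illustrative names, not the actual Client.scala code, and ApplicationConstants.CLASS_PATH_SEPARATOR assumes Hadoop 2.4+.

import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

object PythonPathSketch {
  // Hypothetical name for the subdirectory of the container's local dir
  // where the individual .py files are localized.
  val LOCALIZED_PYTHON_DIR = "__pyfiles__"

  def buildPythonPath(pyArchives: Seq[String]): String = {
    // Environment.PWD.$() emits a reference (e.g. "$PWD") that YARN expands
    // at container launch, so each container sees its own working directory.
    val pwd = Environment.PWD.$()
    val entries =
      Seq(s"$pwd/$LOCALIZED_PYTHON_DIR") ++   // individual .py files
      pyArchives.map(name => s"$pwd/$name")   // archives are added directly
    // Join with YARN's cross-platform separator instead of a literal ':'.
    entries.mkString(ApplicationConstants.CLASS_PATH_SEPARATOR)
  }
}

Because the expansion happens on each node, the same PYTHONPATH value can be handed to both the AM and executor launch contexts without the client knowing their working directories.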
Diffstat (limited to 'yarn/src/test')
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala      |  4
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 61
2 files changed, 42 insertions(+), 23 deletions(-)
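Before the per-file diffs, the conf-file propagation mentioned in the commit message also deserves a quick illustration: if SparkConf values traveled as command-line options, the embedded env references could be expanded too early, so the configuration is instead written to a file that is localized alongside the application. A hedged sketch, assuming a hypothetical file name and helper; the real implementation lives in Client.scala and differs in detail.

import java.io.{File, FileOutputStream}
import java.util.Properties
import org.apache.spark.SparkConf

object ConfFileSketch {
  // Hypothetical name for the localized config file the AM reads at startup.
  val SPARK_CONF_FILE = "__spark_conf__.properties"

  def writeConf(conf: SparkConf, stagingDir: File): File = {
    val props = new Properties()
    conf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
    val out = new File(stagingDir, SPARK_CONF_FILE)
    val os = new FileOutputStream(out)
    try props.store(os, "Spark configuration") finally os.close()
    // The file would then be registered as a YARN local resource so the AM
    // can load it at startup instead of parsing command-line options.
    out
  }
}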
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 01d33c9ce9..4ec976aa31 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -113,7 +113,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
Environment.PWD.$()
}
cp should contain(pwdVar)
- cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_HADOOP_CONF_DIR}")
+ cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_CONF_DIR}")
cp should not contain (Client.SPARK_JAR)
cp should not contain (Client.APP_JAR)
}
@@ -129,7 +129,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
val tempDir = Utils.createTempDir()
try {
- client.prepareLocalResources(tempDir.getAbsolutePath())
+ client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
sparkConf.getOption(Client.CONF_SPARK_USER_JAR) should be (Some(USER))
// The non-local path should be propagated by name only, since it will end up in the app's
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 93d587d0cb..a0f25ba450 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -56,6 +56,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
""".stripMargin
private val TEST_PYFILE = """
+ |import mod1, mod2
|import sys
|from operator import add
|
@@ -67,7 +68,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
| sc = SparkContext(conf=SparkConf())
| status = open(sys.argv[1],'w')
| result = "failure"
- | rdd = sc.parallelize(range(10))
+ | rdd = sc.parallelize(range(10)).map(lambda x: x * mod1.func() * mod2.func())
| cnt = rdd.count()
| if cnt == 10:
| result = "success"
@@ -76,6 +77,11 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
| sc.stop()
""".stripMargin
+ private val TEST_PYMODULE = """
+ |def func():
+ | return 42
+ """.stripMargin
+
private var yarnCluster: MiniYARNCluster = _
private var tempDir: File = _
private var fakeSparkJar: File = _
@@ -124,7 +130,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
- hadoopConfDir = new File(tempDir, Client.LOCALIZED_HADOOP_CONF_DIR)
+ hadoopConfDir = new File(tempDir, Client.LOCALIZED_CONF_DIR)
assert(hadoopConfDir.mkdir())
File.createTempFile("token", ".txt", hadoopConfDir)
}
@@ -151,26 +157,12 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
}
}
- // Enable this once fix SPARK-6700
- test("run Python application in yarn-cluster mode") {
- val primaryPyFile = new File(tempDir, "test.py")
- Files.write(TEST_PYFILE, primaryPyFile, UTF_8)
- val pyFile = new File(tempDir, "test2.py")
- Files.write(TEST_PYFILE, pyFile, UTF_8)
- var result = File.createTempFile("result", null, tempDir)
+ test("run Python application in yarn-client mode") {
+ testPySpark(true)
+ }
- // The sbt assembly does not include pyspark / py4j python dependencies, so we need to
- // propagate SPARK_HOME so that those are added to PYTHONPATH. See PythonUtils.scala.
- val sparkHome = sys.props("spark.test.home")
- val extraConf = Map(
- "spark.executorEnv.SPARK_HOME" -> sparkHome,
- "spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
-
- runSpark(false, primaryPyFile.getAbsolutePath(),
- sparkArgs = Seq("--py-files", pyFile.getAbsolutePath()),
- appArgs = Seq(result.getAbsolutePath()),
- extraConf = extraConf)
- checkResult(result)
+ test("run Python application in yarn-cluster mode") {
+ testPySpark(false)
}
test("user class path first in client mode") {
@@ -188,6 +180,33 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
checkResult(result)
}
+ private def testPySpark(clientMode: Boolean): Unit = {
+ val primaryPyFile = new File(tempDir, "test.py")
+ Files.write(TEST_PYFILE, primaryPyFile, UTF_8)
+
+ val moduleDir =
+ if (clientMode) {
+ // In client-mode, .py files added with --py-files are not visible in the driver.
+ // This is something that the launcher library would have to handle.
+ tempDir
+ } else {
+ val subdir = new File(tempDir, "pyModules")
+ subdir.mkdir()
+ subdir
+ }
+ val pyModule = new File(moduleDir, "mod1.py")
+ Files.write(TEST_PYMODULE, pyModule, UTF_8)
+
+ val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir)
+ val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",")
+ val result = File.createTempFile("result", null, tempDir)
+
+ runSpark(clientMode, primaryPyFile.getAbsolutePath(),
+ sparkArgs = Seq("--py-files", pyFiles),
+ appArgs = Seq(result.getAbsolutePath()))
+ checkResult(result)
+ }
+
private def testUseClassPathFirst(clientMode: Boolean): Unit = {
// Create a jar file that contains a different version of "test.resource".
val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)