path: root/core
author     Lianhui Wang <lianhuiwang09@gmail.com>    2015-05-08 08:44:46 -0500
committer  Thomas Graves <tgraves@apache.org>        2015-05-08 08:44:46 -0500
commit     ebff7327af5efa9f57c605284de4fae6b050ae0f (patch)
tree       ed814cf7c903da2404a55b4c5bdcc8eb0648f518 /core
parent     c2f0821aad3b82dcd327e914c9b297e92526649d (diff)
[SPARK-6869] [PYSPARK] Add pyspark archives path to PYTHONPATH
Based on https://github.com/apache/spark/pull/5478, which provides a PYSPARK_ARCHIVES_PATH env variable. With this PR, when PySpark is not installed on every node of the YARN cluster, we only need to export

    PYSPARK_ARCHIVES_PATH=/user/spark/pyspark.zip,/user/spark/python/lib/py4j-0.8.2.1-src.zip

in conf/spark-env.sh. I have run Python applications successfully on both yarn-client and yarn-cluster with this PR. andrewor14 sryza Sephiroth-Lin Can you take a look at this? Thanks.

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #5580 from lianhuiwang/SPARK-6869 and squashes the following commits:

66ffa43 [Lianhui Wang] Update Client.scala
c2ad0f9 [Lianhui Wang] Update Client.scala
1c8f664 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
008850a [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
f0b4ed8 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
150907b [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
20402cd [Lianhui Wang] use ZipEntry
9d87c3f [Lianhui Wang] update scala style
e7bd971 [Lianhui Wang] address vanzin's comments
4b8a3ed [Lianhui Wang] use pyArchivesEnvOpt
e6b573b [Lianhui Wang] address vanzin's comments
f11f84a [Lianhui Wang] zip pyspark archives
5192cca [Lianhui Wang] update import path
3b1e4c8 [Lianhui Wang] address tgravescs's comments
9396346 [Lianhui Wang] put zip to make-distribution.sh
0d2baf7 [Lianhui Wang] update import paths
e0179be [Lianhui Wang] add zip pyspark archives in build or sparksubmit
31e8e06 [Lianhui Wang] update code style
9f31dac [Lianhui Wang] update code and add comments
f72987c [Lianhui Wang] add archives path to PYTHONPATH
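For context, the fallback behaviour this patch adds (see the diff below) can be sketched in isolation: when PYSPARK_ARCHIVES_PATH is unset, SparkSubmit looks for the two archives shipped under $SPARK_HOME/python/lib. This is a minimal standalone sketch assuming only the JDK; the object and method names are hypothetical, not part of Spark, and Spark's own version reports errors via printErrorAndExit rather than exceptions.

    import java.io.File

    // NOTE: ArchiveLookupSketch is a hypothetical name for illustration only.
    object ArchiveLookupSketch {
      // When PYSPARK_ARCHIVES_PATH is unset, fall back to the archives shipped
      // under $SPARK_HOME/python/lib, failing fast if either zip is missing.
      def defaultPyArchives(): String = {
        val sparkHome = sys.env.getOrElse("SPARK_HOME",
          sys.error("SPARK_HOME does not exist for python application in yarn mode."))
        val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
        Seq("pyspark.zip", "py4j-0.8.2.1-src.zip").map { name =>
          val archive = new File(pyLibPath, name)
          require(archive.exists(), s"$name does not exist for python application in yarn mode.")
          archive.getAbsolutePath
        }.mkString(",")
      }
    }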
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 41
1 file changed, 41 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 8a0327984e..329fa06ba8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -332,6 +332,47 @@ object SparkSubmit {
}
}
+ // In yarn mode for a python app, add pyspark archives to files
+ // that can be distributed with the job
+ if (args.isPython && clusterManager == YARN) {
+ var pyArchives: String = null
+ val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH")
+ if (pyArchivesEnvOpt.isDefined) {
+ pyArchives = pyArchivesEnvOpt.get
+ } else {
+ if (!sys.env.contains("SPARK_HOME")) {
+ printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
+ }
+ val pythonPath = new ArrayBuffer[String]
+ for (sparkHome <- sys.env.get("SPARK_HOME")) {
+ val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
+ val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
+ if (!pyArchivesFile.exists()) {
+ printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
+ }
+ val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
+ if (!py4jFile.exists()) {
+ printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
+ "in yarn mode.")
+ }
+ pythonPath += pyArchivesFile.getAbsolutePath()
+ pythonPath += py4jFile.getAbsolutePath()
+ }
+ pyArchives = pythonPath.mkString(",")
+ }
+
+ pyArchives = pyArchives.split(",").map { localPath =>
+ val localURI = Utils.resolveURI(localPath)
+ if (localURI.getScheme != "local") {
+ args.files = mergeFileLists(args.files, localURI.toString)
+ new Path(localPath).getName
+ } else {
+ localURI.getPath
+ }
+ }.mkString(File.pathSeparator)
+ sysProps("spark.submit.pyArchives") = pyArchives
+ }
+
// If we're running a R app, set the main class to our specific R runner
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
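For reference, the URI-scheme branching in the pyArchives block above can be exercised standalone. This is a minimal sketch that substitutes java.net.URI for Spark's internal Utils.resolveURI and a returned list for mergeFileLists; the name resolvePyArchives and the example paths are hypothetical, for illustration only.

    import java.io.File
    import java.net.URI
    import scala.collection.mutable.ArrayBuffer

    // NOTE: PyArchivesMappingSketch is a hypothetical name, not part of Spark.
    object PyArchivesMappingSketch {
      // Paths with a "local:" scheme are assumed to already exist on every node
      // and keep their literal path; anything else is queued for distribution
      // with the job and referenced on the PYTHONPATH by file name only.
      def resolvePyArchives(pyArchives: String): (String, Seq[String]) = {
        val toDistribute = ArrayBuffer[String]()
        val pythonPath = pyArchives.split(",").map { localPath =>
          val uri = new URI(localPath)
          if (uri.getScheme != "local") {
            toDistribute += localPath
            new File(localPath).getName // stand-in for new Path(localPath).getName
          } else {
            uri.getPath
          }
        }
        (pythonPath.mkString(File.pathSeparator), toDistribute.toSeq)
      }

      def main(args: Array[String]): Unit = {
        val (pythonPath, files) =
          resolvePyArchives("/opt/spark/python/lib/pyspark.zip,local:/opt/py4j-0.8.2.1-src.zip")
        println(s"PYTHONPATH: $pythonPath")               // pyspark.zip:/opt/py4j-0.8.2.1-src.zip
        println(s"to distribute: ${files.mkString(",")}") // /opt/spark/python/lib/pyspark.zip
      }
    }

On a worker, a bare entry such as pyspark.zip then resolves relative to the container's working directory once YARN has localized the distributed file, which is why distributed entries keep only the file name while local: entries keep their full path.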