diff options
author | Lianhui Wang <lianhuiwang09@gmail.com> | 2015-05-08 08:44:46 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2015-05-08 08:44:46 -0500 |
commit | ebff7327af5efa9f57c605284de4fae6b050ae0f (patch) | |
tree | ed814cf7c903da2404a55b4c5bdcc8eb0648f518 /yarn | |
parent | c2f0821aad3b82dcd327e914c9b297e92526649d (diff) | |
download | spark-ebff7327af5efa9f57c605284de4fae6b050ae0f.tar.gz spark-ebff7327af5efa9f57c605284de4fae6b050ae0f.tar.bz2 spark-ebff7327af5efa9f57c605284de4fae6b050ae0f.zip |
[SPARK-6869] [PYSPARK] Add pyspark archives path to PYTHONPATH
Based on https://github.com/apache/spark/pull/5478 that provide a PYSPARK_ARCHIVES_PATH env. within this PR, we just should export PYSPARK_ARCHIVES_PATH=/user/spark/pyspark.zip,/user/spark/python/lib/py4j-0.8.2.1-src.zip in conf/spark-env.sh when we don't install PySpark on each node of Yarn. i run python application successfully on yarn-client and yarn-cluster with this PR.
andrewor14 sryza Sephiroth-Lin Can you take a look at this?thanks.
Author: Lianhui Wang <lianhuiwang09@gmail.com>
Closes #5580 from lianhuiwang/SPARK-6869 and squashes the following commits:
66ffa43 [Lianhui Wang] Update Client.scala
c2ad0f9 [Lianhui Wang] Update Client.scala
1c8f664 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
008850a [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
f0b4ed8 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
150907b [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
20402cd [Lianhui Wang] use ZipEntry
9d87c3f [Lianhui Wang] update scala style
e7bd971 [Lianhui Wang] address vanzin's comments
4b8a3ed [Lianhui Wang] use pyArchivesEnvOpt
e6b573b [Lianhui Wang] address vanzin's comments
f11f84a [Lianhui Wang] zip pyspark archives
5192cca [Lianhui Wang] update import path
3b1e4c8 [Lianhui Wang] address tgravescs's comments
9396346 [Lianhui Wang] put zip to make-distribution.sh
0d2baf7 [Lianhui Wang] update import paths
e0179be [Lianhui Wang] add zip pyspark archives in build or sparksubmit
31e8e06 [Lianhui Wang] update code style
9f31dac [Lianhui Wang] update code and add comments
f72987c [Lianhui Wang] add archives path to PYTHONPATH
Diffstat (limited to 'yarn')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 23 |
1 files changed, 17 insertions, 6 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 20ecaf092e..d21a739347 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -468,6 +468,17 @@ private[spark] class Client( env("SPARK_YARN_USER_ENV") = userEnvs } + // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH + // that can be passed on to the ApplicationMaster and the executors. + if (sparkConf.contains("spark.submit.pyArchives")) { + var pythonPath = sparkConf.get("spark.submit.pyArchives") + if (env.contains("PYTHONPATH")) { + pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator) + } + env("PYTHONPATH") = pythonPath + sparkConf.setExecutorEnv("PYTHONPATH", pythonPath) + } + // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's // SparkContext will not let that set spark* system properties, which is expected behavior for @@ -1074,7 +1085,7 @@ object Client extends Logging { val hiveConf = hiveClass.getMethod("getConf").invoke(hive) val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf") - val hiveConfGet = (param:String) => Option(hiveConfClass + val hiveConfGet = (param: String) => Option(hiveConfClass .getMethod("get", classOf[java.lang.String]) .invoke(hiveConf, param)) @@ -1096,7 +1107,7 @@ object Client extends Logging { val hive2Token = new Token[DelegationTokenIdentifier]() hive2Token.decodeFromUrlString(tokenStr) - credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token) + credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token) logDebug("Added hive.Server2.delegation.token to conf.") hiveClass.getMethod("closeCurrent").invoke(null) } else { @@ -1141,13 +1152,13 @@ object Client extends Logging { logInfo("Added HBase security token to credentials.") } catch { - case e:java.lang.NoSuchMethodException => + case e: java.lang.NoSuchMethodException => logInfo("HBase Method not found: " + e) - case e:java.lang.ClassNotFoundException => + case e: java.lang.ClassNotFoundException => logDebug("HBase Class not found: " + e) - case e:java.lang.NoClassDefFoundError => + case e: java.lang.NoClassDefFoundError => logDebug("HBase Class not found: " + e) - case e:Exception => + case e: Exception => logError("Exception when obtaining HBase security token: " + e) } } |