author     Lianhui Wang <lianhuiwang09@gmail.com>  2015-05-08 08:44:46 -0500
committer  Thomas Graves <tgraves@apache.org>      2015-05-08 08:44:46 -0500
commit     ebff7327af5efa9f57c605284de4fae6b050ae0f
tree       ed814cf7c903da2404a55b4c5bdcc8eb0648f518
parent     c2f0821aad3b82dcd327e914c9b297e92526649d
[SPARK-6869] [PYSPARK] Add pyspark archives path to PYTHONPATH
Based on https://github.com/apache/spark/pull/5478, which provides a PYSPARK_ARCHIVES_PATH env variable. With this PR, when PySpark is not installed on every node of the YARN cluster, we only need to export
PYSPARK_ARCHIVES_PATH=/user/spark/pyspark.zip,/user/spark/python/lib/py4j-0.8.2.1-src.zip
in conf/spark-env.sh. I ran Python applications successfully in both yarn-client and yarn-cluster mode with this PR.

andrewor14 sryza Sephiroth-Lin Can you take a look at this? Thanks.

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #5580 from lianhuiwang/SPARK-6869 and squashes the following commits:

66ffa43 [Lianhui Wang] Update Client.scala
c2ad0f9 [Lianhui Wang] Update Client.scala
1c8f664 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
008850a [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
f0b4ed8 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
150907b [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869
20402cd [Lianhui Wang] use ZipEntry
9d87c3f [Lianhui Wang] update scala style
e7bd971 [Lianhui Wang] address vanzin's comments
4b8a3ed [Lianhui Wang] use pyArchivesEnvOpt
e6b573b [Lianhui Wang] address vanzin's comments
f11f84a [Lianhui Wang] zip pyspark archives
5192cca [Lianhui Wang] update import path
3b1e4c8 [Lianhui Wang] address tgravescs's comments
9396346 [Lianhui Wang] put zip to make-distribution.sh
0d2baf7 [Lianhui Wang] update import paths
e0179be [Lianhui Wang] add zip pyspark archives in build or sparksubmit
31e8e06 [Lianhui Wang] update code style
9f31dac [Lianhui Wang] update code and add comments
f72987c [Lianhui Wang] add archives path to PYTHONPATH
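Beyond exporting the variable in conf/spark-env.sh, the same environment variable can be passed to a programmatic submission. A minimal sketch, assuming the org.apache.spark.launcher.SparkLauncher API from the same Spark 1.4 timeframe; the Spark home, script name, and archive locations below are assumptions to adapt, not part of this commit:

    import java.util.{HashMap => JHashMap}
    import org.apache.spark.launcher.SparkLauncher

    object SubmitPySparkOnYarn {
      def main(args: Array[String]): Unit = {
        // Environment for the child spark-submit process; the archive paths
        // are the ones from the commit message and must match your layout.
        val env = new JHashMap[String, String]()
        env.put("PYSPARK_ARCHIVES_PATH",
          "/user/spark/pyspark.zip,/user/spark/python/lib/py4j-0.8.2.1-src.zip")

        val app = new SparkLauncher(env)
          .setSparkHome("/opt/spark")      // assumed install location
          .setMaster("yarn-cluster")
          .setAppResource("wordcount.py")  // hypothetical PySpark script
          .launch()
        app.waitFor()
      }
    }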
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 23
1 file changed, 17 insertions(+), 6 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 20ecaf092e..d21a739347 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -468,6 +468,17 @@ private[spark] class Client(
env("SPARK_YARN_USER_ENV") = userEnvs
}
+ // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH
+ // that can be passed on to the ApplicationMaster and the executors.
+ if (sparkConf.contains("spark.submit.pyArchives")) {
+ var pythonPath = sparkConf.get("spark.submit.pyArchives")
+ if (env.contains("PYTHONPATH")) {
+ pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
+ }
+ env("PYTHONPATH") = pythonPath
+ sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
+ }
+
// In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
// executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
// SparkContext will not let that set spark* system properties, which is expected behavior for
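A note on the hunk above: env here is a Scala mutable map (it is updated with env("PYTHONPATH") = ...), so env.get("PYTHONPATH") yields an Option[String], and joining it with mkString as written would embed "Some(...)" in the resulting path. A minimal standalone sketch of the intended merge that unwraps the Option explicitly; the names are illustrative, not the actual Client code:

    import java.io.File
    import scala.collection.mutable

    object PythonPathMergeSketch {
      // Append archive entries to any existing PYTHONPATH, joining with the
      // platform path separator (':' on Unix, ';' on Windows).
      def appendToPythonPath(env: mutable.HashMap[String, String], pyArchives: String): Unit = {
        val merged = env.get("PYTHONPATH") match {
          case Some(existing) => Seq(existing, pyArchives).mkString(File.pathSeparator)
          case None           => pyArchives
        }
        env("PYTHONPATH") = merged
      }

      def main(args: Array[String]): Unit = {
        val env = mutable.HashMap("PYTHONPATH" -> "/opt/extra-python-libs")
        appendToPythonPath(env,
          "/user/spark/pyspark.zip,/user/spark/python/lib/py4j-0.8.2.1-src.zip")
        println(env("PYTHONPATH")) // /opt/extra-python-libs:/user/spark/pyspark.zip,...
      }
    }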
@@ -1074,7 +1085,7 @@ object Client extends Logging {
val hiveConf = hiveClass.getMethod("getConf").invoke(hive)
val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
- val hiveConfGet = (param:String) => Option(hiveConfClass
+ val hiveConfGet = (param: String) => Option(hiveConfClass
.getMethod("get", classOf[java.lang.String])
.invoke(hiveConf, param))
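For context on this hunk (which only adds a space after the parameter's colon): the surrounding code loads HiveConf reflectively so Client compiles without a Hive dependency, wrapping the possibly-null result in Option. A minimal standalone sketch of that pattern, with FakeConf as a hypothetical stand-in for org.apache.hadoop.hive.conf.HiveConf:

    object ReflectiveConfSketch {
      // Hypothetical stand-in for a class we must not depend on at compile time.
      class FakeConf {
        private val settings = Map("hive.metastore.uris" -> "thrift://example:9083")
        def get(key: String): String = settings.getOrElse(key, null)
      }

      def main(args: Array[String]): Unit = {
        val conf: AnyRef = new FakeConf // pretend we only hold an opaque reference
        val get = conf.getClass.getMethod("get", classOf[java.lang.String])
        val hiveConfGet = (param: String) => Option(get.invoke(conf, param))
        println(hiveConfGet("hive.metastore.uris")) // Some(thrift://example:9083)
        println(hiveConfGet("no.such.key"))         // None
      }
    }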
@@ -1096,7 +1107,7 @@ object Client extends Logging {
val hive2Token = new Token[DelegationTokenIdentifier]()
hive2Token.decodeFromUrlString(tokenStr)
- credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token)
+ credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token)
logDebug("Added hive.Server2.delegation.token to conf.")
hiveClass.getMethod("closeCurrent").invoke(null)
} else {
@@ -1141,13 +1152,13 @@ object Client extends Logging {
logInfo("Added HBase security token to credentials.")
} catch {
- case e:java.lang.NoSuchMethodException =>
+ case e: java.lang.NoSuchMethodException =>
logInfo("HBase Method not found: " + e)
- case e:java.lang.ClassNotFoundException =>
+ case e: java.lang.ClassNotFoundException =>
logDebug("HBase Class not found: " + e)
- case e:java.lang.NoClassDefFoundError =>
+ case e: java.lang.NoClassDefFoundError =>
logDebug("HBase Class not found: " + e)
- case e:Exception =>
+ case e: Exception =>
logError("Exception when obtaining HBase security token: " + e)
}
}