author:    Josh Rosen <joshrosen@eecs.berkeley.edu>  2012-10-22 10:28:59 -0700
committer: Josh Rosen <joshrosen@eecs.berkeley.edu>  2012-10-22 10:28:59 -0700
commit:    d4f2e5b0ef38db9d42bb0d5fbbbe6103ce047efe (patch)
tree:      da901dbbab9da1df45672edc97575a7f90c8f697
parent:    c23bf1aff4b9a1faf9d32c7b64acad2213f9515c (diff)
Remove PYTHONPATH from SparkContext's executorEnvs.
It makes more sense to pass it in the dictionary of environment variables that is used to construct PythonRDD.
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala         |  2 +-
-rw-r--r--  core/src/main/scala/spark/api/python/PythonRDD.scala | 15 +++++++--------
-rw-r--r--  pyspark/pyspark/rdd.py                               |  8 ++++++--
3 files changed, 14 insertions(+), 11 deletions(-)
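
A minimal caller-side sketch (Scala) of the new convention this commit
introduces: environment variables such as PYTHONPATH now travel in an
explicit java.util.Map argument instead of SparkContext's executorEnvs.
The parentRdd, command, and broadcastVars names below are placeholders
for illustration, not part of this commit:

    // Build the env map handed to each PythonRDD (hypothetical caller).
    val envVars = new java.util.HashMap[String, String]()
    val pythonPath = System.getenv("PYTHONPATH")
    if (pythonPath != null) {
      envVars.put("PYTHONPATH", pythonPath)
    }
    // new PythonRDD(parentRdd, command, envVars, preservesPartitioning,
    //               pythonExec, broadcastVars)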
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index acb38ae33d..becf737597 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -113,7 +113,7 @@ class SparkContext(
// Environment variables to pass to our executors
private[spark] val executorEnvs = HashMap[String, String]()
for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS",
- "SPARK_TESTING", "PYTHONPATH")) {
+ "SPARK_TESTING")) {
val value = System.getenv(key)
if (value != null) {
executorEnvs(key) = value
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 528885fe5c..a593e53efd 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -131,18 +131,17 @@ trait PythonRDDBase {
}
class PythonRDD[T: ClassManifest](
- parent: RDD[T], command: Seq[String], envVars: Map[String, String],
+ parent: RDD[T], command: Seq[String], envVars: java.util.Map[String, String],
preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]])
extends RDD[Array[Byte]](parent.context) with PythonRDDBase {
- def this(parent: RDD[T], command: Seq[String], preservePartitoning: Boolean,
- pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
- this(parent, command, Map(), preservePartitoning, pythonExec, broadcastVars)
-
// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
- def this(parent: RDD[T], command: String, preservePartitoning: Boolean, pythonExec: String, broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
- this(parent, PipedRDD.tokenize(command), preservePartitoning, pythonExec, broadcastVars)
+ def this(parent: RDD[T], command: String, envVars: java.util.Map[String, String],
+ preservePartitoning: Boolean, pythonExec: String,
+ broadcastVars: java.util.List[Broadcast[Array[Byte]]]) =
+ this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec,
+ broadcastVars)
override def splits = parent.splits
@@ -151,7 +150,7 @@ class PythonRDD[T: ClassManifest](
override val partitioner = if (preservePartitoning) parent.partitioner else None
override def compute(split: Split): Iterator[Array[Byte]] =
- compute(split, envVars, command, parent, pythonExec, broadcastVars)
+ compute(split, envVars.toMap, command, parent, pythonExec, broadcastVars)
val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
}
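
The auxiliary constructor above splits a single command string "using a
standard StringTokenizer (i.e. by spaces)", like Runtime.exec(). A small
self-contained sketch of that splitting behavior; the real implementation
lives in PipedRDD.tokenize, which this assumes but does not reproduce:

    import java.util.StringTokenizer
    import scala.collection.mutable.ArrayBuffer

    // Split a command line on whitespace, Runtime.exec()-style.
    def tokenize(command: String): Seq[String] = {
      val buf = new ArrayBuffer[String]
      val tok = new StringTokenizer(command)
      while (tok.hasMoreTokens) {
        buf += tok.nextToken()
      }
      buf
    }

    // tokenize("python -m worker") returns Seq("python", "-m", "worker")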
diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py
index e2137fe06c..e4878c08ba 100644
--- a/pyspark/pyspark/rdd.py
+++ b/pyspark/pyspark/rdd.py
@@ -1,6 +1,7 @@
from base64 import standard_b64encode as b64enc
from collections import defaultdict
from itertools import chain, ifilter, imap
+import os
import shlex
from subprocess import Popen, PIPE
from threading import Thread
@@ -10,7 +11,7 @@ from pyspark.serializers import dump_pickle, load_pickle
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
-from py4j.java_collections import ListConverter
+from py4j.java_collections import ListConverter, MapConverter
class RDD(object):
@@ -447,8 +448,11 @@ class PipelinedRDD(RDD):
self.ctx.gateway._gateway_client)
self.ctx._pickled_broadcast_vars.clear()
class_manifest = self._prev_jrdd.classManifest()
+ env = MapConverter().convert(
+ {'PYTHONPATH' : os.environ.get("PYTHONPATH", "")},
+ self.ctx.gateway._gateway_client)
python_rdd = self.ctx.jvm.PythonRDD(self._prev_jrdd.rdd(),
- pipe_command, self.preservesPartitioning, self.ctx.pythonExec,
+ pipe_command, env, self.preservesPartitioning, self.ctx.pythonExec,
broadcast_vars, class_manifest)
self._jrdd_val = python_rdd.asJavaRDD()
return self._jrdd_val
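
On the JVM side, the MapConverter call above hands PythonRDD a
java.util.Map, which compute() then flattens with envVars.toMap. A sketch
of that conversion, assuming scala.collection.JavaConversions is in scope
in PythonRDD.scala (the bare .toMap call in the Scala diff implies such an
implicit; this is not shown in the commit itself):

    import scala.collection.JavaConversions._

    // A Java map like the one Py4J's MapConverter produces.
    val envVars: java.util.Map[String, String] =
      new java.util.HashMap[String, String]()
    envVars.put("PYTHONPATH", Option(System.getenv("PYTHONPATH")).getOrElse(""))

    // JavaConversions wraps the Java map, so .toMap yields an immutable
    // scala.collection.immutable.Map[String, String].
    val scalaEnv: Map[String, String] = envVars.toMap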