aboutsummaryrefslogtreecommitdiff
path: root/pyspark
diff options
context:
space:
mode:
Diffstat (limited to 'pyspark')
-rw-r--r--pyspark/pyspark/rdd.py8
1 files changed, 6 insertions, 2 deletions
diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py
index e2137fe06c..e4878c08ba 100644
--- a/pyspark/pyspark/rdd.py
+++ b/pyspark/pyspark/rdd.py
@@ -1,6 +1,7 @@
from base64 import standard_b64encode as b64enc
from collections import defaultdict
from itertools import chain, ifilter, imap
+import os
import shlex
from subprocess import Popen, PIPE
from threading import Thread
@@ -10,7 +11,7 @@ from pyspark.serializers import dump_pickle, load_pickle
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
-from py4j.java_collections import ListConverter
+from py4j.java_collections import ListConverter, MapConverter
class RDD(object):
@@ -447,8 +448,11 @@ class PipelinedRDD(RDD):
self.ctx.gateway._gateway_client)
self.ctx._pickled_broadcast_vars.clear()
class_manifest = self._prev_jrdd.classManifest()
+ env = MapConverter().convert(
+ {'PYTHONPATH' : os.environ.get("PYTHONPATH", "")},
+ self.ctx.gateway._gateway_client)
python_rdd = self.ctx.jvm.PythonRDD(self._prev_jrdd.rdd(),
- pipe_command, self.preservesPartitioning, self.ctx.pythonExec,
+ pipe_command, env, self.preservesPartitioning, self.ctx.pythonExec,
broadcast_vars, class_manifest)
self._jrdd_val = python_rdd.asJavaRDD()
return self._jrdd_val