aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-02-01 11:29:47 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-02-01 11:29:47 -0800
commitb6a6092177a008cdcd19810d9d3a5715dae927b0 (patch)
tree2aea685655da9a2ed0acf4d7a40f81882e10b1e7 /python/pyspark/rdd.py
parent571af31304bd72d310c3b47a8471a4de206aa6fe (diff)
parent9cc6ff9c4e7eec2d62261fc166ad2ebade148752 (diff)
downloadspark-b6a6092177a008cdcd19810d9d3a5715dae927b0.tar.gz
spark-b6a6092177a008cdcd19810d9d3a5715dae927b0.tar.bz2
spark-b6a6092177a008cdcd19810d9d3a5715dae927b0.zip
Merge pull request #438 from JoshRosen/spark-674
Do not launch JavaGateways on workers (SPARK-674).
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py12
1 files changed, 6 insertions, 6 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index d53355a8f1..d7cad2f372 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -407,7 +407,7 @@ class RDD(object):
return (str(x).encode("utf-8") for x in iterator)
keyed = PipelinedRDD(self, func)
keyed._bypass_serializer = True
- keyed._jrdd.map(self.ctx.jvm.BytesToString()).saveAsTextFile(path)
+ keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
# Pair functions
@@ -550,8 +550,8 @@ class RDD(object):
yield dump_pickle(Batch(items))
keyed = PipelinedRDD(self, add_shuffle_key)
keyed._bypass_serializer = True
- pairRDD = self.ctx.jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
- partitioner = self.ctx.jvm.PythonPartitioner(numSplits,
+ pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
+ partitioner = self.ctx._jvm.PythonPartitioner(numSplits,
id(partitionFunc))
jrdd = pairRDD.partitionBy(partitioner).values()
rdd = RDD(jrdd, self.ctx)
@@ -730,13 +730,13 @@ class PipelinedRDD(RDD):
pipe_command = ' '.join(b64enc(cloudpickle.dumps(f)) for f in cmds)
broadcast_vars = ListConverter().convert(
[x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
- self.ctx.gateway._gateway_client)
+ self.ctx._gateway._gateway_client)
self.ctx._pickled_broadcast_vars.clear()
class_manifest = self._prev_jrdd.classManifest()
env = copy.copy(self.ctx.environment)
env['PYTHONPATH'] = os.environ.get("PYTHONPATH", "")
- env = MapConverter().convert(env, self.ctx.gateway._gateway_client)
- python_rdd = self.ctx.jvm.PythonRDD(self._prev_jrdd.rdd(),
+ env = MapConverter().convert(env, self.ctx._gateway._gateway_client)
+ python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
pipe_command, env, self.preservesPartitioning, self.ctx.pythonExec,
broadcast_vars, self.ctx._javaAccumulator, class_manifest)
self._jrdd_val = python_rdd.asJavaRDD()