Diffstat (limited to 'python/pyspark/context.py')
 python/pyspark/context.py | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6c04923881..a90870ed3a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -29,7 +29,7 @@ from pyspark.conf import SparkConf
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-    PairDeserializer
+    PairDeserializer, CompressedSerializer
 from pyspark.storagelevel import StorageLevel
 from pyspark import rdd
 from pyspark.rdd import RDD
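
The new import is CompressedSerializer, a wrapper that compresses the bytes produced by an underlying serializer before they are shipped around. A minimal standalone sketch of the idea, assuming zlib compression over pickle (the class names mirror pyspark.serializers, but these stand-ins are illustrative, not the library's actual code):

    import pickle
    import zlib

    class PickleSerializer(object):
        """Stand-in for pyspark.serializers.PickleSerializer."""
        def dumps(self, obj):
            return pickle.dumps(obj, protocol=2)

        def loads(self, data):
            return pickle.loads(data)

    class CompressedSerializer(object):
        """Wrap another serializer and zlib-compress its output
        (level 1 is an assumption: favor speed over ratio)."""
        def __init__(self, serializer):
            self.serializer = serializer

        def dumps(self, obj):
            return zlib.compress(self.serializer.dumps(obj), 1)

        def loads(self, data):
            return self.serializer.loads(zlib.decompress(data))

    # round-trip check
    ser = CompressedSerializer(PickleSerializer())
    blob = ser.dumps({"xs": list(range(1000))})
    assert ser.loads(blob)["xs"][-1] == 999
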
@@ -566,13 +566,19 @@ class SparkContext(object):
"""
Broadcast a read-only variable to the cluster, returning a
L{Broadcast<pyspark.broadcast.Broadcast>}
- object for reading it in distributed functions. The variable will be
- sent to each cluster only once.
+ object for reading it in distributed functions. The variable will
+ be sent to each cluster only once.
+
+ :keep: Keep the `value` in driver or not.
"""
- pickleSer = PickleSerializer()
- pickled = pickleSer.dumps(value)
- jbroadcast = self._jsc.broadcast(bytearray(pickled))
- return Broadcast(jbroadcast.id(), value, jbroadcast, self._pickled_broadcast_vars)
+ ser = CompressedSerializer(PickleSerializer())
+ # pass large object by py4j is very slow and need much memory
+ tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
+ ser.dump_stream([value], tempFile)
+ tempFile.close()
+ jbroadcast = self._jvm.PythonRDD.readBroadcastFromFile(self._jsc, tempFile.name)
+ return Broadcast(jbroadcast.id(), None, jbroadcast,
+ self._pickled_broadcast_vars, tempFile.name)
def accumulator(self, value, accum_param=None):
"""