diff options
author | Josh Rosen <rosenville@gmail.com> | 2012-08-18 16:07:10 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-08-21 14:01:27 -0700 |
commit | fd94e5443c99775bfad1928729f5075c900ad0f9 (patch) | |
tree | 1bebffa4c656266bc35bc182e8e6569cc34c5079 /pyspark/pyspark/context.py | |
parent | 13b9514966a423f80f672f23f42ec3f0113936fd (diff) | |
download | spark-fd94e5443c99775bfad1928729f5075c900ad0f9.tar.gz spark-fd94e5443c99775bfad1928729f5075c900ad0f9.tar.bz2 spark-fd94e5443c99775bfad1928729f5075c900ad0f9.zip |
Use only cPickle for serialization in Python API.
Objects serialized with JSON can be compared for equality, but JSON can be slow
to serialize and only supports a limited range of data types.
Diffstat (limited to 'pyspark/pyspark/context.py')
-rw-r--r-- | pyspark/pyspark/context.py | 49 |
1 file changed, 19 insertions, 30 deletions
diff --git a/pyspark/pyspark/context.py b/pyspark/pyspark/context.py index 587ab12b5f..ac7e4057e9 100644 --- a/pyspark/pyspark/context.py +++ b/pyspark/pyspark/context.py @@ -3,22 +3,24 @@ import atexit from tempfile import NamedTemporaryFile from pyspark.java_gateway import launch_gateway -from pyspark.serializers import JSONSerializer, NopSerializer -from pyspark.rdd import RDD, PairRDD +from pyspark.serializers import PickleSerializer, dumps +from pyspark.rdd import RDD class SparkContext(object): gateway = launch_gateway() jvm = gateway.jvm - python_dump = jvm.spark.api.python.PythonRDD.pythonDump + pickleFile = jvm.spark.api.python.PythonRDD.pickleFile + asPickle = jvm.spark.api.python.PythonRDD.asPickle + arrayAsPickle = jvm.spark.api.python.PythonRDD.arrayAsPickle - def __init__(self, master, name, defaultSerializer=JSONSerializer, - defaultParallelism=None, pythonExec='python'): + + def __init__(self, master, name, defaultParallelism=None, + pythonExec='python'): self.master = master self.name = name self._jsc = self.jvm.JavaSparkContext(master, name) - self.defaultSerializer = defaultSerializer self.defaultParallelism = \ defaultParallelism or self._jsc.sc().defaultParallelism() self.pythonExec = pythonExec @@ -31,39 +33,26 @@ class SparkContext(object): self._jsc.stop() self._jsc = None - def parallelize(self, c, numSlices=None, serializer=None): - serializer = serializer or self.defaultSerializer - numSlices = numSlices or self.defaultParallelism - # Calling the Java parallelize() method with an ArrayList is too slow, - # because it sends O(n) Py4J commands. As an alternative, serialized - # objects are written to a file and loaded through textFile(). 
- tempFile = NamedTemporaryFile(delete=False) - tempFile.writelines(serializer.dumps(x) + '\n' for x in c) - tempFile.close() - atexit.register(lambda: os.unlink(tempFile.name)) - return self.textFile(tempFile.name, numSlices, serializer) - - def parallelizePairs(self, c, numSlices=None, keySerializer=None, - valSerializer=None): + def parallelize(self, c, numSlices=None): """ >>> sc = SparkContext("local", "test") - >>> rdd = sc.parallelizePairs([(1, 2), (3, 4)]) + >>> rdd = sc.parallelize([(1, 2), (3, 4)]) >>> rdd.collect() [(1, 2), (3, 4)] """ - keySerializer = keySerializer or self.defaultSerializer - valSerializer = valSerializer or self.defaultSerializer numSlices = numSlices or self.defaultParallelism + # Calling the Java parallelize() method with an ArrayList is too slow, + # because it sends O(n) Py4J commands. As an alternative, serialized + # objects are written to a file and loaded through textFile(). tempFile = NamedTemporaryFile(delete=False) - for (k, v) in c: - tempFile.write(keySerializer.dumps(k).rstrip('\r\n') + '\n') - tempFile.write(valSerializer.dumps(v).rstrip('\r\n') + '\n') + for x in c: + dumps(PickleSerializer.dumps(x), tempFile) tempFile.close() atexit.register(lambda: os.unlink(tempFile.name)) - jrdd = self.textFile(tempFile.name, numSlices)._pipePairs([], "echo") - return PairRDD(jrdd, self, keySerializer, valSerializer) + jrdd = self.pickleFile(self._jsc, tempFile.name, numSlices) + return RDD(jrdd, self) - def textFile(self, name, numSlices=None, serializer=NopSerializer): + def textFile(self, name, numSlices=None): numSlices = numSlices or self.defaultParallelism jrdd = self._jsc.textFile(name, numSlices) - return RDD(jrdd, self, serializer) + return RDD(jrdd, self) |