diff options
author | Kan Zhang <kzhang@apache.org> | 2014-07-30 13:19:05 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2014-07-30 13:19:05 -0700 |
commit | 94d1f46fc43c0cb85125f757fb40db9271caf1f4 (patch) | |
tree | 8878443a963ad6ce5ba3af679567d893c8df70cc /python/pyspark/context.py | |
parent | 437dc8c5b54f0dcf9564c1fb07e8dce9e771c8cd (diff) | |
download | spark-94d1f46fc43c0cb85125f757fb40db9271caf1f4.tar.gz spark-94d1f46fc43c0cb85125f757fb40db9271caf1f4.tar.bz2 spark-94d1f46fc43c0cb85125f757fb40db9271caf1f4.zip |
[SPARK-2024] Add saveAsSequenceFile to PySpark
JIRA issue: https://issues.apache.org/jira/browse/SPARK-2024
This PR is a follow-up to #455 and adds the ability to save PySpark RDDs using SequenceFile or any Hadoop OutputFormat.
* Added RDD methods ```saveAsSequenceFile```, ```saveAsHadoopFile``` and ```saveAsHadoopDataset```, for both old and new MapReduce APIs.
* Added a default converter for converting common data types to Writables. Users may specify custom converters to convert to the desired data types.
* No out-of-the-box support for reading/writing arrays, since ArrayWritable itself doesn't have a no-arg constructor for creating an empty instance upon reading. Users need to provide ArrayWritable subtypes. Custom converters for converting arrays to suitable ArrayWritable subtypes are also needed when writing. When reading, the default converter will convert any custom ArrayWritable subtype to ```Object[]```, which then gets pickled to a Python tuple.
* Added HBase and Cassandra output examples to show how custom output formats and converters can be used.
cc MLnick mateiz ahirreddy pwendell
Author: Kan Zhang <kzhang@apache.org>
Closes #1338 from kanzhang/SPARK-2024 and squashes the following commits:
c01e3ef [Kan Zhang] [SPARK-2024] code formatting
6591e37 [Kan Zhang] [SPARK-2024] renaming pickled -> pickledRDD
d998ad6 [Kan Zhang] [SPARK-2024] refactoring to get method params below 10
57a7a5e [Kan Zhang] [SPARK-2024] correcting typo
75ca5bd [Kan Zhang] [SPARK-2024] Better type checking for batch serialized RDD
0bdec55 [Kan Zhang] [SPARK-2024] Refactoring newly added tests
9f39ff4 [Kan Zhang] [SPARK-2024] Adding 2 saveAsHadoopDataset tests
0c134f3 [Kan Zhang] [SPARK-2024] Test refactoring and adding a couple of unbatched cases
7a176df [Kan Zhang] [SPARK-2024] Add saveAsSequenceFile to PySpark
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r-- | python/pyspark/context.py | 51 |
1 files changed, 36 insertions, 15 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 830a6ee03f..7b0f8d83ae 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -60,6 +60,7 @@ class SparkContext(object): _active_spark_context = None _lock = Lock() _python_includes = None # zip and egg files that need to be added to PYTHONPATH + _default_batch_size_for_serialized_input = 10 def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None, @@ -378,7 +379,7 @@ class SparkContext(object): return jm def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None, - valueConverter=None, minSplits=None): + valueConverter=None, minSplits=None, batchSize=None): """ Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. @@ -398,14 +399,18 @@ class SparkContext(object): @param valueConverter: @param minSplits: minimum splits in dataset (default min(2, sc.defaultParallelism)) + @param batchSize: The number of Python objects represented as a single + Java object. 
(default sc._default_batch_size_for_serialized_input) """ minSplits = minSplits or min(self.defaultParallelism, 2) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass, - keyConverter, valueConverter, minSplits) - return RDD(jrdd, self, PickleSerializer()) + keyConverter, valueConverter, minSplits, batchSize) + return RDD(jrdd, self, ser) def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, - valueConverter=None, conf=None): + valueConverter=None, conf=None, batchSize=None): """ Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. @@ -425,14 +430,18 @@ class SparkContext(object): @param valueConverter: (None by default) @param conf: Hadoop configuration, passed in as a dict (None by default) + @param batchSize: The number of Python objects represented as a single + Java object. 
(default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf) - return RDD(jrdd, self, PickleSerializer()) + valueClass, keyConverter, valueConverter, jconf, batchSize) + return RDD(jrdd, self, ser) def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, - valueConverter=None, conf=None): + valueConverter=None, conf=None, batchSize=None): """ Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. @@ -449,14 +458,18 @@ class SparkContext(object): @param valueConverter: (None by default) @param conf: Hadoop configuration, passed in as a dict (None by default) + @param batchSize: The number of Python objects represented as a single + Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf) - return RDD(jrdd, self, PickleSerializer()) + valueClass, keyConverter, valueConverter, jconf, batchSize) + return RDD(jrdd, self, ser) def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, - valueConverter=None, conf=None): + valueConverter=None, conf=None, batchSize=None): """ Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. 
@@ -476,14 +489,18 @@ class SparkContext(object): @param valueConverter: (None by default) @param conf: Hadoop configuration, passed in as a dict (None by default) + @param batchSize: The number of Python objects represented as a single + Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf) - return RDD(jrdd, self, PickleSerializer()) + valueClass, keyConverter, valueConverter, jconf, batchSize) + return RDD(jrdd, self, ser) def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, - valueConverter=None, conf=None): + valueConverter=None, conf=None, batchSize=None): """ Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. @@ -500,11 +517,15 @@ class SparkContext(object): @param valueConverter: (None by default) @param conf: Hadoop configuration, passed in as a dict (None by default) + @param batchSize: The number of Python objects represented as a single + Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass, - keyConverter, valueConverter, jconf) - return RDD(jrdd, self, PickleSerializer()) + keyConverter, valueConverter, jconf, batchSize) + return RDD(jrdd, self, ser) def _checkpointFile(self, name, input_deserializer): jrdd = self._jsc.checkpointFile(name) |