Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--  python/pyspark/rdd.py  114
1 file changed, 114 insertions, 0 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index b84d976114..e8fcc900ef 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -231,6 +231,13 @@ class RDD(object):
self._jrdd_deserializer = jrdd_deserializer
self._id = jrdd.id()
+ def _toPickleSerialization(self):
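+ # The saveAs*Hadoop* methods hand this RDD to the JVM, which expects
+ # pickled elements: reuse the RDD if it is already pickle-serialized,
+ # otherwise reserialize it (batched, batch size 10).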
+ if (self._jrdd_deserializer == PickleSerializer() or
+ self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
+ return self
+ else:
+ return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
+
def id(self):
"""
A unique ID for this RDD (within its SparkContext).
@@ -1030,6 +1037,113 @@ class RDD(object):
"""
return self.take(1)[0]
+ def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
+ """
+ Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are
+ converted for output using either user-specified converters or, by default,
+ L{org.apache.spark.api.python.JavaToWritableConverter}.
+
+ @param conf: Hadoop job configuration, passed in as a dict
+ @param keyConverter: fully qualified classname of key converter (None by default)
+ @param valueConverter: fully qualified classname of value converter (None by default)
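+
+ A minimal sketch of a conf dict for writing an C{IntWritable}/C{Text}
+ sequence file (the exact property names vary across Hadoop versions and
+ the output directory is hypothetical; untested here):
+
+ >>> conf = {"mapreduce.outputformat.class":
+ ...     "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ ...     "mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
+ ...     "mapreduce.job.output.value.class": "org.apache.hadoop.io.Text",
+ ...     "mapreduce.output.fileoutputformat.outputdir": "/tmp/newapi-out"}
+ >>> sc.parallelize([(1, "a"), (2, "b")]).saveAsNewAPIHadoopDataset(conf)  # doctest: +SKIP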
+ """
+ jconf = self.ctx._dictToJavaMap(conf)
+ pickledRDD = self._toPickleSerialization()
+ batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+ self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+ keyConverter, valueConverter, True)
+
+ def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
+ keyConverter=None, valueConverter=None, conf=None):
+ """
+ Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types
+ will be inferred if not specified. Keys and values are converted for output using either
+ user-specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
+ C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
+ of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
+
+ @param path: path to Hadoop file
+ @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
+ (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.IntWritable", None by default)
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.Text", None by default)
+ @param keyConverter: fully qualified classname of key converter (None by default)
+ @param valueConverter: fully qualified classname of value converter (None by default)
+ @param conf: Hadoop job configuration, passed in as a dict (None by default)
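+
+ A minimal usage sketch (the output path is hypothetical; untested here):
+
+ >>> rdd = sc.parallelize([(1, "a"), (2, "b")])
+ >>> rdd.saveAsNewAPIHadoopFile("/tmp/newapi-out",
+ ...     "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ ...     keyClass="org.apache.hadoop.io.IntWritable",
+ ...     valueClass="org.apache.hadoop.io.Text")  # doctest: +SKIP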
+ """
+ jconf = self.ctx._dictToJavaMap(conf)
+ pickledRDD = self._toPickleSerialization()
+ batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+ self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
+ outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf)
+
+ def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
+ """
+ Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ system, using the old Hadoop OutputFormat API (mapred package). Keys/values are
+ converted for output using either user-specified converters or, by default,
+ L{org.apache.spark.api.python.JavaToWritableConverter}.
+
+ @param conf: Hadoop job configuration, passed in as a dict
+ @param keyConverter: fully qualified classname of key converter (None by default)
+ @param valueConverter: fully qualified classname of value converter (None by default)
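+
+ A minimal sketch of a conf dict using the classic C{mapred.*} JobConf
+ properties (the output directory is hypothetical; untested here):
+
+ >>> conf = {"mapred.output.format.class":
+ ...     "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ ...     "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
+ ...     "mapred.output.value.class": "org.apache.hadoop.io.Text",
+ ...     "mapred.output.dir": "/tmp/oldapi-out"}
+ >>> sc.parallelize([(1, "a"), (2, "b")]).saveAsHadoopDataset(conf)  # doctest: +SKIP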
+ """
+ jconf = self.ctx._dictToJavaMap(conf)
+ pickledRDD = self._toPickleSerialization()
+ batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+ self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+ keyConverter, valueConverter, False)
+
+ def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
+ keyConverter=None, valueConverter=None, conf=None,
+ compressionCodecClass=None):
+ """
+ Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ system, using the old Hadoop OutputFormat API (mapred package). Key and value types
+ will be inferred if not specified. Keys and values are converted for output using either
+ user-specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
+ C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
+ of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
+
+ @param path: path to Hadoop file
+ @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
+ (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.IntWritable", None by default)
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.Text", None by default)
+ @param keyConverter: fully qualified classname of key converter (None by default)
+ @param valueConverter: fully qualified classname of value converter (None by default)
+ @param conf: Hadoop job configuration, passed in as a dict (None by default)
+ @param compressionCodecClass: fully qualified classname of the compression codec class
+ (e.g. "org.apache.hadoop.io.compress.GzipCodec", None by default)
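+
+ A minimal usage sketch (the output path is hypothetical; untested here):
+
+ >>> rdd = sc.parallelize([(1, "a"), (2, "b")])
+ >>> rdd.saveAsHadoopFile("/tmp/oldapi-out",
+ ...     "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ ...     keyClass="org.apache.hadoop.io.IntWritable",
+ ...     valueClass="org.apache.hadoop.io.Text",
+ ...     compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")  # doctest: +SKIP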
+ """
+ jconf = self.ctx._dictToJavaMap(conf)
+ pickledRDD = self._toPickleSerialization()
+ batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+ self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
+ outputFormatClass, keyClass, valueClass, keyConverter, valueConverter,
+ jconf, compressionCodecClass)
+
+ def saveAsSequenceFile(self, path, compressionCodecClass=None):
+ """
+ Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+ system, using the L{org.apache.hadoop.io.Writable} types that we convert from the
+ RDD's key and value types. The mechanism is as follows:
+ 1. Pyrolite is used to convert the pickled Python RDD into an RDD of Java objects.
+ 2. Keys and values of this Java RDD are converted to Writables and written out.
+
+ @param path: path to sequence file
+ @param compressionCodecClass: fully qualified classname of the compression codec class
+ (e.g. "org.apache.hadoop.io.compress.GzipCodec", None by default)
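+
+ A minimal usage sketch (the output path is hypothetical; untested here):
+
+ >>> sc.parallelize([(1, "a"), (2, "b")]).saveAsSequenceFile("/tmp/seq-out")  # doctest: +SKIP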
+ """
+ pickledRDD = self._toPickleSerialization()
+ batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+ self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched,
+ path, compressionCodecClass)
+
def saveAsPickleFile(self, path, batchSize=10):
"""
Save this RDD as a SequenceFile of serialized objects. The serializer