Diffstat (limited to 'python/pyspark/context.py')
 python/pyspark/context.py | 137 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 137 insertions(+), 0 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 211918f5a0..062bec2381 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -342,6 +342,143 @@ class SparkContext(object):
return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
+ def _dictToJavaMap(self, d):
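+ """Convert a Python dict (or None) into a java.util.HashMap via Py4J."""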
+ jm = self._jvm.java.util.HashMap()
+ if not d:
+ d = {}
+ for k, v in d.iteritems():
+ jm[k] = v
+ return jm
+
+ def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
+ valueConverter=None, minSplits=None):
+ """
+ Read a Hadoop SequenceFile with arbitrary key and value Writable classes from HDFS,
+ a local file system (available on all nodes), or any Hadoop-supported file system URI.
+ The mechanism is as follows:
+ 1. A Java RDD is created from the SequenceFile or other InputFormat, and the key
+ and value Writable classes
+ 2. Serialization is attempted via Pyrolite pickling
+ 3. If this fails, the fallback is to call 'toString' on each key and value
+ 4. C{PickleSerializer} is used to deserialize pickled objects on the Python side
+
+ @param path: path to sequence file
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.Text")
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.LongWritable")
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ @param minSplits: minimum splits in dataset
+ (default min(2, sc.defaultParallelism))
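+
+ A hypothetical usage sketch (the path and Writable classes below are
+ illustrative, not part of this patch):
+
+ >>> rdd = sc.sequenceFile("hdfs:///data/events.seq",
+ ... "org.apache.hadoop.io.Text",
+ ... "org.apache.hadoop.io.LongWritable")
+ >>> pairs = rdd.collect() # (key, value) pairs deserialized on the Python side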
+ """
+ minSplits = minSplits or min(self.defaultParallelism, 2)
+ jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
+ keyConverter, valueConverter, minSplits)
+ return RDD(jrdd, self, PickleSerializer())
+
+ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
+ valueConverter=None, conf=None):
+ """
+ Read a 'new API' Hadoop InputFormat with arbitrary key and value classes from HDFS,
+ a local file system (available on all nodes), or any Hadoop-supported file system URI.
+ The mechanism is the same as for sc.sequenceFile.
+
+ A Hadoop configuration can be passed in as a Python dict. This will be converted into a
+ Configuration in Java.
+
+ @param path: path to Hadoop file
+ @param inputFormatClass: fully qualified classname of Hadoop InputFormat
+ (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.Text")
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.LongWritable")
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ @param conf: Hadoop configuration, passed in as a dict
+ (None by default)
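+
+ A hypothetical usage sketch (the path is illustrative; TextInputFormat yields
+ LongWritable offsets and Text lines):
+
+ >>> rdd = sc.newAPIHadoopFile("hdfs:///data/input.txt",
+ ... "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
+ ... "org.apache.hadoop.io.LongWritable",
+ ... "org.apache.hadoop.io.Text")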
+ """
+ jconf = self._dictToJavaMap(conf)
+ jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
+ valueClass, keyConverter, valueConverter, jconf)
+ return RDD(jrdd, self, PickleSerializer())
+
+ def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
+ valueConverter=None, conf=None):
+ """
+ Read a 'new API' Hadoop InputFormat with arbitrary key and value classes, using an
+ arbitrary Hadoop configuration passed in as a Python dict.
+ This will be converted into a Configuration in Java.
+ The mechanism is the same as for sc.sequenceFile.
+
+ @param inputFormatClass: fully qualified classname of Hadoop InputFormat
+ (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.Text")
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.LongWritable")
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ @param conf: Hadoop configuration, passed in as a dict
+ (None by default)
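+
+ A hypothetical usage sketch (the path is illustrative; the configuration key is
+ the standard new-API input-directory key):
+
+ >>> conf = {"mapreduce.input.fileinputformat.inputdir": "hdfs:///data"}
+ >>> rdd = sc.newAPIHadoopRDD(
+ ... "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
+ ... "org.apache.hadoop.io.LongWritable",
+ ... "org.apache.hadoop.io.Text",
+ ... conf=conf)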
+ """
+ jconf = self._dictToJavaMap(conf)
+ jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
+ valueClass, keyConverter, valueConverter, jconf)
+ return RDD(jrdd, self, PickleSerializer())
+
+ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
+ valueConverter=None, conf=None):
+ """
+ Read an 'old' Hadoop InputFormat with arbitrary key and value classes from HDFS,
+ a local file system (available on all nodes), or any Hadoop-supported file system URI.
+ The mechanism is the same as for sc.sequenceFile.
+
+ A Hadoop configuration can be passed in as a Python dict. This will be converted into a
+ Configuration in Java.
+
+ @param path: path to Hadoop file
+ @param inputFormatClass: fully qualified classname of Hadoop InputFormat
+ (e.g. "org.apache.hadoop.mapred.TextInputFormat")
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.Text")
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.LongWritable")
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ @param conf: Hadoop configuration, passed in as a dict
+ (None by default)
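+
+ A hypothetical usage sketch (the path is illustrative; note the old-API
+ org.apache.hadoop.mapred.TextInputFormat):
+
+ >>> rdd = sc.hadoopFile("hdfs:///data/input.txt",
+ ... "org.apache.hadoop.mapred.TextInputFormat",
+ ... "org.apache.hadoop.io.LongWritable",
+ ... "org.apache.hadoop.io.Text")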
+ """
+ jconf = self._dictToJavaMap(conf)
+ jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
+ valueClass, keyConverter, valueConverter, jconf)
+ return RDD(jrdd, self, PickleSerializer())
+
+ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
+ valueConverter=None, conf=None):
+ """
+ Read an 'old' Hadoop InputFormat with arbitrary key and value classes, using an
+ arbitrary Hadoop configuration passed in as a Python dict.
+ This will be converted into a Configuration in Java.
+ The mechanism is the same as for sc.sequenceFile.
+
+ @param inputFormatClass: fully qualified classname of Hadoop InputFormat
+ (e.g. "org.apache.hadoop.mapred.TextInputFormat")
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.Text")
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.LongWritable")
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ @param conf: Hadoop configuration, passed in as a dict
+ (None by default)
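+
+ A hypothetical usage sketch (the path is illustrative; "mapred.input.dir" is
+ the standard old-API input-directory key):
+
+ >>> conf = {"mapred.input.dir": "hdfs:///data"}
+ >>> rdd = sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
+ ... "org.apache.hadoop.io.LongWritable",
+ ... "org.apache.hadoop.io.Text",
+ ... conf=conf)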
+ """
+ jconf = self._dictToJavaMap(conf)
+ jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
+ keyConverter, valueConverter, jconf)
+ return RDD(jrdd, self, PickleSerializer())
+
def _checkpointFile(self, name, input_deserializer):
jrdd = self._jsc.checkpointFile(name)
return RDD(jrdd, self, input_deserializer)