diff options
author | Nick Pentreath <nick.pentreath@gmail.com> | 2014-06-09 22:21:03 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-06-09 22:21:03 -0700 |
commit | f971d6cb60d642178d6544217a25fa16ece34889 (patch) | |
tree | cba008802eda1755ff58eec8ad462d894f48d265 /python | |
parent | 6f2db8c2f51911f88a601ec5bf1509ea0e8173ed (diff) | |
download | spark-f971d6cb60d642178d6544217a25fa16ece34889.tar.gz spark-f971d6cb60d642178d6544217a25fa16ece34889.tar.bz2 spark-f971d6cb60d642178d6544217a25fa16ece34889.zip |
SPARK-1416: PySpark support for SequenceFile and Hadoop InputFormats
So I finally resurrected this PR. It seems the old one against the incubator mirror is no longer available, so I cannot reference it.
This adds initial support for reading Hadoop ```SequenceFile```s, as well as arbitrary Hadoop ```InputFormat```s, in PySpark.
# Overview
The basics are as follows:
1. ```PythonRDD``` object contains the relevant methods, that are in turn invoked by ```SparkContext``` in PySpark
2. The SequenceFile or InputFormat is read on the Scala side and converted from ```Writable``` instances to the relevant Scala classes (in the case of primitives)
3. Pyrolite is used to serialize Java objects. If this fails, the fallback is ```toString```
4. ```PickleSerializer``` on the Python side deserializes.
This works "out the box" for simple ```Writable```s:
* ```Text```
* ```IntWritable```, ```DoubleWritable```, ```FloatWritable```
* ```NullWritable```
* ```BooleanWritable```
* ```BytesWritable```
* ```MapWritable```
It also works for simple, "struct-like" classes. Due to the way Pyrolite works, this requires that the classes satisfy the JavaBeans convenstions (i.e. with fields and a no-arg constructor and getters/setters). (Perhaps in future some sugar for case classes and reflection could be added).
I've tested it out with ```ESInputFormat``` as an example and it works very nicely:
```python
conf = {"es.resource" : "index/type" }
rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
rdd.first()
```
I suspect for things like HBase/Cassandra it will be a bit trickier to get it to work out the box.
# Some things still outstanding:
1. ~~Requires ```msgpack-python``` and will fail without it. As originally discussed with Josh, add a ```as_strings``` argument that defaults to ```False```, that can be used if ```msgpack-python``` is not available~~
2. ~~I see from https://github.com/apache/spark/pull/363 that Pyrolite is being used there for SerDe between Scala and Python. @ahirreddy @mateiz what is the plan behind this - is Pyrolite preferred? It seems from a cursory glance that adapting the ```msgpack```-based SerDe here to use Pyrolite wouldn't be too hard~~
3. ~~Support the key and value "wrapper" that would allow a Scala/Java function to be plugged in that would transform whatever the key/value Writable class is into something that can be serialized (e.g. convert some custom Writable to a JavaBean or ```java.util.Map``` that can be easily serialized)~~
4. Support ```saveAsSequenceFile``` and ```saveAsHadoopFile``` etc. This would require SerDe in the reverse direction, that can be handled by Pyrolite. Will work on this as a separate PR
Author: Nick Pentreath <nick.pentreath@gmail.com>
Closes #455 from MLnick/pyspark-inputformats and squashes the following commits:
268df7e [Nick Pentreath] Documentation changes mer @pwendell comments
761269b [Nick Pentreath] Address @pwendell comments, simplify default writable conversions and remove registry.
4c972d8 [Nick Pentreath] Add license headers
d150431 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
cde6af9 [Nick Pentreath] Parameterize converter trait
5ebacfa [Nick Pentreath] Update docs for PySpark input formats
a985492 [Nick Pentreath] Move Converter examples to own package
365d0be [Nick Pentreath] Make classes private[python]. Add docs and @Experimental annotation to Converter interface.
eeb8205 [Nick Pentreath] Fix path relative to SPARK_HOME in tests
1eaa08b [Nick Pentreath] HBase -> Cassandra app name oversight
3f90c3e [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
2c18513 [Nick Pentreath] Add examples for reading HBase and Cassandra InputFormats from Python
b65606f [Nick Pentreath] Add converter interface
5757f6e [Nick Pentreath] Default key/value classes for sequenceFile asre None
085b55f [Nick Pentreath] Move input format tests to tests.py and clean up docs
43eb728 [Nick Pentreath] PySpark InputFormats docs into programming guide
94beedc [Nick Pentreath] Clean up args in PythonRDD. Set key/value converter defaults to None for PySpark context.py methods
1a4a1d6 [Nick Pentreath] Address @mateiz style comments
01e0813 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
15a7d07 [Nick Pentreath] Remove default args for key/value classes. Arg names to camelCase
9fe6bd5 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
84fe8e3 [Nick Pentreath] Python programming guide space formatting
d0f52b6 [Nick Pentreath] Python programming guide
7caa73a [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
93ef995 [Nick Pentreath] Add back context.py changes
9ef1896 [Nick Pentreath] Recover earlier changes lost in previous merge for serializers.py
077ecb2 [Nick Pentreath] Recover earlier changes lost in previous merge for context.py
5af4770 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
35b8e3a [Nick Pentreath] Another fix for test ordering
bef3afb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
e001b94 [Nick Pentreath] Fix test failures due to ordering
78978d9 [Nick Pentreath] Add doc for SequenceFile and InputFormat support to Python programming guide
64eb051 [Nick Pentreath] Scalastyle fix
e7552fa [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
44f2857 [Nick Pentreath] Remove msgpack dependency and switch serialization to Pyrolite, plus some clean up and refactoring
c0ebfb6 [Nick Pentreath] Change sequencefile test data generator to easily be called from PySpark tests
1d7c17c [Nick Pentreath] Amend tests to auto-generate sequencefile data in temp dir
17a656b [Nick Pentreath] remove binary sequencefile for tests
f60959e [Nick Pentreath] Remove msgpack dependency and serializer from PySpark
450e0a2 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
31a2fff [Nick Pentreath] Scalastyle fixes
fc5099e [Nick Pentreath] Add Apache license headers
4e08983 [Nick Pentreath] Clean up docs for PySpark context methods
b20ec7e [Nick Pentreath] Clean up merge duplicate dependencies
951c117 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
f6aac55 [Nick Pentreath] Bring back msgpack
9d2256e [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
1bbbfb0 [Nick Pentreath] Clean up SparkBuild from merge
a67dfad [Nick Pentreath] Clean up Msgpack serialization and registering
7237263 [Nick Pentreath] Add back msgpack serializer and hadoop file code lost during merging
25da1ca [Nick Pentreath] Add generator for nulls, bools, bytes and maps
65360d5 [Nick Pentreath] Adding test SequenceFiles
0c612e5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
d72bf18 [Nick Pentreath] msgpack
dd57922 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
e67212a [Nick Pentreath] Add back msgpack dependency
f2d76a0 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
41856a5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
97ef708 [Nick Pentreath] Remove old writeToStream
2beeedb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
795a763 [Nick Pentreath] Change name to WriteInputFormatTestDataGenerator. Cleanup some var names. Use SPARK_HOME in path for writing test sequencefile data.
174f520 [Nick Pentreath] Add back graphx settings
703ee65 [Nick Pentreath] Add back msgpack
619c0fa [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
1c8efbc [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
eb40036 [Nick Pentreath] Remove unused comment lines
4d7ef2e [Nick Pentreath] Fix indentation
f1d73e3 [Nick Pentreath] mergeConfs returns a copy rather than mutating one of the input arguments
0f5cd84 [Nick Pentreath] Remove unused pair UTF8 class. Add comments to msgpack deserializer
4294cbb [Nick Pentreath] Add old Hadoop api methods. Clean up and expand comments. Clean up argument names
818a1e6 [Nick Pentreath] Add seqencefile and Hadoop InputFormat support to PythonRDD
4e7c9e3 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
c304cc8 [Nick Pentreath] Adding supporting sequncefiles for tests. Cleaning up
4b0a43f [Nick Pentreath] Refactoring utils into own objects. Cleaning up old commented-out code
d86325f [Nick Pentreath] Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop InputFormat
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/context.py | 137 | ||||
-rw-r--r-- | python/pyspark/tests.py | 145 |
2 files changed, 282 insertions, 0 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 211918f5a0..062bec2381 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -342,6 +342,143 @@ class SparkContext(object): return RDD(self._jsc.wholeTextFiles(path, minPartitions), self, PairDeserializer(UTF8Deserializer(), UTF8Deserializer())) + def _dictToJavaMap(self, d): + jm = self._jvm.java.util.HashMap() + if not d: + d = {} + for k, v in d.iteritems(): + jm[k] = v + return jm + + def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None, + valueConverter=None, minSplits=None): + """ + Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, + a local file system (available on all nodes), or any Hadoop-supported file system URI. + The mechanism is as follows: + 1. A Java RDD is created from the SequenceFile or other InputFormat, and the key + and value Writable classes + 2. Serialization is attempted via Pyrolite pickling + 3. If this fails, the fallback is to call 'toString' on each key and value + 4. C{PickleSerializer} is used to deserialize pickled objects on the Python side + + @param path: path to sequncefile + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: + @param valueConverter: + @param minSplits: minimum splits in dataset + (default min(2, sc.defaultParallelism)) + """ + minSplits = minSplits or min(self.defaultParallelism, 2) + jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass, + keyConverter, valueConverter, minSplits) + return RDD(jrdd, self, PickleSerializer()) + + def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, + valueConverter=None, conf=None): + """ + Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS, + a local file system (available on all nodes), or any Hadoop-supported file system URI. + The mechanism is the same as for sc.sequenceFile. + + A Hadoop configuration can be passed in as a Python dict. This will be converted into a + Configuration in Java + + @param path: path to Hadoop file + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) + """ + jconf = self._dictToJavaMap(conf) + jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, jconf) + return RDD(jrdd, self, PickleSerializer()) + + def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, + valueConverter=None, conf=None): + """ + Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary + Hadoop configuration, which is passed in as a Python dict. + This will be converted into a Configuration in Java. + The mechanism is the same as for sc.sequenceFile. + + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) + """ + jconf = self._dictToJavaMap(conf) + jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, jconf) + return RDD(jrdd, self, PickleSerializer()) + + def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, + valueConverter=None, conf=None): + """ + Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS, + a local file system (available on all nodes), or any Hadoop-supported file system URI. + The mechanism is the same as for sc.sequenceFile. + + A Hadoop configuration can be passed in as a Python dict. This will be converted into a + Configuration in Java. + + @param path: path to Hadoop file + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapred.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) + """ + jconf = self._dictToJavaMap(conf) + jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, jconf) + return RDD(jrdd, self, PickleSerializer()) + + def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, + valueConverter=None, conf=None): + """ + Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary + Hadoop configuration, which is passed in as a Python dict. + This will be converted into a Configuration in Java. + The mechanism is the same as for sc.sequenceFile. + + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapred.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) + """ + jconf = self._dictToJavaMap(conf) + jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass, + keyConverter, valueConverter, jconf) + return RDD(jrdd, self, PickleSerializer()) + def _checkpointFile(self, name, input_deserializer): jrdd = self._jsc.checkpointFile(name) return RDD(jrdd, self, input_deserializer) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 1f2a6ea941..184ee810b8 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -198,6 +198,151 @@ class TestIO(PySparkTestCase): self.sc.parallelize([1]).foreach(func) +class TestInputFormat(PySparkTestCase): + + def setUp(self): + PySparkTestCase.setUp(self) + self.tempdir = tempfile.NamedTemporaryFile(delete=False) + os.unlink(self.tempdir.name) + self.sc._jvm.WriteInputFormatTestDataGenerator.generateData(self.tempdir.name, self.sc._jsc) + + def tearDown(self): + PySparkTestCase.tearDown(self) + shutil.rmtree(self.tempdir.name) + + def test_sequencefiles(self): + basepath = self.tempdir.name + ints = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfint/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text").collect()) + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + self.assertEqual(ints, ei) + + doubles = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfdouble/", + "org.apache.hadoop.io.DoubleWritable", + "org.apache.hadoop.io.Text").collect()) + ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')] + self.assertEqual(doubles, ed) + + text = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sftext/", + "org.apache.hadoop.io.Text", + "org.apache.hadoop.io.Text").collect()) + et = [(u'1', u'aa'), + (u'1', u'aa'), + (u'2', u'aa'), + (u'2', u'bb'), + (u'2', u'bb'), + (u'3', u'cc')] + self.assertEqual(text, et) + + bools = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbool/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.BooleanWritable").collect()) + eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)] + self.assertEqual(bools, eb) + + nulls = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfnull/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.BooleanWritable").collect()) + en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)] + self.assertEqual(nulls, en) + + maps = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfmap/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.MapWritable").collect()) + em = [(1, {2.0: u'aa'}), + (1, {3.0: u'bb'}), + (2, {1.0: u'aa'}), + (2, {1.0: u'cc'}), + (2, {3.0: u'bb'}), + (3, {2.0: u'dd'})] + self.assertEqual(maps, em) + + clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/", + "org.apache.hadoop.io.Text", + "org.apache.spark.api.python.TestWritable").collect()) + ec = (u'1', + {u'__class__': u'org.apache.spark.api.python.TestWritable', + u'double': 54.0, u'int': 123, u'str': u'test1'}) + self.assertEqual(clazz[0], ec) + + def test_oldhadoop(self): + basepath = self.tempdir.name + ints = sorted(self.sc.hadoopFile(basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapred.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text").collect()) + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + self.assertEqual(ints, ei) + + hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt") + hello = self.sc.hadoopFile(hellopath, + "org.apache.hadoop.mapred.TextInputFormat", + "org.apache.hadoop.io.LongWritable", + "org.apache.hadoop.io.Text").collect() + result = [(0, u'Hello World!')] + self.assertEqual(hello, result) + + def test_newhadoop(self): + basepath = self.tempdir.name + ints = sorted(self.sc.newAPIHadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text").collect()) + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + self.assertEqual(ints, ei) + + hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt") + hello = self.sc.newAPIHadoopFile(hellopath, + "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", + "org.apache.hadoop.io.LongWritable", + "org.apache.hadoop.io.Text").collect() + result = [(0, u'Hello World!')] + self.assertEqual(hello, result) + + def test_newolderror(self): + basepath = self.tempdir.name + self.assertRaises(Exception, lambda: self.sc.hadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text")) + + self.assertRaises(Exception, lambda: self.sc.newAPIHadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapred.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text")) + + def test_bad_inputs(self): + basepath = self.tempdir.name + self.assertRaises(Exception, lambda: self.sc.sequenceFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.io.NotValidWritable", + "org.apache.hadoop.io.Text")) + self.assertRaises(Exception, lambda: self.sc.hadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapred.NotValidInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text")) + self.assertRaises(Exception, lambda: self.sc.newAPIHadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapreduce.lib.input.NotValidInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text")) + + def test_converter(self): + basepath = self.tempdir.name + maps = sorted(self.sc.sequenceFile( + basepath + "/sftestdata/sfmap/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.MapWritable", + valueConverter="org.apache.spark.api.python.TestConverter").collect()) + em = [(1, [2.0]), (1, [3.0]), (2, [1.0]), (2, [1.0]), (2, [3.0]), (3, [2.0])] + self.assertEqual(maps, em) + + class TestDaemon(unittest.TestCase): def connect(self, port): from socket import socket, AF_INET, SOCK_STREAM |