author     Nick Pentreath <nick.pentreath@gmail.com>   2014-06-09 22:21:03 -0700
committer  Matei Zaharia <matei@databricks.com>        2014-06-09 22:21:03 -0700
commit     f971d6cb60d642178d6544217a25fa16ece34889 (patch)
tree       cba008802eda1755ff58eec8ad462d894f48d265 /python
parent     6f2db8c2f51911f88a601ec5bf1509ea0e8173ed (diff)
SPARK-1416: PySpark support for SequenceFile and Hadoop InputFormats
So I finally resurrected this PR. It seems the old one against the incubator mirror is no longer available, so I cannot reference it. This adds initial support for reading Hadoop ```SequenceFile```s, as well as arbitrary Hadoop ```InputFormat```s, in PySpark.

# Overview

The basics are as follows:
1. The ```PythonRDD``` object contains the relevant methods, which are in turn invoked by ```SparkContext``` in PySpark
2. The SequenceFile or InputFormat is read on the Scala side and converted from ```Writable``` instances to the relevant Scala classes (in the case of primitives)
3. Pyrolite is used to serialize Java objects. If this fails, the fallback is ```toString```
4. ```PickleSerializer``` on the Python side deserializes

This works "out of the box" for simple ```Writable```s:
* ```Text```
* ```IntWritable```, ```DoubleWritable```, ```FloatWritable```
* ```NullWritable```
* ```BooleanWritable```
* ```BytesWritable```
* ```MapWritable```

It also works for simple, "struct-like" classes. Due to the way Pyrolite works, this requires that the classes satisfy the JavaBeans conventions (i.e. with fields and a no-arg constructor and getters/setters). (Perhaps in future some sugar for case classes and reflection could be added.) I've tested it out with ```EsInputFormat``` as an example and it works very nicely:

```python
conf = {"es.resource": "index/type"}
rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                         "org.apache.hadoop.io.NullWritable",
                         "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                         conf=conf)
rdd.first()
```

I suspect for things like HBase/Cassandra it will be a bit trickier to get it to work out of the box. A minimal local ```sequenceFile``` sketch is also given after the commit list below.

# Some things still outstanding

1. ~~Requires ```msgpack-python``` and will fail without it. As originally discussed with Josh, add an ```as_strings``` argument that defaults to ```False```, which can be used if ```msgpack-python``` is not available~~
2. ~~I see from https://github.com/apache/spark/pull/363 that Pyrolite is being used there for SerDe between Scala and Python. @ahirreddy @mateiz what is the plan behind this - is Pyrolite preferred? It seems from a cursory glance that adapting the ```msgpack```-based SerDe here to use Pyrolite wouldn't be too hard~~
3. ~~Support the key and value "wrapper" that would allow a Scala/Java function to be plugged in that would transform whatever the key/value Writable class is into something that can be serialized (e.g. convert some custom Writable to a JavaBean or ```java.util.Map``` that can be easily serialized)~~
4. Support ```saveAsSequenceFile```, ```saveAsHadoopFile```, etc. This would require SerDe in the reverse direction, which can be handled by Pyrolite. Will work on this as a separate PR.

Author: Nick Pentreath <nick.pentreath@gmail.com>

Closes #455 from MLnick/pyspark-inputformats and squashes the following commits:

268df7e [Nick Pentreath] Documentation changes per @pwendell comments
761269b [Nick Pentreath] Address @pwendell comments, simplify default writable conversions and remove registry.
4c972d8 [Nick Pentreath] Add license headers
d150431 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
cde6af9 [Nick Pentreath] Parameterize converter trait
5ebacfa [Nick Pentreath] Update docs for PySpark input formats
a985492 [Nick Pentreath] Move Converter examples to own package
365d0be [Nick Pentreath] Make classes private[python]. Add docs and @Experimental annotation to Converter interface.
eeb8205 [Nick Pentreath] Fix path relative to SPARK_HOME in tests
1eaa08b [Nick Pentreath] HBase -> Cassandra app name oversight
3f90c3e [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
2c18513 [Nick Pentreath] Add examples for reading HBase and Cassandra InputFormats from Python
b65606f [Nick Pentreath] Add converter interface
5757f6e [Nick Pentreath] Default key/value classes for sequenceFile are None
085b55f [Nick Pentreath] Move input format tests to tests.py and clean up docs
43eb728 [Nick Pentreath] PySpark InputFormats docs into programming guide
94beedc [Nick Pentreath] Clean up args in PythonRDD. Set key/value converter defaults to None for PySpark context.py methods
1a4a1d6 [Nick Pentreath] Address @mateiz style comments
01e0813 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
15a7d07 [Nick Pentreath] Remove default args for key/value classes. Arg names to camelCase
9fe6bd5 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
84fe8e3 [Nick Pentreath] Python programming guide space formatting
d0f52b6 [Nick Pentreath] Python programming guide
7caa73a [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
93ef995 [Nick Pentreath] Add back context.py changes
9ef1896 [Nick Pentreath] Recover earlier changes lost in previous merge for serializers.py
077ecb2 [Nick Pentreath] Recover earlier changes lost in previous merge for context.py
5af4770 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
35b8e3a [Nick Pentreath] Another fix for test ordering
bef3afb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
e001b94 [Nick Pentreath] Fix test failures due to ordering
78978d9 [Nick Pentreath] Add doc for SequenceFile and InputFormat support to Python programming guide
64eb051 [Nick Pentreath] Scalastyle fix
e7552fa [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
44f2857 [Nick Pentreath] Remove msgpack dependency and switch serialization to Pyrolite, plus some clean up and refactoring
c0ebfb6 [Nick Pentreath] Change sequencefile test data generator to easily be called from PySpark tests
1d7c17c [Nick Pentreath] Amend tests to auto-generate sequencefile data in temp dir
17a656b [Nick Pentreath] remove binary sequencefile for tests
f60959e [Nick Pentreath] Remove msgpack dependency and serializer from PySpark
450e0a2 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
31a2fff [Nick Pentreath] Scalastyle fixes
fc5099e [Nick Pentreath] Add Apache license headers
4e08983 [Nick Pentreath] Clean up docs for PySpark context methods
b20ec7e [Nick Pentreath] Clean up merge duplicate dependencies
951c117 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
f6aac55 [Nick Pentreath] Bring back msgpack
9d2256e [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
1bbbfb0 [Nick Pentreath] Clean up SparkBuild from merge
a67dfad [Nick Pentreath] Clean up Msgpack serialization and registering
7237263 [Nick Pentreath] Add back msgpack serializer and hadoop file code lost during merging
25da1ca [Nick Pentreath] Add generator for nulls, bools, bytes and maps
65360d5 [Nick Pentreath] Adding test SequenceFiles
0c612e5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
d72bf18 [Nick Pentreath] msgpack
dd57922 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
e67212a [Nick Pentreath] Add back msgpack dependency
f2d76a0 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
41856a5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
97ef708 [Nick Pentreath] Remove old writeToStream
2beeedb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
795a763 [Nick Pentreath] Change name to WriteInputFormatTestDataGenerator. Cleanup some var names. Use SPARK_HOME in path for writing test sequencefile data.
174f520 [Nick Pentreath] Add back graphx settings
703ee65 [Nick Pentreath] Add back msgpack
619c0fa [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
1c8efbc [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
eb40036 [Nick Pentreath] Remove unused comment lines
4d7ef2e [Nick Pentreath] Fix indentation
f1d73e3 [Nick Pentreath] mergeConfs returns a copy rather than mutating one of the input arguments
0f5cd84 [Nick Pentreath] Remove unused pair UTF8 class. Add comments to msgpack deserializer
4294cbb [Nick Pentreath] Add old Hadoop api methods. Clean up and expand comments. Clean up argument names
818a1e6 [Nick Pentreath] Add sequencefile and Hadoop InputFormat support to PythonRDD
4e7c9e3 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
c304cc8 [Nick Pentreath] Adding supporting sequencefiles for tests. Cleaning up
4b0a43f [Nick Pentreath] Refactoring utils into own objects. Cleaning up old commented-out code
d86325f [Nick Pentreath] Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop InputFormat
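
For a plain SequenceFile, a minimal local sketch of the new ```sequenceFile``` API referenced above looks like the following. This is not part of the patch; the path /tmp/sfint and its ```IntWritable```/```Text``` contents are assumptions for illustration only:

```python
from pyspark import SparkContext

sc = SparkContext("local", "sequencefile-sketch")

# Keys and values are read as Writables on the JVM side, converted to Java
# types, pickled via Pyrolite, and deserialized by PickleSerializer in Python.
pairs = sc.sequenceFile("/tmp/sfint",
                        "org.apache.hadoop.io.IntWritable",
                        "org.apache.hadoop.io.Text")
print(pairs.collect())  # e.g. [(1, u'aa'), (2, u'bb'), ...]
```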
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/context.py  137
-rw-r--r--  python/pyspark/tests.py    145
2 files changed, 282 insertions, 0 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 211918f5a0..062bec2381 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -342,6 +342,143 @@ class SparkContext(object):
return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
+ def _dictToJavaMap(self, d):
+ jm = self._jvm.java.util.HashMap()
+ if not d:
+ d = {}
+ for k, v in d.iteritems():
+ jm[k] = v
+ return jm
+
+ def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
+ valueConverter=None, minSplits=None):
+ """
+ Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
+ a local file system (available on all nodes), or any Hadoop-supported file system URI.
+ The mechanism is as follows:
+ 1. A Java RDD is created from the SequenceFile or other InputFormat, and the key
+ and value Writable classes
+ 2. Serialization is attempted via Pyrolite pickling
+ 3. If this fails, the fallback is to call 'toString' on each key and value
+ 4. C{PickleSerializer} is used to deserialize pickled objects on the Python side
+
+ @param path: path to sequencefile
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.Text")
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.LongWritable")
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ @param minSplits: minimum splits in dataset
+ (default min(2, sc.defaultParallelism))
+ """
+ minSplits = minSplits or min(self.defaultParallelism, 2)
+ jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
+ keyConverter, valueConverter, minSplits)
+ return RDD(jrdd, self, PickleSerializer())
+
+ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
+ valueConverter=None, conf=None):
+ """
+ Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
+ a local file system (available on all nodes), or any Hadoop-supported file system URI.
+ The mechanism is the same as for sc.sequenceFile.
+
+ A Hadoop configuration can be passed in as a Python dict. This will be converted into a
+ Configuration in Java
+
+ @param path: path to Hadoop file
+ @param inputFormatClass: fully qualified classname of Hadoop InputFormat
+ (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.Text")
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.LongWritable")
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ @param conf: Hadoop configuration, passed in as a dict
+ (None by default)
+ """
+ jconf = self._dictToJavaMap(conf)
+ jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
+ valueClass, keyConverter, valueConverter, jconf)
+ return RDD(jrdd, self, PickleSerializer())
+
+ def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
+ valueConverter=None, conf=None):
+ """
+ Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
+ Hadoop configuration, which is passed in as a Python dict.
+ This will be converted into a Configuration in Java.
+ The mechanism is the same as for sc.sequenceFile.
+
+ @param inputFormatClass: fully qualified classname of Hadoop InputFormat
+ (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.Text")
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.LongWritable")
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ @param conf: Hadoop configuration, passed in as a dict
+ (None by default)
+ """
+ jconf = self._dictToJavaMap(conf)
+ jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
+ valueClass, keyConverter, valueConverter, jconf)
+ return RDD(jrdd, self, PickleSerializer())
+
+ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
+ valueConverter=None, conf=None):
+ """
+ Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
+ a local file system (available on all nodes), or any Hadoop-supported file system URI.
+ The mechanism is the same as for sc.sequenceFile.
+
+ A Hadoop configuration can be passed in as a Python dict. This will be converted into a
+ Configuration in Java.
+
+ @param path: path to Hadoop file
+ @param inputFormatClass: fully qualified classname of Hadoop InputFormat
+ (e.g. "org.apache.hadoop.mapred.TextInputFormat")
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.Text")
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.LongWritable")
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ @param conf: Hadoop configuration, passed in as a dict
+ (None by default)
+ """
+ jconf = self._dictToJavaMap(conf)
+ jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
+ valueClass, keyConverter, valueConverter, jconf)
+ return RDD(jrdd, self, PickleSerializer())
+
+ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
+ valueConverter=None, conf=None):
+ """
+ Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
+ Hadoop configuration, which is passed in as a Python dict.
+ This will be converted into a Configuration in Java.
+ The mechanism is the same as for sc.sequenceFile.
+
+ @param inputFormatClass: fully qualified classname of Hadoop InputFormat
+ (e.g. "org.apache.hadoop.mapred.TextInputFormat")
+ @param keyClass: fully qualified classname of key Writable class
+ (e.g. "org.apache.hadoop.io.Text")
+ @param valueClass: fully qualified classname of value Writable class
+ (e.g. "org.apache.hadoop.io.LongWritable")
+ @param keyConverter: (None by default)
+ @param valueConverter: (None by default)
+ @param conf: Hadoop configuration, passed in as a dict
+ (None by default)
+ """
+ jconf = self._dictToJavaMap(conf)
+ jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
+ keyConverter, valueConverter, jconf)
+ return RDD(jrdd, self, PickleSerializer())
+
def _checkpointFile(self, name, input_deserializer):
jrdd = self._jsc.checkpointFile(name)
return RDD(jrdd, self, input_deserializer)
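
As a usage note for the methods added above (not part of the diff): a minimal sketch of calling ```hadoopFile``` with the 'old' MapReduce API and a configuration dict, assuming an existing SparkContext ```sc``` with this patch applied and a hypothetical text file at /tmp/hello.txt; the ```io.file.buffer.size``` value is only illustrative.

```python
# The conf dict is turned into a java.util.HashMap by _dictToJavaMap and
# converted into a Hadoop Configuration on the JVM side.
conf = {"io.file.buffer.size": "65536"}

hello = sc.hadoopFile("/tmp/hello.txt",
                      "org.apache.hadoop.mapred.TextInputFormat",
                      "org.apache.hadoop.io.LongWritable",
                      "org.apache.hadoop.io.Text",
                      conf=conf)
print(hello.first())  # (byte offset, line text), e.g. (0, u'Hello World!')
```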
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 1f2a6ea941..184ee810b8 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -198,6 +198,151 @@ class TestIO(PySparkTestCase):
self.sc.parallelize([1]).foreach(func)
+class TestInputFormat(PySparkTestCase):
+
+ def setUp(self):
+ PySparkTestCase.setUp(self)
+ self.tempdir = tempfile.NamedTemporaryFile(delete=False)
+ os.unlink(self.tempdir.name)
+ self.sc._jvm.WriteInputFormatTestDataGenerator.generateData(self.tempdir.name, self.sc._jsc)
+
+ def tearDown(self):
+ PySparkTestCase.tearDown(self)
+ shutil.rmtree(self.tempdir.name)
+
+ def test_sequencefiles(self):
+ basepath = self.tempdir.name
+ ints = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfint/",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text").collect())
+ ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
+ self.assertEqual(ints, ei)
+
+ doubles = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfdouble/",
+ "org.apache.hadoop.io.DoubleWritable",
+ "org.apache.hadoop.io.Text").collect())
+ ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
+ self.assertEqual(doubles, ed)
+
+ text = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sftext/",
+ "org.apache.hadoop.io.Text",
+ "org.apache.hadoop.io.Text").collect())
+ et = [(u'1', u'aa'),
+ (u'1', u'aa'),
+ (u'2', u'aa'),
+ (u'2', u'bb'),
+ (u'2', u'bb'),
+ (u'3', u'cc')]
+ self.assertEqual(text, et)
+
+ bools = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbool/",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.BooleanWritable").collect())
+ eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
+ self.assertEqual(bools, eb)
+
+ nulls = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfnull/",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.BooleanWritable").collect())
+ en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
+ self.assertEqual(nulls, en)
+
+ maps = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfmap/",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.MapWritable").collect())
+ em = [(1, {2.0: u'aa'}),
+ (1, {3.0: u'bb'}),
+ (2, {1.0: u'aa'}),
+ (2, {1.0: u'cc'}),
+ (2, {3.0: u'bb'}),
+ (3, {2.0: u'dd'})]
+ self.assertEqual(maps, em)
+
+ clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
+ "org.apache.hadoop.io.Text",
+ "org.apache.spark.api.python.TestWritable").collect())
+ ec = (u'1',
+ {u'__class__': u'org.apache.spark.api.python.TestWritable',
+ u'double': 54.0, u'int': 123, u'str': u'test1'})
+ self.assertEqual(clazz[0], ec)
+
+ def test_oldhadoop(self):
+ basepath = self.tempdir.name
+ ints = sorted(self.sc.hadoopFile(basepath + "/sftestdata/sfint/",
+ "org.apache.hadoop.mapred.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text").collect())
+ ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
+ self.assertEqual(ints, ei)
+
+ hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
+ hello = self.sc.hadoopFile(hellopath,
+ "org.apache.hadoop.mapred.TextInputFormat",
+ "org.apache.hadoop.io.LongWritable",
+ "org.apache.hadoop.io.Text").collect()
+ result = [(0, u'Hello World!')]
+ self.assertEqual(hello, result)
+
+ def test_newhadoop(self):
+ basepath = self.tempdir.name
+ ints = sorted(self.sc.newAPIHadoopFile(
+ basepath + "/sftestdata/sfint/",
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text").collect())
+ ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
+ self.assertEqual(ints, ei)
+
+ hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
+ hello = self.sc.newAPIHadoopFile(hellopath,
+ "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
+ "org.apache.hadoop.io.LongWritable",
+ "org.apache.hadoop.io.Text").collect()
+ result = [(0, u'Hello World!')]
+ self.assertEqual(hello, result)
+
+ def test_newolderror(self):
+ basepath = self.tempdir.name
+ self.assertRaises(Exception, lambda: self.sc.hadoopFile(
+ basepath + "/sftestdata/sfint/",
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text"))
+
+ self.assertRaises(Exception, lambda: self.sc.newAPIHadoopFile(
+ basepath + "/sftestdata/sfint/",
+ "org.apache.hadoop.mapred.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text"))
+
+ def test_bad_inputs(self):
+ basepath = self.tempdir.name
+ self.assertRaises(Exception, lambda: self.sc.sequenceFile(
+ basepath + "/sftestdata/sfint/",
+ "org.apache.hadoop.io.NotValidWritable",
+ "org.apache.hadoop.io.Text"))
+ self.assertRaises(Exception, lambda: self.sc.hadoopFile(
+ basepath + "/sftestdata/sfint/",
+ "org.apache.hadoop.mapred.NotValidInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text"))
+ self.assertRaises(Exception, lambda: self.sc.newAPIHadoopFile(
+ basepath + "/sftestdata/sfint/",
+ "org.apache.hadoop.mapreduce.lib.input.NotValidInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text"))
+
+ def test_converter(self):
+ basepath = self.tempdir.name
+ maps = sorted(self.sc.sequenceFile(
+ basepath + "/sftestdata/sfmap/",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.MapWritable",
+ valueConverter="org.apache.spark.api.python.TestConverter").collect())
+ em = [(1, [2.0]), (1, [3.0]), (2, [1.0]), (2, [1.0]), (2, [3.0]), (3, [2.0])]
+ self.assertEqual(maps, em)
+
+
class TestDaemon(unittest.TestCase):
def connect(self, port):
from socket import socket, AF_INET, SOCK_STREAM