Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--  python/pyspark/tests.py  317
1 file changed, 304 insertions(+), 13 deletions(-)
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8486c8595b..c29deb9574 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -19,6 +19,7 @@
Unit tests for PySpark; additional tests are implemented as doctests in
individual modules.
"""
+from array import array
from fileinput import input
from glob import glob
import os
@@ -327,6 +328,17 @@ class TestInputFormat(PySparkTestCase):
ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
self.assertEqual(doubles, ed)
+ bytes = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbytes/",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.BytesWritable").collect())
+ ebs = [(1, bytearray('aa', 'utf-8')),
+ (1, bytearray('aa', 'utf-8')),
+ (2, bytearray('aa', 'utf-8')),
+ (2, bytearray('bb', 'utf-8')),
+ (2, bytearray('bb', 'utf-8')),
+ (3, bytearray('cc', 'utf-8'))]
+ self.assertEqual(bytes, ebs)
+
text = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sftext/",
"org.apache.hadoop.io.Text",
"org.apache.hadoop.io.Text").collect())
@@ -353,14 +365,34 @@ class TestInputFormat(PySparkTestCase):
maps = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfmap/",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.MapWritable").collect())
- em = [(1, {2.0: u'aa'}),
+ em = [(1, {}),
(1, {3.0: u'bb'}),
(2, {1.0: u'aa'}),
(2, {1.0: u'cc'}),
- (2, {3.0: u'bb'}),
(3, {2.0: u'dd'})]
self.assertEqual(maps, em)
+ # arrays get pickled to tuples by default
+ tuples = sorted(self.sc.sequenceFile(
+ basepath + "/sftestdata/sfarray/",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.spark.api.python.DoubleArrayWritable").collect())
+ et = [(1, ()),
+ (2, (3.0, 4.0, 5.0)),
+ (3, (4.0, 5.0, 6.0))]
+ self.assertEqual(tuples, et)
+
+ # with custom converters, primitive arrays can stay as arrays
+ arrays = sorted(self.sc.sequenceFile(
+ basepath + "/sftestdata/sfarray/",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.spark.api.python.DoubleArrayWritable",
+ valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
+ ea = [(1, array('d')),
+ (2, array('d', [3.0, 4.0, 5.0])),
+ (3, array('d', [4.0, 5.0, 6.0]))]
+ self.assertEqual(arrays, ea)
+
clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
"org.apache.hadoop.io.Text",
"org.apache.spark.api.python.TestWritable").collect())
@@ -369,6 +401,12 @@ class TestInputFormat(PySparkTestCase):
u'double': 54.0, u'int': 123, u'str': u'test1'})
self.assertEqual(clazz[0], ec)
+ unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
+ "org.apache.hadoop.io.Text",
+ "org.apache.spark.api.python.TestWritable",
+ batchSize=1).collect())
+ self.assertEqual(unbatched_clazz[0], ec)
+
def test_oldhadoop(self):
basepath = self.tempdir.name
ints = sorted(self.sc.hadoopFile(basepath + "/sftestdata/sfint/",
@@ -379,10 +417,11 @@ class TestInputFormat(PySparkTestCase):
self.assertEqual(ints, ei)
hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
- hello = self.sc.hadoopFile(hellopath,
- "org.apache.hadoop.mapred.TextInputFormat",
- "org.apache.hadoop.io.LongWritable",
- "org.apache.hadoop.io.Text").collect()
+ oldconf = {"mapred.input.dir" : hellopath}
+ hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
+ "org.apache.hadoop.io.LongWritable",
+ "org.apache.hadoop.io.Text",
+ conf=oldconf).collect()
result = [(0, u'Hello World!')]
self.assertEqual(hello, result)
@@ -397,10 +436,11 @@ class TestInputFormat(PySparkTestCase):
self.assertEqual(ints, ei)
hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
- hello = self.sc.newAPIHadoopFile(hellopath,
- "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
- "org.apache.hadoop.io.LongWritable",
- "org.apache.hadoop.io.Text").collect()
+ newconf = {"mapred.input.dir" : hellopath}
+ hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
+ "org.apache.hadoop.io.LongWritable",
+ "org.apache.hadoop.io.Text",
+ conf=newconf).collect()
result = [(0, u'Hello World!')]
self.assertEqual(hello, result)
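
The two hunks above swap the path-based hadoopFile/newAPIHadoopFile calls for the conf-driven hadoopRDD/newAPIHadoopRDD variants, selecting the input through mapred.input.dir instead. A minimal sketch of that equivalence, with a hypothetical input file:

    from pyspark import SparkContext

    sc = SparkContext(appName="conf-example")  # hypothetical app name
    path = "/tmp/hello.txt"  # hypothetical one-line text file

    # input path passed explicitly
    explicit = sc.hadoopFile(path,
                             "org.apache.hadoop.mapred.TextInputFormat",
                             "org.apache.hadoop.io.LongWritable",
                             "org.apache.hadoop.io.Text").collect()

    # same input selected through the Hadoop job configuration
    via_conf = sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
                            "org.apache.hadoop.io.LongWritable",
                            "org.apache.hadoop.io.Text",
                            conf={"mapred.input.dir": path}).collect()

    assert explicit == via_conf  # e.g. both give [(0, u'Hello World!')]
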
@@ -435,16 +475,267 @@ class TestInputFormat(PySparkTestCase):
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.Text"))
- def test_converter(self):
+ def test_converters(self):
+ # use of custom converters
basepath = self.tempdir.name
maps = sorted(self.sc.sequenceFile(
basepath + "/sftestdata/sfmap/",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.MapWritable",
- valueConverter="org.apache.spark.api.python.TestConverter").collect())
-        em = [(1, [2.0]), (1, [3.0]), (2, [1.0]), (2, [1.0]), (2, [3.0]), (3, [2.0])]
+ keyConverter="org.apache.spark.api.python.TestInputKeyConverter",
+ valueConverter="org.apache.spark.api.python.TestInputValueConverter").collect())
+ em = [(u'\x01', []),
+ (u'\x01', [3.0]),
+ (u'\x02', [1.0]),
+ (u'\x02', [1.0]),
+ (u'\x03', [2.0])]
+ self.assertEqual(maps, em)
+
+class TestOutputFormat(PySparkTestCase):
+
+ def setUp(self):
+ PySparkTestCase.setUp(self)
+ self.tempdir = tempfile.NamedTemporaryFile(delete=False)
+ os.unlink(self.tempdir.name)
+
+ def tearDown(self):
+ PySparkTestCase.tearDown(self)
+ shutil.rmtree(self.tempdir.name, ignore_errors=True)
+
+ def test_sequencefiles(self):
+ basepath = self.tempdir.name
+ ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
+ self.sc.parallelize(ei).saveAsSequenceFile(basepath + "/sfint/")
+ ints = sorted(self.sc.sequenceFile(basepath + "/sfint/").collect())
+ self.assertEqual(ints, ei)
+
+ ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
+ self.sc.parallelize(ed).saveAsSequenceFile(basepath + "/sfdouble/")
+ doubles = sorted(self.sc.sequenceFile(basepath + "/sfdouble/").collect())
+ self.assertEqual(doubles, ed)
+
+ ebs = [(1, bytearray(b'\x00\x07spam\x08')), (2, bytearray(b'\x00\x07spam\x08'))]
+ self.sc.parallelize(ebs).saveAsSequenceFile(basepath + "/sfbytes/")
+ bytes = sorted(self.sc.sequenceFile(basepath + "/sfbytes/").collect())
+ self.assertEqual(bytes, ebs)
+
+ et = [(u'1', u'aa'),
+ (u'2', u'bb'),
+ (u'3', u'cc')]
+ self.sc.parallelize(et).saveAsSequenceFile(basepath + "/sftext/")
+ text = sorted(self.sc.sequenceFile(basepath + "/sftext/").collect())
+ self.assertEqual(text, et)
+
+ eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
+ self.sc.parallelize(eb).saveAsSequenceFile(basepath + "/sfbool/")
+ bools = sorted(self.sc.sequenceFile(basepath + "/sfbool/").collect())
+ self.assertEqual(bools, eb)
+
+ en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
+ self.sc.parallelize(en).saveAsSequenceFile(basepath + "/sfnull/")
+ nulls = sorted(self.sc.sequenceFile(basepath + "/sfnull/").collect())
+ self.assertEqual(nulls, en)
+
+ em = [(1, {}),
+ (1, {3.0: u'bb'}),
+ (2, {1.0: u'aa'}),
+ (2, {1.0: u'cc'}),
+ (3, {2.0: u'dd'})]
+ self.sc.parallelize(em).saveAsSequenceFile(basepath + "/sfmap/")
+ maps = sorted(self.sc.sequenceFile(basepath + "/sfmap/").collect())
self.assertEqual(maps, em)
+ def test_oldhadoop(self):
+ basepath = self.tempdir.name
+ dict_data = [(1, {}),
+ (1, {"row1" : 1.0}),
+ (2, {"row2" : 2.0})]
+ self.sc.parallelize(dict_data).saveAsHadoopFile(
+ basepath + "/oldhadoop/",
+ "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.MapWritable")
+ result = sorted(self.sc.hadoopFile(
+ basepath + "/oldhadoop/",
+ "org.apache.hadoop.mapred.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.MapWritable").collect())
+ self.assertEqual(result, dict_data)
+
+ conf = {
+ "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class" : "org.apache.hadoop.io.MapWritable",
+ "mapred.output.dir" : basepath + "/olddataset/"}
+ self.sc.parallelize(dict_data).saveAsHadoopDataset(conf)
+ input_conf = {"mapred.input.dir" : basepath + "/olddataset/"}
+ old_dataset = sorted(self.sc.hadoopRDD(
+ "org.apache.hadoop.mapred.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.MapWritable",
+ conf=input_conf).collect())
+ self.assertEqual(old_dataset, dict_data)
+
+ def test_newhadoop(self):
+ basepath = self.tempdir.name
+ # use custom ArrayWritable types and converters to handle arrays
+ array_data = [(1, array('d')),
+ (1, array('d', [1.0, 2.0, 3.0])),
+ (2, array('d', [3.0, 4.0, 5.0]))]
+ self.sc.parallelize(array_data).saveAsNewAPIHadoopFile(
+ basepath + "/newhadoop/",
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.spark.api.python.DoubleArrayWritable",
+ valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
+ result = sorted(self.sc.newAPIHadoopFile(
+ basepath + "/newhadoop/",
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.spark.api.python.DoubleArrayWritable",
+ valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
+ self.assertEqual(result, array_data)
+
+ conf = {"mapreduce.outputformat.class" :
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class" : "org.apache.spark.api.python.DoubleArrayWritable",
+ "mapred.output.dir" : basepath + "/newdataset/"}
+ self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(conf,
+ valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
+ input_conf = {"mapred.input.dir" : basepath + "/newdataset/"}
+ new_dataset = sorted(self.sc.newAPIHadoopRDD(
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.spark.api.python.DoubleArrayWritable",
+ valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter",
+ conf=input_conf).collect())
+ self.assertEqual(new_dataset, array_data)
+
+ def test_newolderror(self):
+ basepath = self.tempdir.name
+ rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+ self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
+ basepath + "/newolderror/saveAsHadoopFile/",
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"))
+ self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile(
+ basepath + "/newolderror/saveAsNewAPIHadoopFile/",
+ "org.apache.hadoop.mapred.SequenceFileOutputFormat"))
+
+ def test_bad_inputs(self):
+ basepath = self.tempdir.name
+ rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+ self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
+ basepath + "/badinputs/saveAsHadoopFile/",
+ "org.apache.hadoop.mapred.NotValidOutputFormat"))
+ self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile(
+ basepath + "/badinputs/saveAsNewAPIHadoopFile/",
+ "org.apache.hadoop.mapreduce.lib.output.NotValidOutputFormat"))
+
+ def test_converters(self):
+ # use of custom converters
+ basepath = self.tempdir.name
+ data = [(1, {3.0: u'bb'}),
+ (2, {1.0: u'aa'}),
+ (3, {2.0: u'dd'})]
+ self.sc.parallelize(data).saveAsNewAPIHadoopFile(
+ basepath + "/converters/",
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ keyConverter="org.apache.spark.api.python.TestOutputKeyConverter",
+ valueConverter="org.apache.spark.api.python.TestOutputValueConverter")
+ converted = sorted(self.sc.sequenceFile(basepath + "/converters/").collect())
+ expected = [(u'1', 3.0),
+ (u'2', 1.0),
+ (u'3', 2.0)]
+ self.assertEqual(converted, expected)
+
+ def test_reserialization(self):
+ basepath = self.tempdir.name
+ x = range(1, 5)
+ y = range(1001, 1005)
+ data = zip(x, y)
+ rdd = self.sc.parallelize(x).zip(self.sc.parallelize(y))
+ rdd.saveAsSequenceFile(basepath + "/reserialize/sequence")
+ result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect())
+ self.assertEqual(result1, data)
+
+ rdd.saveAsHadoopFile(basepath + "/reserialize/hadoop",
+ "org.apache.hadoop.mapred.SequenceFileOutputFormat")
+ result2 = sorted(self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect())
+ self.assertEqual(result2, data)
+
+ rdd.saveAsNewAPIHadoopFile(basepath + "/reserialize/newhadoop",
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
+ result3 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect())
+ self.assertEqual(result3, data)
+
+ conf4 = {
+ "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.dir" : basepath + "/reserialize/dataset"}
+ rdd.saveAsHadoopDataset(conf4)
+ result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect())
+ self.assertEqual(result4, data)
+
+ conf5 = {"mapreduce.outputformat.class" :
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
+ "mapred.output.dir" : basepath + "/reserialize/newdataset"}
+ rdd.saveAsNewAPIHadoopDataset(conf5)
+ result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
+ self.assertEqual(result5, data)
+
+ def test_unbatched_save_and_read(self):
+ basepath = self.tempdir.name
+ ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
+ self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile(
+ basepath + "/unbatched/")
+
+ unbatched_sequence = sorted(self.sc.sequenceFile(basepath + "/unbatched/",
+ batchSize=1).collect())
+ self.assertEqual(unbatched_sequence, ei)
+
+ unbatched_hadoopFile = sorted(self.sc.hadoopFile(basepath + "/unbatched/",
+ "org.apache.hadoop.mapred.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text",
+ batchSize=1).collect())
+ self.assertEqual(unbatched_hadoopFile, ei)
+
+ unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(basepath + "/unbatched/",
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text",
+ batchSize=1).collect())
+ self.assertEqual(unbatched_newAPIHadoopFile, ei)
+
+ oldconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+ unbatched_hadoopRDD = sorted(self.sc.hadoopRDD(
+ "org.apache.hadoop.mapred.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text",
+ conf=oldconf,
+ batchSize=1).collect())
+ self.assertEqual(unbatched_hadoopRDD, ei)
+
+ newconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+ unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD(
+ "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.Text",
+ conf=newconf,
+ batchSize=1).collect())
+ self.assertEqual(unbatched_newAPIHadoopRDD, ei)
+
+ def test_malformed_RDD(self):
+ basepath = self.tempdir.name
+ # non-batch-serialized RDD[[(K, V)]] should be rejected
+ data = [[(1, "a")], [(2, "aa")], [(3, "aaa")]]
+ rdd = self.sc.parallelize(data, numSlices=len(data))
+ self.assertRaises(Exception, lambda: rdd.saveAsSequenceFile(
+ basepath + "/malformed/sequence"))
class TestDaemon(unittest.TestCase):
def connect(self, port):
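
Outside the test harness, the write support verified by the new TestOutputFormat tests above boils down to a save-then-read round trip; a minimal sketch, assuming a writable (and not yet existing) output directory:

    from pyspark import SparkContext

    sc = SparkContext(appName="roundtrip-example")  # hypothetical app name
    out = "/tmp/roundtrip/sequence"  # hypothetical output directory; must not already exist

    data = [(1, u'aa'), (2, u'bb'), (3, u'cc')]
    sc.parallelize(data).saveAsSequenceFile(out)

    # key and value Writable classes are inferred on read when omitted
    assert sorted(sc.sequenceFile(out).collect()) == data
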