path: root/python/pyspark/tests.py
author     Nicholas Chammas <nicholas.chammas@gmail.com>    2014-08-06 12:58:24 -0700
committer  Reynold Xin <rxin@apache.org>                    2014-08-06 12:58:24 -0700
commit  d614967b0bad1e6c5277d612602ec0a653a00258 (patch)
tree    8df1a52cbe074af4f928c0ac8f08a63075882d0b /python/pyspark/tests.py
parent  a6cd31108f0d73ce6823daafe8447677e03cfd13 (diff)
[SPARK-2627] [PySpark] have the build enforce PEP 8 automatically
As described in [SPARK-2627](https://issues.apache.org/jira/browse/SPARK-2627), we'd like Python code to automatically be checked for PEP 8 compliance by Jenkins. This pull request aims to do that.

Notes:

* We may need to install [`pep8`](https://pypi.python.org/pypi/pep8) on the build server.
* I'm expecting tests to fail now that PEP 8 compliance is being checked as part of the build. I'm fine with cleaning up any remaining PEP 8 violations as part of this pull request.
* I did not understand why the RAT and scalastyle reports are saved to text files. I did the same for the PEP 8 check, but only so that the console output style can match those for the RAT and scalastyle checks. The PEP 8 report is removed right after the check is complete.
* Updates to the ["Contributing to Spark"](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark) guide will be submitted elsewhere, as I don't believe that text is part of the Spark repo.

Author: Nicholas Chammas <nicholas.chammas@gmail.com>
Author: nchammas <nicholas.chammas@gmail.com>

Closes #1744 from nchammas/master and squashes the following commits:

274b238 [Nicholas Chammas] [SPARK-2627] [PySpark] minor indentation changes
983d963 [nchammas] Merge pull request #5 from apache/master
1db5314 [nchammas] Merge pull request #4 from apache/master
0e0245f [Nicholas Chammas] [SPARK-2627] undo erroneous whitespace fixes
bf30942 [Nicholas Chammas] [SPARK-2627] PEP8: comment spacing
6db9a44 [nchammas] Merge pull request #3 from apache/master
7b4750e [Nicholas Chammas] merge upstream changes
91b7584 [Nicholas Chammas] [SPARK-2627] undo unnecessary line breaks
44e3e56 [Nicholas Chammas] [SPARK-2627] use tox.ini to exclude files
b09fae2 [Nicholas Chammas] don't wrap comments unnecessarily
bfb9f9f [Nicholas Chammas] [SPARK-2627] keep up with the PEP 8 fixes
9da347f [nchammas] Merge pull request #2 from apache/master
aa5b4b5 [Nicholas Chammas] [SPARK-2627] follow Spark bash style for if blocks
d0a83b9 [Nicholas Chammas] [SPARK-2627] check that pep8 downloaded fine
dffb5dd [Nicholas Chammas] [SPARK-2627] download pep8 at runtime
a1ce7ae [Nicholas Chammas] [SPARK-2627] space out test report sections
21da538 [Nicholas Chammas] [SPARK-2627] it's PEP 8, not PEP8
6f4900b [Nicholas Chammas] [SPARK-2627] more misc PEP 8 fixes
fe57ed0 [Nicholas Chammas] removing merge conflict backups
9c01d4c [nchammas] Merge pull request #1 from apache/master
9a66cb0 [Nicholas Chammas] resolving merge conflicts
a31ccc4 [Nicholas Chammas] [SPARK-2627] miscellaneous PEP 8 fixes
beaa9ac [Nicholas Chammas] [SPARK-2627] fail check on non-zero status
723ed39 [Nicholas Chammas] always delete the report file
0541ebb [Nicholas Chammas] [SPARK-2627] call Python linter from run-tests
12440fa [Nicholas Chammas] [SPARK-2627] add Scala linter
61c07b9 [Nicholas Chammas] [SPARK-2627] add Python linter
75ad552 [Nicholas Chammas] make check output style consistent
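For context, the check flow the commit message describes (download `pep8` at runtime, verify the download, keep exclusions in `tox.ini`, write the report to a text file so the console output matches the RAT and scalastyle checks, always delete the report, and fail on a non-zero status) could be sketched roughly as below. This is a hypothetical illustration only, not the actual lint script added by this commit; the URL, file names, and helper names are assumptions, and it is written in Python 2 style to match the PySpark code in this diff.

```python
#!/usr/bin/env python
# Hypothetical sketch of the PEP 8 check flow described above -- NOT the
# lint script this commit adds to the Spark build. The URL, paths, and
# function names are illustrative assumptions. Python 2, like the diff.
import os
import subprocess
import sys
import urllib


PEP8_URL = "https://raw.githubusercontent.com/jcrocholl/pep8/1.5.7/pep8.py"  # assumed pin
PEP8_SCRIPT = "pep8.py"
REPORT_PATH = "pep8-report.txt"


def ensure_pep8():
    # Download pep8 at runtime and check that the download actually worked.
    if not os.path.isfile(PEP8_SCRIPT):
        urllib.urlretrieve(PEP8_URL, PEP8_SCRIPT)
    if not os.path.isfile(PEP8_SCRIPT) or os.path.getsize(PEP8_SCRIPT) == 0:
        sys.exit("ERROR: could not download pep8.")


def run_pep8(paths):
    # Exclusions live in tox.ini, which pep8 reads automatically, so no
    # extra flags are needed here. The report goes to a text file only so
    # the console output can mirror the RAT and scalastyle checks; it is
    # always deleted afterwards.
    try:
        with open(REPORT_PATH, "w") as report:
            status = subprocess.call([sys.executable, PEP8_SCRIPT] + paths,
                                     stdout=report, stderr=subprocess.STDOUT)
        with open(REPORT_PATH) as report:
            print(report.read())
    finally:
        if os.path.isfile(REPORT_PATH):
            os.remove(REPORT_PATH)
    return status


if __name__ == "__main__":
    ensure_pep8()
    if run_pep8(["python/pyspark"]) != 0:
        sys.exit("PEP 8 checks failed.")
```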
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--  python/pyspark/tests.py  143
1 file changed, 81 insertions(+), 62 deletions(-)
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 4ac94ba729..88a61176e5 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -62,53 +62,53 @@ class TestMerger(unittest.TestCase):
self.N = 1 << 16
self.l = [i for i in xrange(self.N)]
self.data = zip(self.l, self.l)
- self.agg = Aggregator(lambda x: [x],
- lambda x, y: x.append(y) or x,
- lambda x, y: x.extend(y) or x)
+ self.agg = Aggregator(lambda x: [x],
+ lambda x, y: x.append(y) or x,
+ lambda x, y: x.extend(y) or x)
def test_in_memory(self):
m = InMemoryMerger(self.agg)
m.mergeValues(self.data)
self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
- sum(xrange(self.N)))
+ sum(xrange(self.N)))
m = InMemoryMerger(self.agg)
m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data))
self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
- sum(xrange(self.N)))
+ sum(xrange(self.N)))
def test_small_dataset(self):
m = ExternalMerger(self.agg, 1000)
m.mergeValues(self.data)
self.assertEqual(m.spills, 0)
self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
- sum(xrange(self.N)))
+ sum(xrange(self.N)))
m = ExternalMerger(self.agg, 1000)
m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data))
self.assertEqual(m.spills, 0)
self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
- sum(xrange(self.N)))
+ sum(xrange(self.N)))
def test_medium_dataset(self):
m = ExternalMerger(self.agg, 10)
m.mergeValues(self.data)
self.assertTrue(m.spills >= 1)
self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
- sum(xrange(self.N)))
+ sum(xrange(self.N)))
m = ExternalMerger(self.agg, 10)
m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data * 3))
self.assertTrue(m.spills >= 1)
self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
- sum(xrange(self.N)) * 3)
+ sum(xrange(self.N)) * 3)
def test_huge_dataset(self):
m = ExternalMerger(self.agg, 10)
m.mergeCombiners(map(lambda (k, v): (k, [str(v)]), self.data * 10))
self.assertTrue(m.spills >= 1)
self.assertEqual(sum(len(v) for k, v in m._recursive_merged_items(0)),
- self.N * 10)
+ self.N * 10)
m._cleanup()
@@ -188,6 +188,7 @@ class TestAddFile(PySparkTestCase):
log4j = self.sc._jvm.org.apache.log4j
old_level = log4j.LogManager.getRootLogger().getLevel()
log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
+
def func(x):
from userlibrary import UserClass
return UserClass().hello()
@@ -355,8 +356,8 @@ class TestInputFormat(PySparkTestCase):
self.assertEqual(doubles, ed)
bytes = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbytes/",
- "org.apache.hadoop.io.IntWritable",
- "org.apache.hadoop.io.BytesWritable").collect())
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.BytesWritable").collect())
ebs = [(1, bytearray('aa', 'utf-8')),
(1, bytearray('aa', 'utf-8')),
(2, bytearray('aa', 'utf-8')),
@@ -428,9 +429,9 @@ class TestInputFormat(PySparkTestCase):
self.assertEqual(clazz[0], ec)
unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
- "org.apache.hadoop.io.Text",
- "org.apache.spark.api.python.TestWritable",
- batchSize=1).collect())
+ "org.apache.hadoop.io.Text",
+ "org.apache.spark.api.python.TestWritable",
+ batchSize=1).collect())
self.assertEqual(unbatched_clazz[0], ec)
def test_oldhadoop(self):
@@ -443,7 +444,7 @@ class TestInputFormat(PySparkTestCase):
self.assertEqual(ints, ei)
hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
- oldconf = {"mapred.input.dir" : hellopath}
+ oldconf = {"mapred.input.dir": hellopath}
hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text",
@@ -462,7 +463,7 @@ class TestInputFormat(PySparkTestCase):
self.assertEqual(ints, ei)
hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
- newconf = {"mapred.input.dir" : hellopath}
+ newconf = {"mapred.input.dir": hellopath}
hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text",
@@ -517,6 +518,7 @@ class TestInputFormat(PySparkTestCase):
(u'\x03', [2.0])]
self.assertEqual(maps, em)
+
class TestOutputFormat(PySparkTestCase):
def setUp(self):
@@ -574,8 +576,8 @@ class TestOutputFormat(PySparkTestCase):
def test_oldhadoop(self):
basepath = self.tempdir.name
dict_data = [(1, {}),
- (1, {"row1" : 1.0}),
- (2, {"row2" : 2.0})]
+ (1, {"row1": 1.0}),
+ (2, {"row2": 2.0})]
self.sc.parallelize(dict_data).saveAsHadoopFile(
basepath + "/oldhadoop/",
"org.apache.hadoop.mapred.SequenceFileOutputFormat",
@@ -589,12 +591,13 @@ class TestOutputFormat(PySparkTestCase):
self.assertEqual(result, dict_data)
conf = {
- "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
- "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
- "mapred.output.value.class" : "org.apache.hadoop.io.MapWritable",
- "mapred.output.dir" : basepath + "/olddataset/"}
+ "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class": "org.apache.hadoop.io.MapWritable",
+ "mapred.output.dir": basepath + "/olddataset/"
+ }
self.sc.parallelize(dict_data).saveAsHadoopDataset(conf)
- input_conf = {"mapred.input.dir" : basepath + "/olddataset/"}
+ input_conf = {"mapred.input.dir": basepath + "/olddataset/"}
old_dataset = sorted(self.sc.hadoopRDD(
"org.apache.hadoop.mapred.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
@@ -622,14 +625,17 @@ class TestOutputFormat(PySparkTestCase):
valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
self.assertEqual(result, array_data)
- conf = {"mapreduce.outputformat.class" :
- "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
- "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
- "mapred.output.value.class" : "org.apache.spark.api.python.DoubleArrayWritable",
- "mapred.output.dir" : basepath + "/newdataset/"}
- self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(conf,
+ conf = {
+ "mapreduce.outputformat.class":
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class": "org.apache.spark.api.python.DoubleArrayWritable",
+ "mapred.output.dir": basepath + "/newdataset/"
+ }
+ self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(
+ conf,
valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
- input_conf = {"mapred.input.dir" : basepath + "/newdataset/"}
+ input_conf = {"mapred.input.dir": basepath + "/newdataset/"}
new_dataset = sorted(self.sc.newAPIHadoopRDD(
"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
@@ -640,7 +646,7 @@ class TestOutputFormat(PySparkTestCase):
def test_newolderror(self):
basepath = self.tempdir.name
- rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+ rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
basepath + "/newolderror/saveAsHadoopFile/",
"org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"))
@@ -650,7 +656,7 @@ class TestOutputFormat(PySparkTestCase):
def test_bad_inputs(self):
basepath = self.tempdir.name
- rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+ rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
basepath + "/badinputs/saveAsHadoopFile/",
"org.apache.hadoop.mapred.NotValidOutputFormat"))
@@ -685,30 +691,32 @@ class TestOutputFormat(PySparkTestCase):
result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect())
self.assertEqual(result1, data)
- rdd.saveAsHadoopFile(basepath + "/reserialize/hadoop",
- "org.apache.hadoop.mapred.SequenceFileOutputFormat")
+ rdd.saveAsHadoopFile(
+ basepath + "/reserialize/hadoop",
+ "org.apache.hadoop.mapred.SequenceFileOutputFormat")
result2 = sorted(self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect())
self.assertEqual(result2, data)
- rdd.saveAsNewAPIHadoopFile(basepath + "/reserialize/newhadoop",
- "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
+ rdd.saveAsNewAPIHadoopFile(
+ basepath + "/reserialize/newhadoop",
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
result3 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect())
self.assertEqual(result3, data)
conf4 = {
- "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
- "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
- "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
- "mapred.output.dir" : basepath + "/reserialize/dataset"}
+ "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+ "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class": "org.apache.hadoop.io.IntWritable",
+ "mapred.output.dir": basepath + "/reserialize/dataset"}
rdd.saveAsHadoopDataset(conf4)
result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect())
self.assertEqual(result4, data)
- conf5 = {"mapreduce.outputformat.class" :
- "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
- "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
- "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
- "mapred.output.dir" : basepath + "/reserialize/newdataset"}
+ conf5 = {"mapreduce.outputformat.class":
+ "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+ "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
+ "mapred.output.value.class": "org.apache.hadoop.io.IntWritable",
+ "mapred.output.dir": basepath + "/reserialize/newdataset"}
rdd.saveAsNewAPIHadoopDataset(conf5)
result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
self.assertEqual(result5, data)
@@ -719,25 +727,28 @@ class TestOutputFormat(PySparkTestCase):
self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile(
basepath + "/unbatched/")
- unbatched_sequence = sorted(self.sc.sequenceFile(basepath + "/unbatched/",
+ unbatched_sequence = sorted(self.sc.sequenceFile(
+ basepath + "/unbatched/",
batchSize=1).collect())
self.assertEqual(unbatched_sequence, ei)
- unbatched_hadoopFile = sorted(self.sc.hadoopFile(basepath + "/unbatched/",
+ unbatched_hadoopFile = sorted(self.sc.hadoopFile(
+ basepath + "/unbatched/",
"org.apache.hadoop.mapred.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.Text",
batchSize=1).collect())
self.assertEqual(unbatched_hadoopFile, ei)
- unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(basepath + "/unbatched/",
+ unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(
+ basepath + "/unbatched/",
"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.Text",
batchSize=1).collect())
self.assertEqual(unbatched_newAPIHadoopFile, ei)
- oldconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+ oldconf = {"mapred.input.dir": basepath + "/unbatched/"}
unbatched_hadoopRDD = sorted(self.sc.hadoopRDD(
"org.apache.hadoop.mapred.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
@@ -746,7 +757,7 @@ class TestOutputFormat(PySparkTestCase):
batchSize=1).collect())
self.assertEqual(unbatched_hadoopRDD, ei)
- newconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+ newconf = {"mapred.input.dir": basepath + "/unbatched/"}
unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD(
"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
@@ -763,7 +774,9 @@ class TestOutputFormat(PySparkTestCase):
self.assertRaises(Exception, lambda: rdd.saveAsSequenceFile(
basepath + "/malformed/sequence"))
+
class TestDaemon(unittest.TestCase):
+
def connect(self, port):
from socket import socket, AF_INET, SOCK_STREAM
sock = socket(AF_INET, SOCK_STREAM)
@@ -810,12 +823,15 @@ class TestDaemon(unittest.TestCase):
class TestWorker(PySparkTestCase):
+
def test_cancel_task(self):
temp = tempfile.NamedTemporaryFile(delete=True)
temp.close()
path = temp.name
+
def sleep(x):
- import os, time
+ import os
+ import time
with open(path, 'w') as f:
f.write("%d %d" % (os.getppid(), os.getpid()))
time.sleep(100)
@@ -845,7 +861,7 @@ class TestWorker(PySparkTestCase):
os.kill(worker_pid, 0)
time.sleep(0.1)
except OSError:
- break # worker was killed
+ break # worker was killed
else:
self.fail("worker has not been killed after 5 seconds")
@@ -855,12 +871,13 @@ class TestWorker(PySparkTestCase):
self.fail("daemon had been killed")
def test_fd_leak(self):
- N = 1100 # fd limit is 1024 by default
+ N = 1100 # fd limit is 1024 by default
rdd = self.sc.parallelize(range(N), N)
self.assertEquals(N, rdd.count())
class TestSparkSubmit(unittest.TestCase):
+
def setUp(self):
self.programDir = tempfile.mkdtemp()
self.sparkSubmit = os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit")
@@ -953,9 +970,9 @@ class TestSparkSubmit(unittest.TestCase):
|def myfunc(x):
| return x + 1
""")
- proc = subprocess.Popen(
- [self.sparkSubmit, "--py-files", zip, "--master", "local-cluster[1,1,512]", script],
- stdout=subprocess.PIPE)
+ proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, "--master",
+ "local-cluster[1,1,512]", script],
+ stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out)
@@ -981,6 +998,7 @@ class TestSparkSubmit(unittest.TestCase):
@unittest.skipIf(not _have_scipy, "SciPy not installed")
class SciPyTests(PySparkTestCase):
+
"""General PySpark tests that depend on scipy """
def test_serialize(self):
@@ -993,15 +1011,16 @@ class SciPyTests(PySparkTestCase):
@unittest.skipIf(not _have_numpy, "NumPy not installed")
class NumPyTests(PySparkTestCase):
+
"""General PySpark tests that depend on numpy """
def test_statcounter_array(self):
- x = self.sc.parallelize([np.array([1.0,1.0]), np.array([2.0,2.0]), np.array([3.0,3.0])])
+ x = self.sc.parallelize([np.array([1.0, 1.0]), np.array([2.0, 2.0]), np.array([3.0, 3.0])])
s = x.stats()
- self.assertSequenceEqual([2.0,2.0], s.mean().tolist())
- self.assertSequenceEqual([1.0,1.0], s.min().tolist())
- self.assertSequenceEqual([3.0,3.0], s.max().tolist())
- self.assertSequenceEqual([1.0,1.0], s.sampleStdev().tolist())
+ self.assertSequenceEqual([2.0, 2.0], s.mean().tolist())
+ self.assertSequenceEqual([1.0, 1.0], s.min().tolist())
+ self.assertSequenceEqual([3.0, 3.0], s.max().tolist())
+ self.assertSequenceEqual([1.0, 1.0], s.sampleStdev().tolist())
if __name__ == "__main__":