about | summary | refs | log | tree | commit | diff
path: root/python/pyspark/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r-- python/pyspark/tests.py 10
1 files changed, 7 insertions, 3 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index c15bb45775..9c5ecd0bb0 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -52,12 +52,13 @@ class PySparkTestCase(unittest.TestCase):
def setUp(self):
self._old_sys_path = list(sys.path)
class_name = self.__class__.__name__
- self.sc = SparkContext('local[4]', class_name , batchSize=2)
+ self.sc = SparkContext('local[4]', class_name, batchSize=2)
def tearDown(self):
self.sc.stop()
sys.path = self._old_sys_path
+
class TestCheckpoint(PySparkTestCase):
def setUp(self):
@@ -190,6 +191,7 @@ class TestRDDFunctions(PySparkTestCase):
def testAggregateByKey(self):
data = self.sc.parallelize([(1, 1), (1, 1), (3, 2), (5, 1), (5, 3)], 2)
+
def seqOp(x, y):
x.add(y)
return x
@@ -197,17 +199,19 @@ class TestRDDFunctions(PySparkTestCase):
def combOp(x, y):
x |= y
return x
-
+
sets = dict(data.aggregateByKey(set(), seqOp, combOp).collect())
self.assertEqual(3, len(sets))
self.assertEqual(set([1]), sets[1])
self.assertEqual(set([2]), sets[3])
self.assertEqual(set([1, 3]), sets[5])
+
class TestIO(PySparkTestCase):
def test_stdout_redirection(self):
import subprocess
+
def func(x):
subprocess.check_call('ls', shell=True)
self.sc.parallelize([1]).foreach(func)
@@ -479,7 +483,7 @@ class TestSparkSubmit(unittest.TestCase):
| return x + 1
""")
proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, script],
- stdout=subprocess.PIPE)
+ stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out)