about summary refs log tree commit diff
path: root/python/pyspark/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--  python/pyspark/tests.py  35
1 file changed, 35 insertions, 0 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index b687d695b0..747cd1767d 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1222,11 +1222,46 @@ class TestWorker(PySparkTestCase):
except OSError:
self.fail("daemon had been killed")
+ # run a normal job
+ rdd = self.sc.parallelize(range(100), 1)
+ self.assertEqual(100, rdd.map(str).count())
+
def test_fd_leak(self):
N = 1100 # fd limit is 1024 by default
rdd = self.sc.parallelize(range(N), N)
self.assertEquals(N, rdd.count())
+ def test_after_exception(self):
+ def raise_exception(_):
+ raise Exception()
+ rdd = self.sc.parallelize(range(100), 1)
+ self.assertRaises(Exception, lambda: rdd.foreach(raise_exception))
+ self.assertEqual(100, rdd.map(str).count())
+
+ def test_after_jvm_exception(self):
+ tempFile = tempfile.NamedTemporaryFile(delete=False)
+ tempFile.write("Hello World!")
+ tempFile.close()
+ data = self.sc.textFile(tempFile.name, 1)
+ filtered_data = data.filter(lambda x: True)
+ self.assertEqual(1, filtered_data.count())
+ os.unlink(tempFile.name)
+ self.assertRaises(Exception, lambda: filtered_data.count())
+
+ rdd = self.sc.parallelize(range(100), 1)
+ self.assertEqual(100, rdd.map(str).count())
+
+ def test_accumulator_when_reuse_worker(self):
+ from pyspark.accumulators import INT_ACCUMULATOR_PARAM
+ acc1 = self.sc.accumulator(0, INT_ACCUMULATOR_PARAM)
+ self.sc.parallelize(range(100), 20).foreach(lambda x: acc1.add(x))
+ self.assertEqual(sum(range(100)), acc1.value)
+
+ acc2 = self.sc.accumulator(0, INT_ACCUMULATOR_PARAM)
+ self.sc.parallelize(range(100), 20).foreach(lambda x: acc2.add(x))
+ self.assertEqual(sum(range(100)), acc2.value)
+ self.assertEqual(sum(range(100)), acc1.value)
+
class TestSparkSubmit(unittest.TestCase):