aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--python/pyspark/tests.py24
1 files changed, 19 insertions, 5 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index dfd841b10a..29d6a128f6 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -64,7 +64,7 @@ class TestCheckpoint(PySparkTestCase):
flatMappedRDD = parCollection.flatMap(lambda x: range(1, x + 1))
self.assertFalse(flatMappedRDD.isCheckpointed())
- self.assertIsNone(flatMappedRDD.getCheckpointFile())
+ self.assertTrue(flatMappedRDD.getCheckpointFile() is None)
flatMappedRDD.checkpoint()
result = flatMappedRDD.collect()
@@ -79,13 +79,13 @@ class TestCheckpoint(PySparkTestCase):
flatMappedRDD = parCollection.flatMap(lambda x: [x])
self.assertFalse(flatMappedRDD.isCheckpointed())
- self.assertIsNone(flatMappedRDD.getCheckpointFile())
+ self.assertTrue(flatMappedRDD.getCheckpointFile() is None)
flatMappedRDD.checkpoint()
flatMappedRDD.count() # forces a checkpoint to be computed
time.sleep(1) # 1 second
- self.assertIsNotNone(flatMappedRDD.getCheckpointFile())
+ self.assertTrue(flatMappedRDD.getCheckpointFile() is not None)
recovered = self.sc._checkpointFile(flatMappedRDD.getCheckpointFile())
self.assertEquals([1, 2, 3, 4], recovered.collect())
@@ -125,6 +125,17 @@ class TestAddFile(PySparkTestCase):
from userlibrary import UserClass
self.assertEqual("Hello World!", UserClass().hello())
+ def test_add_egg_file_locally(self):
+ # To ensure that we're actually testing addPyFile's effects, check that
+ # this fails due to `userlibrary` not being on the Python path:
+ def func():
+ from userlib import UserClass
+ self.assertRaises(ImportError, func)
+ path = os.path.join(SPARK_HOME, "python/test_support/userlib-0.1-py2.7.egg")
+ self.sc.addPyFile(path)
+ from userlib import UserClass
+ self.assertEqual("Hello World from inside a package!", UserClass().hello())
+
class TestIO(PySparkTestCase):
@@ -164,9 +175,12 @@ class TestDaemon(unittest.TestCase):
time.sleep(1)
# daemon should no longer accept connections
- with self.assertRaises(EnvironmentError) as trap:
+ try:
self.connect(port)
- self.assertEqual(trap.exception.errno, ECONNREFUSED)
+ except EnvironmentError as exception:
+ self.assertEqual(exception.errno, ECONNREFUSED)
+ else:
+ self.fail("Expected EnvironmentError to be raised")
def test_termination_stdin(self):
"""Ensure that daemon and workers terminate when stdin is closed."""