Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--  python/pyspark/tests.py  51 ++++++++++
1 file changed, 51 insertions(+), 0 deletions(-)
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 16fb5a9256..acc3c30371 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -790,6 +790,57 @@ class TestDaemon(unittest.TestCase):
         self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
 
 
+class TestWorker(PySparkTestCase):
+    def test_cancel_task(self):
+        temp = tempfile.NamedTemporaryFile(delete=True)
+        temp.close()
+        path = temp.name
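+
+        # each task writes "<daemon pid> <worker pid>" to the temp file, then blocks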
+        def sleep(x):
+            import os, time
+            with open(path, 'w') as f:
+                f.write("%d %d" % (os.getppid(), os.getpid()))
+            time.sleep(100)
+
+        # start job in background thread
+        def run():
+            self.sc.parallelize(range(1)).foreach(sleep)
+        import threading
+        t = threading.Thread(target=run)
+        t.daemon = True
+        t.start()
+
+        daemon_pid, worker_pid = 0, 0
+        while True:
+            if os.path.exists(path):
+                with open(path) as f:
+                    data = f.read().split(' ')
+                daemon_pid, worker_pid = map(int, data)
+                break
+            time.sleep(0.1)
+
+        # cancel jobs
+        self.sc.cancelAllJobs()
+        t.join()
+
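+        # poll for up to 5 seconds: os.kill(pid, 0) sends no signal but raises
+        # OSError once the pid no longer exists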
+        for i in range(50):
+            try:
+                os.kill(worker_pid, 0)
+                time.sleep(0.1)
+            except OSError:
+                break  # worker was killed
+        else:
+            self.fail("worker was not killed within 5 seconds")
+
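+        # the daemon process itself must survive the cancellation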
+        try:
+            os.kill(daemon_pid, 0)
+        except OSError:
+            self.fail("daemon was killed")
+
+    def test_fd_leak(self):
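+        # regression test: if each task leaked a descriptor, 1100 tasks would
+        # exhaust the default 1024 fd limit before the job finished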
+        N = 1100  # fd limit is 1024 by default
+        rdd = self.sc.parallelize(range(N), N)
+        self.assertEqual(N, rdd.count())
+
+
 class TestSparkSubmit(unittest.TestCase):
     def setUp(self):
         self.programDir = tempfile.mkdtemp()
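
For reference, below is a minimal standalone sketch (not part of the patch; the helper names is_alive and wait_until_dead are illustrative) of the liveness-probe pattern test_cancel_task relies on: os.kill(pid, 0) delivers no signal and performs only the existence and permission checks, so an OSError means the process has exited.

import os
import subprocess
import time

def is_alive(pid):
    """Probe a pid without signaling it: signal 0 only checks existence."""
    try:
        os.kill(pid, 0)
        return True
    except OSError:
        return False

def wait_until_dead(pid, timeout=5.0, interval=0.1):
    """Poll is_alive() until the process exits or the timeout elapses,
    mirroring the 50 x 0.1s loop in test_cancel_task."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if not is_alive(pid):
            return True
        time.sleep(interval)
    return False

if __name__ == '__main__':
    # demo: spawn a sleeping child, kill it, and confirm the probe notices
    child = subprocess.Popen(['sleep', '100'])
    assert is_alive(child.pid)
    child.terminate()
    child.wait()  # reap the child; a zombie still counts as existing to kill()
    assert wait_until_dead(child.pid)
    print('probe OK: pid %d is gone' % child.pid)

Note that a dead child still counts as existing until it is reaped with wait(); the daemon is expected to reap its forked workers similarly, which is what lets the test observe worker_pid disappear from the outside.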