author     Davies Liu <davies@databricks.com>      2015-02-17 13:36:43 -0800
committer  Josh Rosen <joshrosen@databricks.com>   2015-02-17 13:36:43 -0800
commit     445a755b884885b88c1778fd56a3151045b0b0ed (patch)
tree       e36607b0aedc8040fa1946f364ceba85aadbcf68 /python/pyspark/tests.py
parent     de4836f8f12c36c1b350cef288a75b5e59155735 (diff)
download   spark-445a755b884885b88c1778fd56a3151045b0b0ed.tar.gz
           spark-445a755b884885b88c1778fd56a3151045b0b0ed.tar.bz2
           spark-445a755b884885b88c1778fd56a3151045b0b0ed.zip
[SPARK-4172] [PySpark] Progress API in Python
This patch brings the pull-based progress API to Python, along with an example in Python.
Author: Davies Liu <davies@databricks.com>
Closes #3027 from davies/progress_api and squashes the following commits:
b1ba984 [Davies Liu] fix style
d3b9253 [Davies Liu] add tests, mute the exception after stop
4297327 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
969fa9d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
25590c9 [Davies Liu] update with Java API
360de2d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
c0f1021 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
023afb3 [Davies Liu] add Python API and example for progress API
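
For illustration only (not part of the patch): a minimal sketch of how the pull-based progress API described above can be polled from a second thread. The app name, the job group "demo", the partition count, and the sleep intervals are invented for the example; the tracker calls themselves mirror those exercised by the test in the diff below.

    # Polling sketch for the pull-based progress API (illustrative values).
    import threading
    import time

    from pyspark import SparkContext

    sc = SparkContext(appName="progress-sketch")
    sc.setJobGroup("demo", "progress API demo", True)

    # A deliberately slow job so there is progress to observe.
    rdd = sc.parallelize(range(8), 8).map(lambda x: (time.sleep(2), x)[1])
    t = threading.Thread(target=rdd.collect)
    t.daemon = True
    t.start()

    tracker = sc.statusTracker()
    while t.is_alive():
        # Pull the current status instead of registering a listener.
        for job_id in tracker.getJobIdsForGroup("demo"):
            job = tracker.getJobInfo(job_id)
            if job is None:
                continue
            for stage_id in job.stageIds:
                stage = tracker.getStageInfo(stage_id)
                if stage is not None:
                    print("job %d (%s), stage %d: %d tasks"
                          % (job_id, job.status, stage_id, stage.numTasks))
        time.sleep(1)

    t.join()
    sc.stop()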
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--  python/pyspark/tests.py | 31
1 file changed, 31 insertions(+), 0 deletions(-)
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index b5e28c4980..d6afc1cdaa 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1550,6 +1550,37 @@ class ContextTests(unittest.TestCase):
         sc.stop()
         self.assertEqual(SparkContext._active_spark_context, None)
 
+    def test_progress_api(self):
+        with SparkContext() as sc:
+            sc.setJobGroup('test_progress_api', '', True)
+
+            rdd = sc.parallelize(range(10)).map(lambda x: time.sleep(100))
+            t = threading.Thread(target=rdd.collect)
+            t.daemon = True
+            t.start()
+            # wait for scheduler to start
+            time.sleep(1)
+
+            tracker = sc.statusTracker()
+            jobIds = tracker.getJobIdsForGroup('test_progress_api')
+            self.assertEqual(1, len(jobIds))
+            job = tracker.getJobInfo(jobIds[0])
+            self.assertEqual(1, len(job.stageIds))
+            stage = tracker.getStageInfo(job.stageIds[0])
+            self.assertEqual(rdd.getNumPartitions(), stage.numTasks)
+
+            sc.cancelAllJobs()
+            t.join()
+            # wait for event listener to update the status
+            time.sleep(1)
+
+            job = tracker.getJobInfo(jobIds[0])
+            self.assertEqual('FAILED', job.status)
+            self.assertEqual([], tracker.getActiveJobsIds())
+            self.assertEqual([], tracker.getActiveStageIds())
+
+        sc.stop()
+
 
 @unittest.skipIf(not _have_scipy, "SciPy not installed")
 class SciPyTests(PySparkTestCase):
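
A possible follow-on helper, again hypothetical and not part of the patch: aggregating per-stage task counts into a single progress figure. Only numTasks is confirmed by the test above; numCompletedTasks is an assumption about the SparkStageInfo fields this change introduces.

    # Hypothetical helper built on the same tracker calls as the test.
    # stage.numCompletedTasks is an assumed SparkStageInfo field; only
    # numTasks appears in the diff above.
    def job_progress(tracker, job_id):
        """Return (completed, total) task counts for one job."""
        job = tracker.getJobInfo(job_id)
        if job is None:
            return (0, 0)
        completed = total = 0
        for stage_id in job.stageIds:
            stage = tracker.getStageInfo(stage_id)
            if stage is not None:
                completed += stage.numCompletedTasks
                total += stage.numTasks
        return (completed, total)

    # Example use inside a polling loop:
    #   done, total = job_progress(tracker, jobIds[0])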