author    Davies Liu <davies@databricks.com>  2015-02-17 13:36:43 -0800
committer Josh Rosen <joshrosen@databricks.com>  2015-02-17 13:36:43 -0800
commit    445a755b884885b88c1778fd56a3151045b0b0ed (patch)
tree      e36607b0aedc8040fa1946f364ceba85aadbcf68 /python/pyspark/tests.py
parent    de4836f8f12c36c1b350cef288a75b5e59155735 (diff)
[SPARK-4172] [PySpark] Progress API in Python
This patch brings the pull-based progress API into Python, along with an example in Python.

Author: Davies Liu <davies@databricks.com>

Closes #3027 from davies/progress_api and squashes the following commits:

b1ba984 [Davies Liu] fix style
d3b9253 [Davies Liu] add tests, mute the exception after stop
4297327 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
969fa9d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
25590c9 [Davies Liu] update with Java API
360de2d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
c0f1021 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
023afb3 [Davies Liu] add Python API and example for progress API
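For context, the API is pull-based: the driver polls SparkContext.statusTracker() for job and stage state while an action runs on another thread, much as the test below does. A minimal sketch of such a polling loop, using only the StatusTracker methods exercised in this patch (the job group name, partition count, and sleep intervals here are illustrative, not taken from the commit):

import threading
import time

from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="progress_poll_demo")
    sc.setJobGroup("demo", "progress API demo")

    # A deliberately slow job so there is visible progress to report.
    rdd = sc.parallelize(range(100), 10).map(lambda x: time.sleep(0.5) or x)

    # Run the action on a background thread; the main thread polls.
    t = threading.Thread(target=rdd.collect)
    t.daemon = True
    t.start()

    tracker = sc.statusTracker()
    while t.is_alive():
        for job_id in tracker.getJobIdsForGroup("demo"):
            job = tracker.getJobInfo(job_id)
            if job is None:  # listener may not have seen the job yet
                continue
            for stage_id in job.stageIds:
                stage = tracker.getStageInfo(stage_id)
                if stage:
                    print("job %d stage %d: %d/%d tasks complete" % (
                        job_id, stage_id, stage.numCompletedTasks, stage.numTasks))
        time.sleep(1)

    t.join()
    sc.stop()

Note that getJobInfo() and getStageInfo() can return None for ids the event listener has not recorded yet, hence the guards when polling early.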
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--  python/pyspark/tests.py  31
1 file changed, 31 insertions, 0 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index b5e28c4980..d6afc1cdaa 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1550,6 +1550,37 @@ class ContextTests(unittest.TestCase):
sc.stop()
self.assertEqual(SparkContext._active_spark_context, None)
+    def test_progress_api(self):
+        with SparkContext() as sc:
+            sc.setJobGroup('test_progress_api', '', True)
+
+            rdd = sc.parallelize(range(10)).map(lambda x: time.sleep(100))
+            t = threading.Thread(target=rdd.collect)
+            t.daemon = True
+            t.start()
+            # wait for scheduler to start
+            time.sleep(1)
+
+            tracker = sc.statusTracker()
+            jobIds = tracker.getJobIdsForGroup('test_progress_api')
+            self.assertEqual(1, len(jobIds))
+            job = tracker.getJobInfo(jobIds[0])
+            self.assertEqual(1, len(job.stageIds))
+            stage = tracker.getStageInfo(job.stageIds[0])
+            self.assertEqual(rdd.getNumPartitions(), stage.numTasks)
+
+            sc.cancelAllJobs()
+            t.join()
+            # wait for event listener to update the status
+            time.sleep(1)
+
+            job = tracker.getJobInfo(jobIds[0])
+            self.assertEqual('FAILED', job.status)
+            self.assertEqual([], tracker.getActiveJobsIds())
+            self.assertEqual([], tracker.getActiveStageIds())
+
+        sc.stop()
+
@unittest.skipIf(not _have_scipy, "SciPy not installed")
class SciPyTests(PySparkTestCase):