diff options
author | Davies Liu <davies@databricks.com> | 2015-02-17 13:36:43 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-02-17 13:36:43 -0800 |
commit | 445a755b884885b88c1778fd56a3151045b0b0ed (patch) | |
tree | e36607b0aedc8040fa1946f364ceba85aadbcf68 /examples/src/main/python | |
parent | de4836f8f12c36c1b350cef288a75b5e59155735 (diff) | |
download | spark-445a755b884885b88c1778fd56a3151045b0b0ed.tar.gz spark-445a755b884885b88c1778fd56a3151045b0b0ed.tar.bz2 spark-445a755b884885b88c1778fd56a3151045b0b0ed.zip |
[SPARK-4172] [PySpark] Progress API in Python
This patch bring the pull based progress API into Python, also a example in Python.
Author: Davies Liu <davies@databricks.com>
Closes #3027 from davies/progress_api and squashes the following commits:
b1ba984 [Davies Liu] fix style
d3b9253 [Davies Liu] add tests, mute the exception after stop
4297327 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
969fa9d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
25590c9 [Davies Liu] update with Java API
360de2d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
c0f1021 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
023afb3 [Davies Liu] add Python API and example for progress API
Diffstat (limited to 'examples/src/main/python')
-rw-r--r-- | examples/src/main/python/status_api_demo.py | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/examples/src/main/python/status_api_demo.py b/examples/src/main/python/status_api_demo.py new file mode 100644 index 0000000000..a33bdc475a --- /dev/null +++ b/examples/src/main/python/status_api_demo.py @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import time +import threading +import Queue + +from pyspark import SparkConf, SparkContext + + +def delayed(seconds): + def f(x): + time.sleep(seconds) + return x + return f + + +def call_in_background(f, *args): + result = Queue.Queue(1) + t = threading.Thread(target=lambda: result.put(f(*args))) + t.daemon = True + t.start() + return result + + +def main(): + conf = SparkConf().set("spark.ui.showConsoleProgress", "false") + sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf) + + def run(): + rdd = sc.parallelize(range(10), 10).map(delayed(2)) + reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y) + return reduced.map(delayed(2)).collect() + + result = call_in_background(run) + status = sc.statusTracker() + while result.empty(): + ids = status.getJobIdsForGroup() + for id in ids: + job = status.getJobInfo(id) + print "Job", id, "status: ", job.status + for sid in job.stageIds: + info = status.getStageInfo(sid) + if info: + print "Stage %d: %d tasks total (%d active, %d complete)" % \ + (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks) + time.sleep(1) + + print "Job results are:", result.get() + sc.stop() + +if __name__ == "__main__": + main() |