aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-02-17 13:36:43 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-02-17 13:36:43 -0800
commit445a755b884885b88c1778fd56a3151045b0b0ed (patch)
treee36607b0aedc8040fa1946f364ceba85aadbcf68 /examples
parentde4836f8f12c36c1b350cef288a75b5e59155735 (diff)
downloadspark-445a755b884885b88c1778fd56a3151045b0b0ed.tar.gz
spark-445a755b884885b88c1778fd56a3151045b0b0ed.tar.bz2
spark-445a755b884885b88c1778fd56a3151045b0b0ed.zip
[SPARK-4172] [PySpark] Progress API in Python
This patch brings the pull-based progress API into Python, along with an example in Python. Author: Davies Liu <davies@databricks.com> Closes #3027 from davies/progress_api and squashes the following commits: b1ba984 [Davies Liu] fix style d3b9253 [Davies Liu] add tests, mute the exception after stop 4297327 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api 969fa9d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api 25590c9 [Davies Liu] update with Java API 360de2d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api c0f1021 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api 023afb3 [Davies Liu] add Python API and example for progress API
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/python/status_api_demo.py67
1 file changed, 67 insertions, 0 deletions
diff --git a/examples/src/main/python/status_api_demo.py b/examples/src/main/python/status_api_demo.py
new file mode 100644
index 0000000000..a33bdc475a
--- /dev/null
+++ b/examples/src/main/python/status_api_demo.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
import threading
import time

# The Queue module was renamed to queue in Python 3; keep the
# Python 2 name `Queue` working under both interpreters.
try:
    import queue as Queue
except ImportError:
    import Queue

from pyspark import SparkConf, SparkContext
+
+
def delayed(seconds):
    """Return an identity function that sleeps for *seconds* first.

    Used to slow down each Spark task so the demo's status polling
    has visible progress to report.
    """
    def wait_and_return(value):
        time.sleep(seconds)
        return value

    return wait_and_return
+
+
def call_in_background(f, *args):
    """Run ``f(*args)`` on a daemon thread.

    Returns a one-slot Queue onto which the call's result is put;
    the caller can poll ``empty()`` or block on ``get()``.
    """
    result = Queue.Queue(1)

    def worker():
        result.put(f(*args))

    thread = threading.Thread(target=worker)
    thread.daemon = True  # don't keep the process alive for this thread
    thread.start()
    return result
+
+
def main():
    """Run a slow Spark job in the background and poll its progress.

    Demonstrates the pull-based status API: while the job runs on a
    daemon thread, the driver queries the StatusTracker once per second
    and prints job and stage progress.
    """
    conf = SparkConf().set("spark.ui.showConsoleProgress", "false")
    sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf)

    def run():
        # Two slow stages (the reduceByKey forces a shuffle) so there is
        # visible, multi-stage progress to report while polling.
        rdd = sc.parallelize(range(10), 10).map(delayed(2))
        reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
        return reduced.map(delayed(2)).collect()

    result = call_in_background(run)
    status = sc.statusTracker()
    # Poll until the background job has put its result on the queue.
    # NOTE: print statements were Python 2-only syntax; single-string
    # print(...) calls below work on both Python 2 and Python 3.
    while result.empty():
        # `job_id` (not `id`) avoids shadowing the builtin.
        for job_id in status.getJobIdsForGroup():
            job = status.getJobInfo(job_id)
            print("Job %d status: %s" % (job_id, job.status))
            for sid in job.stageIds:
                info = status.getStageInfo(sid)
                if info:  # stage info can be unavailable; skip if so
                    print("Stage %d: %d tasks total (%d active, %d complete)" %
                          (sid, info.numTasks, info.numActiveTasks,
                           info.numCompletedTasks))
        time.sleep(1)

    print("Job results are: %s" % result.get())
    sc.stop()
+
+if __name__ == "__main__":
+ main()