diff options
author | Davies Liu <davies@databricks.com> | 2015-02-17 13:36:43 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-02-17 13:36:43 -0800 |
commit | 445a755b884885b88c1778fd56a3151045b0b0ed (patch) | |
tree | e36607b0aedc8040fa1946f364ceba85aadbcf68 /python/pyspark/status.py | |
parent | de4836f8f12c36c1b350cef288a75b5e59155735 (diff) | |
download | spark-445a755b884885b88c1778fd56a3151045b0b0ed.tar.gz spark-445a755b884885b88c1778fd56a3151045b0b0ed.tar.bz2 spark-445a755b884885b88c1778fd56a3151045b0b0ed.zip |
[SPARK-4172] [PySpark] Progress API in Python
This patch bring the pull based progress API into Python, also a example in Python.
Author: Davies Liu <davies@databricks.com>
Closes #3027 from davies/progress_api and squashes the following commits:
b1ba984 [Davies Liu] fix style
d3b9253 [Davies Liu] add tests, mute the exception after stop
4297327 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
969fa9d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
25590c9 [Davies Liu] update with Java API
360de2d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
c0f1021 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
023afb3 [Davies Liu] add Python API and example for progress API
Diffstat (limited to 'python/pyspark/status.py')
-rw-r--r-- | python/pyspark/status.py | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/python/pyspark/status.py b/python/pyspark/status.py new file mode 100644 index 0000000000..a6fa7dd314 --- /dev/null +++ b/python/pyspark/status.py @@ -0,0 +1,96 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from collections import namedtuple + +__all__ = ["SparkJobInfo", "SparkStageInfo", "StatusTracker"] + + +class SparkJobInfo(namedtuple("SparkJobInfo", "jobId stageIds status")): + """ + Exposes information about Spark Jobs. + """ + + +class SparkStageInfo(namedtuple("SparkStageInfo", + "stageId currentAttemptId name numTasks numActiveTasks " + "numCompletedTasks numFailedTasks")): + """ + Exposes information about Spark Stages. + """ + + +class StatusTracker(object): + """ + Low-level status reporting APIs for monitoring job and stage progress. + + These APIs intentionally provide very weak consistency semantics; + consumers of these APIs should be prepared to handle empty / missing + information. For example, a job's stage ids may be known but the status + API may not have any information about the details of those stages, so + `getStageInfo` could potentially return `None` for a valid stage id. + + To limit memory usage, these APIs only provide information on recent + jobs / stages. These APIs will provide information for the last + `spark.ui.retainedStages` stages and `spark.ui.retainedJobs` jobs. + """ + def __init__(self, jtracker): + self._jtracker = jtracker + + def getJobIdsForGroup(self, jobGroup=None): + """ + Return a list of all known jobs in a particular job group. If + `jobGroup` is None, then returns all known jobs that are not + associated with a job group. + + The returned list may contain running, failed, and completed jobs, + and may vary across invocations of this method. This method does + not guarantee the order of the elements in its result. + """ + return list(self._jtracker.getJobIdsForGroup(jobGroup)) + + def getActiveStageIds(self): + """ + Returns an array containing the ids of all active stages. + """ + return sorted(list(self._jtracker.getActiveStageIds())) + + def getActiveJobsIds(self): + """ + Returns an array containing the ids of all active jobs. + """ + return sorted((list(self._jtracker.getActiveJobIds()))) + + def getJobInfo(self, jobId): + """ + Returns a :class:`SparkJobInfo` object, or None if the job info + could not be found or was garbage collected. + """ + job = self._jtracker.getJobInfo(jobId) + if job is not None: + return SparkJobInfo(jobId, job.stageIds(), str(job.status())) + + def getStageInfo(self, stageId): + """ + Returns a :class:`SparkStageInfo` object, or None if the stage + info could not be found or was garbage collected. + """ + stage = self._jtracker.getStageInfo(stageId) + if stage is not None: + # TODO: fetch them in batch for better performance + attrs = [getattr(stage, f)() for f in SparkStageInfo._fields[1:]] + return SparkStageInfo(stageId, *attrs) |