aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/status.py
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-02-17 13:36:43 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-02-17 13:36:43 -0800
commit445a755b884885b88c1778fd56a3151045b0b0ed (patch)
treee36607b0aedc8040fa1946f364ceba85aadbcf68 /python/pyspark/status.py
parentde4836f8f12c36c1b350cef288a75b5e59155735 (diff)
downloadspark-445a755b884885b88c1778fd56a3151045b0b0ed.tar.gz
spark-445a755b884885b88c1778fd56a3151045b0b0ed.tar.bz2
spark-445a755b884885b88c1778fd56a3151045b0b0ed.zip
[SPARK-4172] [PySpark] Progress API in Python
This patch bring the pull based progress API into Python, also a example in Python. Author: Davies Liu <davies@databricks.com> Closes #3027 from davies/progress_api and squashes the following commits: b1ba984 [Davies Liu] fix style d3b9253 [Davies Liu] add tests, mute the exception after stop 4297327 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api 969fa9d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api 25590c9 [Davies Liu] update with Java API 360de2d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api c0f1021 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api 023afb3 [Davies Liu] add Python API and example for progress API
Diffstat (limited to 'python/pyspark/status.py')
-rw-r--r--python/pyspark/status.py96
1 files changed, 96 insertions, 0 deletions
diff --git a/python/pyspark/status.py b/python/pyspark/status.py
new file mode 100644
index 0000000000..a6fa7dd314
--- /dev/null
+++ b/python/pyspark/status.py
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from collections import namedtuple
+
+__all__ = ["SparkJobInfo", "SparkStageInfo", "StatusTracker"]
+
+
class SparkJobInfo(namedtuple("SparkJobInfo", ["jobId", "stageIds", "status"])):
    """
    Immutable snapshot of a Spark job's id, its stage ids, and its status.
    """
+
+
class SparkStageInfo(namedtuple("SparkStageInfo",
                                ["stageId", "currentAttemptId", "name",
                                 "numTasks", "numActiveTasks",
                                 "numCompletedTasks", "numFailedTasks"])):
    """
    Immutable snapshot of a Spark stage's id, attempt, name, and task counts.
    """
+
+
class StatusTracker(object):
    """
    Low-level status reporting APIs for monitoring job and stage progress.

    These APIs intentionally provide very weak consistency semantics;
    consumers of these APIs should be prepared to handle empty / missing
    information. For example, a job's stage ids may be known but the status
    API may not have any information about the details of those stages, so
    `getStageInfo` could potentially return `None` for a valid stage id.

    To limit memory usage, these APIs only provide information on recent
    jobs / stages. These APIs will provide information for the last
    `spark.ui.retainedStages` stages and `spark.ui.retainedJobs` jobs.
    """
    def __init__(self, jtracker):
        # JVM-side status tracker (py4j proxy); every query below delegates
        # to it and converts the result into plain Python objects.
        self._jtracker = jtracker

    def getJobIdsForGroup(self, jobGroup=None):
        """
        Return a list of all known jobs in a particular job group. If
        `jobGroup` is None, then returns all known jobs that are not
        associated with a job group.

        The returned list may contain running, failed, and completed jobs,
        and may vary across invocations of this method. This method does
        not guarantee the order of the elements in its result.
        """
        return list(self._jtracker.getJobIdsForGroup(jobGroup))

    def getActiveStageIds(self):
        """
        Returns a sorted list containing the ids of all active stages.
        """
        # sorted() already materializes its argument into a new list, so
        # an intermediate list() copy is unnecessary.
        return sorted(self._jtracker.getActiveStageIds())

    def getActiveJobsIds(self):
        """
        Returns a sorted list containing the ids of all active jobs.
        """
        # NOTE: the method name ("JobsIds") is kept as-is for backward
        # compatibility with existing callers of the public API.
        return sorted(self._jtracker.getActiveJobIds())

    def getJobInfo(self, jobId):
        """
        Returns a :class:`SparkJobInfo` object, or None if the job info
        could not be found or was garbage collected.
        """
        job = self._jtracker.getJobInfo(jobId)
        if job is None:
            return None
        return SparkJobInfo(jobId, job.stageIds(), str(job.status()))

    def getStageInfo(self, stageId):
        """
        Returns a :class:`SparkStageInfo` object, or None if the stage
        info could not be found or was garbage collected.
        """
        stage = self._jtracker.getStageInfo(stageId)
        if stage is None:
            return None
        # TODO: fetch them in batch for better performance
        attrs = [getattr(stage, f)() for f in SparkStageInfo._fields[1:]]
        return SparkStageInfo(stageId, *attrs)