diff options
author | Josh Rosen <joshrosen@databricks.com> | 2014-10-25 00:06:57 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-10-25 00:06:57 -0700 |
commit | 9530316887612dca060a128fca34dd5a6ab2a9a9 (patch) | |
tree | c88a0a75186a5f72676f28e131368f62de8f99b6 /examples/src | |
parent | 3a845d3c048eebb0bddb3937128746fde3e8e4d8 (diff) | |
download | spark-9530316887612dca060a128fca34dd5a6ab2a9a9.tar.gz spark-9530316887612dca060a128fca34dd5a6ab2a9a9.tar.bz2 spark-9530316887612dca060a128fca34dd5a6ab2a9a9.zip |
[SPARK-2321] Stable pull-based progress / status API
This pull request is a first step towards the implementation of a stable, pull-based progress / status API for Spark (see [SPARK-2321](https://issues.apache.org/jira/browse/SPARK-2321)). For now, I'd like to discuss the basic implementation, API names, and overall interface design. Once we arrive at a good design, I'll go back and add additional methods to expose more information via these API.
#### Design goals:
- Pull-based API
- Usable from Java / Scala / Python (eventually, likely with a wrapper)
- Can be extended to expose more information without introducing binary incompatibilities.
- Returns immutable objects.
- Don't leak any implementation details, preserving our freedom to change the implementation.
#### Implementation:
- Add public methods (`getJobInfo`, `getStageInfo`) to SparkContext to allow status / progress information to be retrieved.
- Add public interfaces (`SparkJobInfo`, `SparkStageInfo`) for our API return values. These interfaces consist entirely of Java-style getter methods. The interfaces are currently implemented in Java. I decided to explicitly separate the interface from its implementation (`SparkJobInfoImpl`, `SparkStageInfoImpl`) in order to prevent users from constructing these responses themselves.
-Allow an existing JobProgressListener to be used when constructing a live SparkUI. This allows us to re-use this listeners in the implementation of this status API. There are a few reasons why this listener re-use makes sense:
- The status API and web UI are guaranteed to show consistent information.
- These listeners are already well-tested.
- The same garbage-collection / information retention configurations can apply to both this API and the web UI.
- Extend JobProgressListener to maintain `jobId -> Job` and `stageId -> Stage` mappings.
The progress API methods are implemented in a separate trait that's mixed into SparkContext. This helps to avoid SparkContext.scala from becoming larger and more difficult to read.
Author: Josh Rosen <joshrosen@databricks.com>
Author: Josh Rosen <joshrosen@apache.org>
Closes #2696 from JoshRosen/progress-reporting-api and squashes the following commits:
e6aa78d [Josh Rosen] Add tests.
b585c16 [Josh Rosen] Accept SparkListenerBus instead of more specific subclasses.
c96402d [Josh Rosen] Address review comments.
2707f98 [Josh Rosen] Expose current stage attempt id
c28ba76 [Josh Rosen] Update demo code:
646ff1d [Josh Rosen] Document spark.ui.retainedJobs.
7f47d6d [Josh Rosen] Clean up SparkUI constructors, per Andrew's feedback.
b77b3d8 [Josh Rosen] Merge remote-tracking branch 'origin/master' into progress-reporting-api
787444c [Josh Rosen] Move status API methods into trait that can be mixed into SparkContext.
f9a9a00 [Josh Rosen] More review comments:
3dc79af [Josh Rosen] Remove creation of unused listeners in SparkContext.
249ca16 [Josh Rosen] Address several review comments:
da5648e [Josh Rosen] Add example of basic progress reporting in Java.
7319ffd [Josh Rosen] Add getJobIdsForGroup() and num*Tasks() methods.
cc568e5 [Josh Rosen] Add note explaining that interfaces should not be implemented outside of Spark.
6e840d4 [Josh Rosen] Remove getter-style names and "consistent snapshot" semantics:
08cbec9 [Josh Rosen] Begin to sketch the interfaces for a stable, public status API.
ac2d13a [Josh Rosen] Add jobId->stage, stageId->stage mappings in JobProgressListener
24de263 [Josh Rosen] Create UI listeners in SparkContext instead of in Tabs:
Diffstat (limited to 'examples/src')
-rw-r--r-- | examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java new file mode 100644 index 0000000000..430e96ab14 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples; + +import org.apache.spark.SparkConf; +import org.apache.spark.SparkJobInfo; +import org.apache.spark.SparkStageInfo; +import org.apache.spark.api.java.JavaFutureAction; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +import java.util.Arrays; +import java.util.List; + +/** + * Example of using Spark's status APIs from Java. + */ +public final class JavaStatusAPIDemo { + + public static final String APP_NAME = "JavaStatusAPIDemo"; + + public static final class IdentityWithDelay<T> implements Function<T, T> { + @Override + public T call(T x) throws Exception { + Thread.sleep(2 * 1000); // 2 seconds + return x; + } + } + + public static void main(String[] args) throws Exception { + SparkConf sparkConf = new SparkConf().setAppName(APP_NAME); + final JavaSparkContext sc = new JavaSparkContext(sparkConf); + + // Example of implementing a progress reporter for a simple job. + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map( + new IdentityWithDelay<Integer>()); + JavaFutureAction<List<Integer>> jobFuture = rdd.collectAsync(); + while (!jobFuture.isDone()) { + Thread.sleep(1000); // 1 second + List<Integer> jobIds = jobFuture.jobIds(); + if (jobIds.isEmpty()) { + continue; + } + int currentJobId = jobIds.get(jobIds.size() - 1); + SparkJobInfo jobInfo = sc.getJobInfo(currentJobId); + SparkStageInfo stageInfo = sc.getStageInfo(jobInfo.stageIds()[0]); + System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() + + " active, " + stageInfo.numCompletedTasks() + " complete"); + } + + System.out.println("Job results are: " + jobFuture.get()); + sc.stop(); + } +} |