aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2014-10-25 00:06:57 -0700
committerJosh Rosen <joshrosen@databricks.com>2014-10-25 00:06:57 -0700
commit9530316887612dca060a128fca34dd5a6ab2a9a9 (patch)
treec88a0a75186a5f72676f28e131368f62de8f99b6 /examples/src/main
parent3a845d3c048eebb0bddb3937128746fde3e8e4d8 (diff)
downloadspark-9530316887612dca060a128fca34dd5a6ab2a9a9.tar.gz
spark-9530316887612dca060a128fca34dd5a6ab2a9a9.tar.bz2
spark-9530316887612dca060a128fca34dd5a6ab2a9a9.zip
[SPARK-2321] Stable pull-based progress / status API
This pull request is a first step towards the implementation of a stable, pull-based progress / status API for Spark (see [SPARK-2321](https://issues.apache.org/jira/browse/SPARK-2321)). For now, I'd like to discuss the basic implementation, API names, and overall interface design. Once we arrive at a good design, I'll go back and add additional methods to expose more information via these API. #### Design goals: - Pull-based API - Usable from Java / Scala / Python (eventually, likely with a wrapper) - Can be extended to expose more information without introducing binary incompatibilities. - Returns immutable objects. - Don't leak any implementation details, preserving our freedom to change the implementation. #### Implementation: - Add public methods (`getJobInfo`, `getStageInfo`) to SparkContext to allow status / progress information to be retrieved. - Add public interfaces (`SparkJobInfo`, `SparkStageInfo`) for our API return values. These interfaces consist entirely of Java-style getter methods. The interfaces are currently implemented in Java. I decided to explicitly separate the interface from its implementation (`SparkJobInfoImpl`, `SparkStageInfoImpl`) in order to prevent users from constructing these responses themselves. -Allow an existing JobProgressListener to be used when constructing a live SparkUI. This allows us to re-use this listeners in the implementation of this status API. There are a few reasons why this listener re-use makes sense: - The status API and web UI are guaranteed to show consistent information. - These listeners are already well-tested. - The same garbage-collection / information retention configurations can apply to both this API and the web UI. - Extend JobProgressListener to maintain `jobId -> Job` and `stageId -> Stage` mappings. The progress API methods are implemented in a separate trait that's mixed into SparkContext. This helps to avoid SparkContext.scala from becoming larger and more difficult to read. Author: Josh Rosen <joshrosen@databricks.com> Author: Josh Rosen <joshrosen@apache.org> Closes #2696 from JoshRosen/progress-reporting-api and squashes the following commits: e6aa78d [Josh Rosen] Add tests. b585c16 [Josh Rosen] Accept SparkListenerBus instead of more specific subclasses. c96402d [Josh Rosen] Address review comments. 2707f98 [Josh Rosen] Expose current stage attempt id c28ba76 [Josh Rosen] Update demo code: 646ff1d [Josh Rosen] Document spark.ui.retainedJobs. 7f47d6d [Josh Rosen] Clean up SparkUI constructors, per Andrew's feedback. b77b3d8 [Josh Rosen] Merge remote-tracking branch 'origin/master' into progress-reporting-api 787444c [Josh Rosen] Move status API methods into trait that can be mixed into SparkContext. f9a9a00 [Josh Rosen] More review comments: 3dc79af [Josh Rosen] Remove creation of unused listeners in SparkContext. 249ca16 [Josh Rosen] Address several review comments: da5648e [Josh Rosen] Add example of basic progress reporting in Java. 7319ffd [Josh Rosen] Add getJobIdsForGroup() and num*Tasks() methods. cc568e5 [Josh Rosen] Add note explaining that interfaces should not be implemented outside of Spark. 6e840d4 [Josh Rosen] Remove getter-style names and "consistent snapshot" semantics: 08cbec9 [Josh Rosen] Begin to sketch the interfaces for a stable, public status API. ac2d13a [Josh Rosen] Add jobId->stage, stageId->stage mappings in JobProgressListener 24de263 [Josh Rosen] Create UI listeners in SparkContext instead of in Tabs:
Diffstat (limited to 'examples/src/main')
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java70
1 files changed, 70 insertions, 0 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
new file mode 100644
index 0000000000..430e96ab14
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Example of using Spark's status APIs from Java.
+ */
+public final class JavaStatusAPIDemo {
+
+ public static final String APP_NAME = "JavaStatusAPIDemo";
+
+ public static final class IdentityWithDelay<T> implements Function<T, T> {
+ @Override
+ public T call(T x) throws Exception {
+ Thread.sleep(2 * 1000); // 2 seconds
+ return x;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ SparkConf sparkConf = new SparkConf().setAppName(APP_NAME);
+ final JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+ // Example of implementing a progress reporter for a simple job.
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map(
+ new IdentityWithDelay<Integer>());
+ JavaFutureAction<List<Integer>> jobFuture = rdd.collectAsync();
+ while (!jobFuture.isDone()) {
+ Thread.sleep(1000); // 1 second
+ List<Integer> jobIds = jobFuture.jobIds();
+ if (jobIds.isEmpty()) {
+ continue;
+ }
+ int currentJobId = jobIds.get(jobIds.size() - 1);
+ SparkJobInfo jobInfo = sc.getJobInfo(currentJobId);
+ SparkStageInfo stageInfo = sc.getStageInfo(jobInfo.stageIds()[0]);
+ System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() +
+ " active, " + stageInfo.numCompletedTasks() + " complete");
+ }
+
+ System.out.println("Job results are: " + jobFuture.get());
+ sc.stop();
+ }
+}