From 40eb8b6ef3a67e36d0d9492c044981a1da76351d Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 14 Nov 2014 23:46:25 -0800
Subject: [SPARK-2321] Several progress API improvements / refactorings

This PR refactors / extends the status API introduced in #2696.

- Change StatusAPI from a mixin trait to a class. Before, the new status API
  methods were directly accessible through SparkContext, whereas now they're
  accessed through a `sc.statusAPI` field. As long as we were going to add these
  methods directly to SparkContext, the mixin trait seemed like a good idea, but
  this might be simpler to reason about and may avoid pitfalls that I've run
  into while attempting to refactor other parts of SparkContext to use mixins
  (see #3071, for example).
- Change the name from SparkStatusAPI to SparkStatusTracker.
- Make `getJobIdsForGroup(null)` return ids for jobs that aren't associated
  with any job group.
- Add `getActiveStageIds()` and `getActiveJobIds()` methods that return the ids
  of whatever's currently active in this SparkContext. This should simplify
  davies's progress bar code.

Author: Josh Rosen

Closes #3197 from JoshRosen/progress-api-improvements and squashes the following commits:

30b0afa [Josh Rosen] Rename SparkStatusAPI to SparkStatusTracker.
d1b08d8 [Josh Rosen] Add missing newlines
2cc7353 [Josh Rosen] Add missing file.
d5eab1f [Josh Rosen] Add getActive[Stage|Job]Ids() methods.
a227984 [Josh Rosen] getJobIdsForGroup(null) should return jobs for default group
c47e294 [Josh Rosen] Remove StatusAPI mixin trait.
---
 .../apache/spark/examples/JavaStatusAPIDemo.java  | 70 ----------------------
 .../spark/examples/JavaStatusTrackerDemo.java     | 70 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 70 deletions(-)
 delete mode 100644 examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
 create mode 100644 examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java

(limited to 'examples/src')

diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
deleted file mode 100644
index 430e96ab14..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkJobInfo;
-import org.apache.spark.SparkStageInfo;
-import org.apache.spark.api.java.JavaFutureAction;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Example of using Spark's status APIs from Java.
- */
-public final class JavaStatusAPIDemo {
-
-  public static final String APP_NAME = "JavaStatusAPIDemo";
-
-  public static final class IdentityWithDelay<T> implements Function<T, T> {
-    @Override
-    public T call(T x) throws Exception {
-      Thread.sleep(2 * 1000);  // 2 seconds
-      return x;
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    SparkConf sparkConf = new SparkConf().setAppName(APP_NAME);
-    final JavaSparkContext sc = new JavaSparkContext(sparkConf);
-
-    // Example of implementing a progress reporter for a simple job.
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map(
-        new IdentityWithDelay<Integer>());
-    JavaFutureAction<List<Integer>> jobFuture = rdd.collectAsync();
-    while (!jobFuture.isDone()) {
-      Thread.sleep(1000);  // 1 second
-      List<Integer> jobIds = jobFuture.jobIds();
-      if (jobIds.isEmpty()) {
-        continue;
-      }
-      int currentJobId = jobIds.get(jobIds.size() - 1);
-      SparkJobInfo jobInfo = sc.getJobInfo(currentJobId);
-      SparkStageInfo stageInfo = sc.getStageInfo(jobInfo.stageIds()[0]);
-      System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() +
-          " active, " + stageInfo.numCompletedTasks() + " complete");
-    }
-
-    System.out.println("Job results are: " + jobFuture.get());
-    sc.stop();
-  }
-}
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
new file mode 100644
index 0000000000..e68ec74c3e
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Example of using Spark's status APIs from Java.
+ */
+public final class JavaStatusTrackerDemo {
+
+  public static final String APP_NAME = "JavaStatusAPIDemo";
+
+  public static final class IdentityWithDelay<T> implements Function<T, T> {
+    @Override
+    public T call(T x) throws Exception {
+      Thread.sleep(2 * 1000);  // 2 seconds
+      return x;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    SparkConf sparkConf = new SparkConf().setAppName(APP_NAME);
+    final JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+    // Example of implementing a progress reporter for a simple job.
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map(
+        new IdentityWithDelay<Integer>());
+    JavaFutureAction<List<Integer>> jobFuture = rdd.collectAsync();
+    while (!jobFuture.isDone()) {
+      Thread.sleep(1000);  // 1 second
+      List<Integer> jobIds = jobFuture.jobIds();
+      if (jobIds.isEmpty()) {
+        continue;
+      }
+      int currentJobId = jobIds.get(jobIds.size() - 1);
+      SparkJobInfo jobInfo = sc.statusTracker().getJobInfo(currentJobId);
+      SparkStageInfo stageInfo = sc.statusTracker().getStageInfo(jobInfo.stageIds()[0]);
+      System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() +
+          " active, " + stageInfo.numCompletedTasks() + " complete");
+    }
+
+    System.out.println("Job results are: " + jobFuture.get());
+    sc.stop();
+  }
+}
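Note: the demo above only exercises getJobInfo() and getStageInfo(). The other methods described in the
commit message -- getActiveJobIds(), getActiveStageIds(), and getJobIdsForGroup(null) -- are reached through
the same statusTracker() accessor. The snippet below is a minimal sketch, not part of the patch; it reuses the
`sc` context from the demo and assumes these methods return plain int arrays of ids, as the commit description
suggests.

    // Hypothetical polling snippet (uses `sc` from the demo above): ask the status
    // tracker what is currently running in this SparkContext.
    int[] activeJobIds = sc.statusTracker().getActiveJobIds();        // ids of currently active jobs
    int[] activeStageIds = sc.statusTracker().getActiveStageIds();    // ids of currently active stages
    int[] ungroupedJobIds = sc.statusTracker().getJobIdsForGroup(null);  // jobs not in any job group
    System.out.println("Active jobs: " + Arrays.toString(activeJobIds) +
        ", active stages: " + Arrays.toString(activeStageIds) +
        ", jobs outside any group: " + Arrays.toString(ungroupedJobIds));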