author     Prashant Sharma <prashant.s@imaginea.com>  2014-10-16 21:38:45 -0400
committer  Patrick Wendell <pwendell@gmail.com>       2014-10-16 21:38:45 -0400
commit     2fe0ba95616bb3860736b6b426635a5d2a0e9bd9 (patch)
tree       80c42707128cf5f7ec50e63bcd24d5c943d66049 /core/src/main/java/org
parent     99e416b6d64402a5432a265797a1c155a38f4e6f (diff)
SPARK-3874: Provide stable TaskContext API
This is a small number of clean-up changes on top of #2782. Closes #2782.

Author: Prashant Sharma <prashant.s@imaginea.com>
Author: Patrick Wendell <pwendell@gmail.com>

Closes #2803 from pwendell/pr-2782 and squashes the following commits:

56d5b7a [Patrick Wendell] Minor clean-up
44089ec [Patrick Wendell] Clean-up the TaskContext API.
ed551ce [Prashant Sharma] Fixed a typo
df261d0 [Prashant Sharma] Josh's suggestion
facf3b1 [Prashant Sharma] Fixed the mima issue.
7ecc2fe [Prashant Sharma] CR, Moved implementations to TaskContextImpl
bbd9e05 [Prashant Sharma] adding missed out files to git.
ef633f5 [Prashant Sharma] SPARK-3874, Provide stable TaskContext API
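The user-facing shape of the new abstract TaskContext can be seen in the diff below. As a hedged illustration only (the wrapper class and method below are invented for the example; just TaskContext.get(), stageId(), partitionId(), addTaskCompletionListener(), and the TaskCompletionListener callback come from this commit), typical user code against the stable API would look roughly like:

import org.apache.spark.TaskContext;
import org.apache.spark.util.TaskCompletionListener;

// Illustrative sketch only: this class is not part of the commit.
public final class TaskContextUsageSketch {
  public static void describeCurrentTask() {
    // Thread-local context for the task running on this thread; null outside a task.
    TaskContext tc = TaskContext.get();
    if (tc == null) {
      return;
    }
    System.out.println("stage " + tc.stageId() + ", partition " + tc.partitionId());
    // Register a completion callback, e.g. to close an input stream (as HadoopRDD does).
    tc.addTaskCompletionListener(new TaskCompletionListener() {
      @Override
      public void onTaskCompletion(TaskContext context) {
        System.out.println("partition " + context.partitionId() + " done");
      }
    });
  }
}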
Diffstat (limited to 'core/src/main/java/org')
-rw-r--r--  core/src/main/java/org/apache/spark/TaskContext.java | 225
1 file changed, 32 insertions(+), 193 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/TaskContext.java b/core/src/main/java/org/apache/spark/TaskContext.java
index 4e6d708af0..2d998d4c7a 100644
--- a/core/src/main/java/org/apache/spark/TaskContext.java
+++ b/core/src/main/java/org/apache/spark/TaskContext.java
@@ -18,131 +18,55 @@
package org.apache.spark;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import scala.Function0;
import scala.Function1;
import scala.Unit;
-import scala.collection.JavaConversions;
import org.apache.spark.annotation.DeveloperApi;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.util.TaskCompletionListener;
-import org.apache.spark.util.TaskCompletionListenerException;
/**
-* :: DeveloperApi ::
-* Contextual information about a task which can be read or mutated during execution.
-*/
-@DeveloperApi
-public class TaskContext implements Serializable {
-
- private int stageId;
- private int partitionId;
- private long attemptId;
- private boolean runningLocally;
- private TaskMetrics taskMetrics;
-
- /**
- * :: DeveloperApi ::
- * Contextual information about a task which can be read or mutated during execution.
- *
- * @param stageId stage id
- * @param partitionId index of the partition
- * @param attemptId the number of attempts to execute this task
- * @param runningLocally whether the task is running locally in the driver JVM
- * @param taskMetrics performance metrics of the task
- */
- @DeveloperApi
- public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally,
- TaskMetrics taskMetrics) {
- this.attemptId = attemptId;
- this.partitionId = partitionId;
- this.runningLocally = runningLocally;
- this.stageId = stageId;
- this.taskMetrics = taskMetrics;
- }
-
- /**
- * :: DeveloperApi ::
- * Contextual information about a task which can be read or mutated during execution.
- *
- * @param stageId stage id
- * @param partitionId index of the partition
- * @param attemptId the number of attempts to execute this task
- * @param runningLocally whether the task is running locally in the driver JVM
- */
- @DeveloperApi
- public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally) {
- this.attemptId = attemptId;
- this.partitionId = partitionId;
- this.runningLocally = runningLocally;
- this.stageId = stageId;
- this.taskMetrics = TaskMetrics.empty();
- }
-
+ * Contextual information about a task which can be read or mutated during
+ * execution. To access the TaskContext for a running task use
+ * TaskContext.get().
+ */
+public abstract class TaskContext implements Serializable {
/**
- * :: DeveloperApi ::
- * Contextual information about a task which can be read or mutated during execution.
- *
- * @param stageId stage id
- * @param partitionId index of the partition
- * @param attemptId the number of attempts to execute this task
+ * Return the currently active TaskContext. This can be called inside of
+ * user functions to access contextual information about running tasks.
*/
- @DeveloperApi
- public TaskContext(int stageId, int partitionId, long attemptId) {
- this.attemptId = attemptId;
- this.partitionId = partitionId;
- this.runningLocally = false;
- this.stageId = stageId;
- this.taskMetrics = TaskMetrics.empty();
+ public static TaskContext get() {
+ return taskContext.get();
}
private static ThreadLocal<TaskContext> taskContext =
new ThreadLocal<TaskContext>();
- /**
- * :: Internal API ::
- * This is spark internal API, not intended to be called from user programs.
- */
- public static void setTaskContext(TaskContext tc) {
+ static void setTaskContext(TaskContext tc) {
taskContext.set(tc);
}
- public static TaskContext get() {
- return taskContext.get();
- }
-
- /** :: Internal API :: */
- public static void unset() {
+ static void unset() {
taskContext.remove();
}
- // List of callback functions to execute when the task completes.
- private transient List<TaskCompletionListener> onCompleteCallbacks =
- new ArrayList<TaskCompletionListener>();
-
- // Whether the corresponding task has been killed.
- private volatile boolean interrupted = false;
-
- // Whether the task has completed.
- private volatile boolean completed = false;
-
/**
- * Checks whether the task has completed.
+ * Whether the task has completed.
*/
- public boolean isCompleted() {
- return completed;
- }
+ public abstract boolean isCompleted();
/**
- * Checks whether the task has been killed.
+ * Whether the task has been killed.
*/
- public boolean isInterrupted() {
- return interrupted;
- }
+ public abstract boolean isInterrupted();
+
+ /** @deprecated: use isRunningLocally() */
+ @Deprecated
+ public abstract boolean runningLocally();
+
+ public abstract boolean isRunningLocally();
/**
* Add a (Java friendly) listener to be executed on task completion.
@@ -150,10 +74,7 @@ public class TaskContext implements Serializable {
* <p/>
* An example use is for HadoopRDD to register a callback to close the input stream.
*/
- public TaskContext addTaskCompletionListener(TaskCompletionListener listener) {
- onCompleteCallbacks.add(listener);
- return this;
- }
+ public abstract TaskContext addTaskCompletionListener(TaskCompletionListener listener);
/**
* Add a listener in the form of a Scala closure to be executed on task completion.
@@ -161,109 +82,27 @@ public class TaskContext implements Serializable {
* <p/>
* An example use is for HadoopRDD to register a callback to close the input stream.
*/
- public TaskContext addTaskCompletionListener(final Function1<TaskContext, Unit> f) {
- onCompleteCallbacks.add(new TaskCompletionListener() {
- @Override
- public void onTaskCompletion(TaskContext context) {
- f.apply(context);
- }
- });
- return this;
- }
+ public abstract TaskContext addTaskCompletionListener(final Function1<TaskContext, Unit> f);
/**
* Add a callback function to be executed on task completion. An example use
* is for HadoopRDD to register a callback to close the input stream.
* Will be called in any situation - success, failure, or cancellation.
*
- * Deprecated: use addTaskCompletionListener
- *
+ * @deprecated: use addTaskCompletionListener
+ *
* @param f Callback function.
*/
@Deprecated
- public void addOnCompleteCallback(final Function0<Unit> f) {
- onCompleteCallbacks.add(new TaskCompletionListener() {
- @Override
- public void onTaskCompletion(TaskContext context) {
- f.apply();
- }
- });
- }
-
- /**
- * ::Internal API::
- * Marks the task as completed and triggers the listeners.
- */
- public void markTaskCompleted() throws TaskCompletionListenerException {
- completed = true;
- List<String> errorMsgs = new ArrayList<String>(2);
- // Process complete callbacks in the reverse order of registration
- List<TaskCompletionListener> revlist =
- new ArrayList<TaskCompletionListener>(onCompleteCallbacks);
- Collections.reverse(revlist);
- for (TaskCompletionListener tcl: revlist) {
- try {
- tcl.onTaskCompletion(this);
- } catch (Throwable e) {
- errorMsgs.add(e.getMessage());
- }
- }
-
- if (!errorMsgs.isEmpty()) {
- throw new TaskCompletionListenerException(JavaConversions.asScalaBuffer(errorMsgs));
- }
- }
-
- /**
- * ::Internal API::
- * Marks the task for interruption, i.e. cancellation.
- */
- public void markInterrupted() {
- interrupted = true;
- }
-
- @Deprecated
- /** Deprecated: use getStageId() */
- public int stageId() {
- return stageId;
- }
-
- @Deprecated
- /** Deprecated: use getPartitionId() */
- public int partitionId() {
- return partitionId;
- }
-
- @Deprecated
- /** Deprecated: use getAttemptId() */
- public long attemptId() {
- return attemptId;
- }
-
- @Deprecated
- /** Deprecated: use isRunningLocally() */
- public boolean runningLocally() {
- return runningLocally;
- }
-
- public boolean isRunningLocally() {
- return runningLocally;
- }
+ public abstract void addOnCompleteCallback(final Function0<Unit> f);
- public int getStageId() {
- return stageId;
- }
+ public abstract int stageId();
- public int getPartitionId() {
- return partitionId;
- }
+ public abstract int partitionId();
- public long getAttemptId() {
- return attemptId;
- }
+ public abstract long attemptId();
- /** ::Internal API:: */
- public TaskMetrics taskMetrics() {
- return taskMetrics;
- }
+ /** ::DeveloperApi:: */
+ @DeveloperApi
+ public abstract TaskMetrics taskMetrics();
}
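Because setTaskContext() and unset() are now package-private, only code inside the org.apache.spark package (per the commit message, the new TaskContextImpl and the task-running machinery) can install the thread-local context that TaskContext.get() returns. A minimal sketch of that lifecycle, assuming a package-internal caller and any concrete TaskContext instance (the helper class below is hypothetical and not part of this commit):

package org.apache.spark;

import java.util.concurrent.Callable;

// Hypothetical helper, shown only to illustrate the set/unset lifecycle
// implied by the now package-private TaskContext.setTaskContext()/unset().
final class TaskContextLifecycleSketch {
  static <T> T runWithContext(TaskContext context, Callable<T> body) throws Exception {
    TaskContext.setTaskContext(context);  // make TaskContext.get() work on this thread
    try {
      return body.call();
    } finally {
      TaskContext.unset();                // always clear the thread-local afterwards
    }
  }
}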