author     Reynold Xin <rxin@databricks.com>    2015-02-03 00:46:04 -0800
committer  Reynold Xin <rxin@databricks.com>    2015-02-03 00:46:04 -0800
commit     bebf4c42bef3e75d31ffce9bfdb331c16f34ddb1 (patch)
tree       3c87c85407ff969743c6a2a5af8f1f741df046e3 /core
parent     523a93523d0f9fc12de1ba2dc1acc360cdbc7027 (diff)
[SPARK-5549] Define TaskContext interface in Scala.
So the interface documentation shows up in ScalaDoc.

Author: Reynold Xin <rxin@databricks.com>

Closes #4324 from rxin/TaskContext-scala and squashes the following commits:

2480a17 [Reynold Xin] comment
573756f [Reynold Xin] style fixes and javadoc fixes.
87dd537 [Reynold Xin] [SPARK-5549] Define TaskContext interface in Scala.
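For context, the interface this commit defines is consumed from user code roughly as follows. This is a minimal, hypothetical driver program; the object name, app name, and local master are illustrative, not part of the patch:

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

// Hypothetical example exercising the TaskContext API defined in this patch.
object TaskContextExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("tc-example").setMaster("local[2]"))

    val sizes = sc.parallelize(1 to 100, 4).mapPartitions { iter =>
      // TaskContext.get() returns the context of the task running this closure.
      val tc = TaskContext.get()
      // The listener fires on success, failure, or cancellation -- the same
      // contract HadoopRDD relies on to close its input stream.
      tc.addTaskCompletionListener { (ctx: TaskContext) =>
        println(s"partition ${ctx.partitionId()} finished (attempt ${ctx.attemptNumber()})")
      }
      Iterator.single(iter.size)
    }.collect()

    println(sizes.mkString(", "))
    sc.stop()
  }
}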
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/java/org/apache/spark/TaskContext.java                                                                          | 126
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContext.scala                                                                        | 136
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContextImpl.scala                                                                    |   8
-rw-r--r--  core/src/test/java/test/org/apache/spark/JavaTaskCompletionListenerImpl.java (renamed from core/src/test/java/org/apache/spark/util/JavaTaskCompletionListenerImpl.java) |   3
-rw-r--r--  core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java                                                     |  41
5 files changed, 183 insertions(+), 131 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/TaskContext.java b/core/src/main/java/org/apache/spark/TaskContext.java
deleted file mode 100644
index 095f9fb94f..0000000000
--- a/core/src/main/java/org/apache/spark/TaskContext.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark;
-
-import java.io.Serializable;
-
-import scala.Function0;
-import scala.Function1;
-import scala.Unit;
-
-import org.apache.spark.annotation.DeveloperApi;
-import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.util.TaskCompletionListener;
-
-/**
- * Contextual information about a task which can be read or mutated during
- * execution. To access the TaskContext for a running task use
- * TaskContext.get().
- */
-public abstract class TaskContext implements Serializable {
- /**
- * Return the currently active TaskContext. This can be called inside of
- * user functions to access contextual information about running tasks.
- */
- public static TaskContext get() {
- return taskContext.get();
- }
-
- private static ThreadLocal<TaskContext> taskContext =
- new ThreadLocal<TaskContext>();
-
- static void setTaskContext(TaskContext tc) {
- taskContext.set(tc);
- }
-
- static void unset() {
- taskContext.remove();
- }
-
- /**
- * Whether the task has completed.
- */
- public abstract boolean isCompleted();
-
- /**
- * Whether the task has been killed.
- */
- public abstract boolean isInterrupted();
-
- /** @deprecated use {@link #isRunningLocally()} */
- @Deprecated
- public abstract boolean runningLocally();
-
- public abstract boolean isRunningLocally();
-
- /**
- * Add a (Java-friendly) listener to be executed on task completion.
- * This will be called in all situations - success, failure, or cancellation.
- * An example use is for HadoopRDD to register a callback to close the input stream.
- */
- public abstract TaskContext addTaskCompletionListener(TaskCompletionListener listener);
-
- /**
- * Add a listener in the form of a Scala closure to be executed on task completion.
- * This will be called in all situations - success, failure, or cancellation.
- * An example use is for HadoopRDD to register a callback to close the input stream.
- */
- public abstract TaskContext addTaskCompletionListener(final Function1<TaskContext, Unit> f);
-
- /**
- * Add a callback function to be executed on task completion. An example use
- * is for HadoopRDD to register a callback to close the input stream.
- * Will be called in any situation - success, failure, or cancellation.
- *
- * @deprecated use {@link #addTaskCompletionListener(scala.Function1)}
- *
- * @param f Callback function.
- */
- @Deprecated
- public abstract void addOnCompleteCallback(final Function0<Unit> f);
-
- /**
- * The ID of the stage that this task belongs to.
- */
- public abstract int stageId();
-
- /**
- * The ID of the RDD partition that is computed by this task.
- */
- public abstract int partitionId();
-
- /**
- * How many times this task has been attempted. The first task attempt will be assigned
- * attemptNumber = 0, and subsequent attempts will have increasing attempt numbers.
- */
- public abstract int attemptNumber();
-
- /** @deprecated use {@link #taskAttemptId()}; it was renamed to avoid ambiguity. */
- @Deprecated
- public abstract long attemptId();
-
- /**
- * An ID that is unique to this task attempt (within the same SparkContext, no two task attempts
- * will share the same attempt ID). This is roughly equivalent to Hadoop's TaskAttemptID.
- */
- public abstract long taskAttemptId();
-
- /** ::DeveloperApi:: */
- @DeveloperApi
- public abstract TaskMetrics taskMetrics();
-}
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
new file mode 100644
index 0000000000..af9c138f97
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.Serializable
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.util.TaskCompletionListener
+
+
+object TaskContext {
+ /**
+ * Return the currently active TaskContext. This can be called inside of
+ * user functions to access contextual information about running tasks.
+ */
+ def get(): TaskContext = taskContext.get
+
+ private val taskContext: ThreadLocal[TaskContext] = new ThreadLocal[TaskContext]
+
+ // Note: protected[spark] instead of private[spark] to prevent the following two from
+ // showing up in JavaDoc.
+ /**
+ * Set the thread local TaskContext. Internal to Spark.
+ */
+ protected[spark] def setTaskContext(tc: TaskContext): Unit = taskContext.set(tc)
+
+ /**
+ * Unset the thread local TaskContext. Internal to Spark.
+ */
+ protected[spark] def unset(): Unit = taskContext.remove()
+}
+
+
+/**
+ * Contextual information about a task which can be read or mutated during
+ * execution. To access the TaskContext for a running task, use:
+ * {{{
+ * org.apache.spark.TaskContext.get()
+ * }}}
+ */
+abstract class TaskContext extends Serializable {
+ // Note: TaskContext must NOT define a get method. Otherwise it will prevent the Scala compiler
+ // from generating a static get method (based on the companion object's get method).
+
+ // Note: Update JavaTaskContextCompileCheck when new methods are added to this class.
+
+ // Note: getters in this class are defined with parentheses to maintain backward compatibility.
+
+ /**
+ * Returns true if the task has completed.
+ */
+ def isCompleted(): Boolean
+
+ /**
+ * Returns true if the task has been killed.
+ */
+ def isInterrupted(): Boolean
+
+ @deprecated("1.2.0", "use isRunningLocally")
+ def runningLocally(): Boolean
+
+ /**
+ * Returns true if the task is running locally in the driver program.
+ * @return true if running locally in the driver, false otherwise.
+ */
+ def isRunningLocally(): Boolean
+
+ /**
+ * Adds a (Java-friendly) listener to be executed on task completion.
+ * This will be called in all situations - success, failure, or cancellation.
+ * An example use is for HadoopRDD to register a callback to close the input stream.
+ */
+ def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
+
+ /**
+ * Adds a listener in the form of a Scala closure to be executed on task completion.
+ * This will be called in all situations - success, failure, or cancellation.
+ * An example use is for HadoopRDD to register a callback to close the input stream.
+ */
+ def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext
+
+ /**
+ * Adds a callback function to be executed on task completion. An example use
+ * is for HadoopRDD to register a callback to close the input stream.
+ * Will be called in any situation - success, failure, or cancellation.
+ *
+ * @param f Callback function.
+ */
+ @deprecated("1.2.0", "use addTaskCompletionListener")
+ def addOnCompleteCallback(f: () => Unit)
+
+ /**
+ * The ID of the stage that this task belongs to.
+ */
+ def stageId(): Int
+
+ /**
+ * The ID of the RDD partition that is computed by this task.
+ */
+ def partitionId(): Int
+
+ /**
+ * How many times this task has been attempted. The first task attempt will be assigned
+ * attemptNumber = 0, and subsequent attempts will have increasing attempt numbers.
+ */
+ def attemptNumber(): Int
+
+ @deprecated("1.3.0", "use attemptNumber")
+ def attemptId(): Long
+
+ /**
+ * An ID that is unique to this task attempt (within the same SparkContext, no two task attempts
+ * will share the same attempt ID). This is roughly equivalent to Hadoop's TaskAttemptID.
+ */
+ def taskAttemptId(): Long
+
+ /** ::DeveloperApi:: */
+ @DeveloperApi
+ def taskMetrics(): TaskMetrics
+}
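The compiler note near the top of the new class is the crux of the Java interop story: a method defined on a companion object is emitted as a static forwarder on the class itself, but only when the class declares no member of the same name. A small sketch of that behavior, with hypothetical names:

// Hypothetical names; a sketch of the static-forwarder behavior the note relies on.
abstract class Holder extends Serializable {
  // The class must NOT define get(), or scalac suppresses the forwarder below.
  def value(): Int
}

object Holder {
  private val local = new ThreadLocal[Holder]

  // Compiled as a `public static Holder get()` forwarder on class Holder,
  // which is what lets Java code call Holder.get() directly.
  def get(): Holder = local.get

  def set(h: Holder): Unit = local.set(h)
}

Inspecting the compiled Holder class with javap -p should show the static get() entry; that forwarder is what JavaTaskContextCompileCheck (below) exercises via TaskContext.get().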
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 9bb0c61e44..337c8e4ebe 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -33,7 +33,7 @@ private[spark] class TaskContextImpl(
with Logging {
// For backwards-compatibility; this method is now deprecated as of 1.3.0.
- override def attemptId: Long = taskAttemptId
+ override def attemptId(): Long = taskAttemptId
// List of callback functions to execute when the task completes.
@transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]
@@ -87,10 +87,10 @@ private[spark] class TaskContextImpl(
interrupted = true
}
- override def isCompleted: Boolean = completed
+ override def isCompleted(): Boolean = completed
- override def isRunningLocally: Boolean = runningLocally
+ override def isRunningLocally(): Boolean = runningLocally
- override def isInterrupted: Boolean = interrupted
+ override def isInterrupted(): Boolean = interrupted
}
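These override signatures gained empty parentheses to match the abstract declarations in the new TaskContext.scala, which keep parens for backward compatibility; each override must repeat the same shape. A minimal illustration of the pattern, with hypothetical names:

// Hypothetical names; the abstract declaration fixes the empty-paren shape
// and each override must repeat it.
abstract class Context {
  def isCompleted(): Boolean
}

class ContextImpl extends Context {
  @volatile private var completed = false
  override def isCompleted(): Boolean = completed
}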
diff --git a/core/src/test/java/org/apache/spark/util/JavaTaskCompletionListenerImpl.java b/core/src/test/java/test/org/apache/spark/JavaTaskCompletionListenerImpl.java
index e9ec700e32..e38bc38949 100644
--- a/core/src/test/java/org/apache/spark/util/JavaTaskCompletionListenerImpl.java
+++ b/core/src/test/java/test/org/apache/spark/JavaTaskCompletionListenerImpl.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.spark.util;
+package test.org.apache.spark;
import org.apache.spark.TaskContext;
+import org.apache.spark.util.TaskCompletionListener;
/**
diff --git a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
new file mode 100644
index 0000000000..4a918f725d
--- /dev/null
+++ b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark;
+
+import org.apache.spark.TaskContext;
+
+/**
+ * Something to make sure that TaskContext can be used in Java.
+ */
+public class JavaTaskContextCompileCheck {
+
+ public static void test() {
+ TaskContext tc = TaskContext.get();
+
+ tc.isCompleted();
+ tc.isInterrupted();
+ tc.isRunningLocally();
+
+ tc.addTaskCompletionListener(new JavaTaskCompletionListenerImpl());
+
+ tc.attemptNumber();
+ tc.partitionId();
+ tc.stageId();
+ tc.taskAttemptId();
+ }
+}
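For symmetry with the Java compile check, a Scala implementation of the same listener interface might look like the following sketch; the class name is illustrative and not part of the patch:

import org.apache.spark.TaskContext
import org.apache.spark.util.TaskCompletionListener

// Illustrative Scala counterpart to JavaTaskCompletionListenerImpl above.
class ScalaTaskCompletionListenerImpl extends TaskCompletionListener {
  override def onTaskCompletion(context: TaskContext): Unit = {
    // Exercise the same Java-visible surface the compile check touches.
    context.isCompleted()
    context.isInterrupted()
    context.stageId()
    context.partitionId()
  }
}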