aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-09-27 14:46:00 -0700
committerReynold Xin <rxin@apache.org>2014-09-27 14:46:00 -0700
commit5b922bb458e863f5be0ae68167de882743f70b86 (patch)
treee45a33a33ba15c7a8fc4c16b09ae6440786f19f5 /core
parent0d8cdf0ede908f6c488a075170f1563815009e29 (diff)
downloadspark-5b922bb458e863f5be0ae68167de882743f70b86.tar.gz
spark-5b922bb458e863f5be0ae68167de882743f70b86.tar.bz2
spark-5b922bb458e863f5be0ae68167de882743f70b86.zip
[SPARK-3543] Clean up Java TaskContext implementation.
This addresses some minor issues in https://github.com/apache/spark/pull/2425 Author: Reynold Xin <rxin@apache.org> Closes #2557 from rxin/TaskContext and squashes the following commits: a51e5f6 [Reynold Xin] [SPARK-3543] Clean up Java TaskContext implementation.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/TaskContext.java33
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Task.scala8
5 files changed, 22 insertions, 29 deletions
diff --git a/core/src/main/java/org/apache/spark/TaskContext.java b/core/src/main/java/org/apache/spark/TaskContext.java
index 09b8ce02bd..4e6d708af0 100644
--- a/core/src/main/java/org/apache/spark/TaskContext.java
+++ b/core/src/main/java/org/apache/spark/TaskContext.java
@@ -56,7 +56,7 @@ public class TaskContext implements Serializable {
* @param taskMetrics performance metrics of the task
*/
@DeveloperApi
- public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally,
+ public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally,
TaskMetrics taskMetrics) {
this.attemptId = attemptId;
this.partitionId = partitionId;
@@ -65,7 +65,6 @@ public class TaskContext implements Serializable {
this.taskMetrics = taskMetrics;
}
-
/**
* :: DeveloperApi ::
* Contextual information about a task which can be read or mutated during execution.
@@ -76,8 +75,7 @@ public class TaskContext implements Serializable {
* @param runningLocally whether the task is running locally in the driver JVM
*/
@DeveloperApi
- public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
- Boolean runningLocally) {
+ public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally) {
this.attemptId = attemptId;
this.partitionId = partitionId;
this.runningLocally = runningLocally;
@@ -85,7 +83,6 @@ public class TaskContext implements Serializable {
this.taskMetrics = TaskMetrics.empty();
}
-
/**
* :: DeveloperApi ::
* Contextual information about a task which can be read or mutated during execution.
@@ -95,7 +92,7 @@ public class TaskContext implements Serializable {
* @param attemptId the number of attempts to execute this task
*/
@DeveloperApi
- public TaskContext(Integer stageId, Integer partitionId, Long attemptId) {
+ public TaskContext(int stageId, int partitionId, long attemptId) {
this.attemptId = attemptId;
this.partitionId = partitionId;
this.runningLocally = false;
@@ -107,9 +104,9 @@ public class TaskContext implements Serializable {
new ThreadLocal<TaskContext>();
/**
- * :: Internal API ::
- * This is spark internal API, not intended to be called from user programs.
- */
+ * :: Internal API ::
+ * This is spark internal API, not intended to be called from user programs.
+ */
public static void setTaskContext(TaskContext tc) {
taskContext.set(tc);
}
@@ -118,10 +115,8 @@ public class TaskContext implements Serializable {
return taskContext.get();
}
- /**
- * :: Internal API ::
- */
- public static void remove() {
+ /** :: Internal API :: */
+ public static void unset() {
taskContext.remove();
}
@@ -130,22 +125,22 @@ public class TaskContext implements Serializable {
new ArrayList<TaskCompletionListener>();
// Whether the corresponding task has been killed.
- private volatile Boolean interrupted = false;
+ private volatile boolean interrupted = false;
// Whether the task has completed.
- private volatile Boolean completed = false;
+ private volatile boolean completed = false;
/**
* Checks whether the task has completed.
*/
- public Boolean isCompleted() {
+ public boolean isCompleted() {
return completed;
}
/**
* Checks whether the task has been killed.
*/
- public Boolean isInterrupted() {
+ public boolean isInterrupted() {
return interrupted;
}
@@ -246,12 +241,12 @@ public class TaskContext implements Serializable {
}
@Deprecated
- /** Deprecated: use getRunningLocally() */
+ /** Deprecated: use isRunningLocally() */
public boolean runningLocally() {
return runningLocally;
}
- public boolean getRunningLocally() {
+ public boolean isRunningLocally() {
return runningLocally;
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 32cf29ed14..70c235dfff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -641,7 +641,7 @@ class DAGScheduler(
job.listener.taskSucceeded(0, result)
} finally {
taskContext.markTaskCompleted()
- TaskContext.remove()
+ TaskContext.unset()
}
} catch {
case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 2ccbd8edeb..4a9ff918af 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -58,11 +58,7 @@ private[spark] class ResultTask[T, U](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
metrics = Some(context.taskMetrics)
- try {
- func(context, rdd.iterator(partition, context))
- } finally {
- context.markTaskCompleted()
- }
+ func(context, rdd.iterator(partition, context))
}
// This is only callable on the driver side.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index a98ee11825..79709089c0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -78,8 +78,6 @@ private[spark] class ShuffleMapTask(
log.debug("Could not stop writer", e)
}
throw e
- } finally {
- context.markTaskCompleted()
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index bf73f6f7bd..c6e47c84a0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -52,7 +52,12 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
if (_killed) {
kill(interruptThread = false)
}
- runTask(context)
+ try {
+ runTask(context)
+ } finally {
+ context.markTaskCompleted()
+ TaskContext.unset()
+ }
}
def runTask(context: TaskContext): T
@@ -93,7 +98,6 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
if (interruptThread && taskThread != null) {
taskThread.interrupt()
}
- TaskContext.remove()
}
}