aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorIulian Dragos <jaguarul@gmail.com>2015-03-18 09:15:33 -0400
committerSean Owen <sowen@cloudera.com>2015-03-18 09:15:33 -0400
commit9d112a958ee2facad179344dd367a6d1ccbc9614 (patch)
tree6d79c29694e821878b3d790c5e5ced9fc0b24bdc /core
parente09c852d6b83b9b112685d113f2792daec8785a3 (diff)
downloadspark-9d112a958ee2facad179344dd367a6d1ccbc9614.tar.gz
spark-9d112a958ee2facad179344dd367a6d1ccbc9614.tar.bz2
spark-9d112a958ee2facad179344dd367a6d1ccbc9614.zip
[SPARK-6286][minor] Handle missing Mesos case TASK_ERROR.
Author: Iulian Dragos <jaguarul@gmail.com> Closes #5000 from dragos/issue/task-error-case and squashes the following commits: e063627 [Iulian Dragos] Handle TASK_ERROR in Mesos scheduler backends. ac17cf0 [Iulian Dragos] Handle missing Mesos case TASK_ERROR.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/TaskState.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala10
3 files changed, 4 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala
index 0bf1e4a5e2..d85a6d6834 100644
--- a/core/src/main/scala/org/apache/spark/TaskState.scala
+++ b/core/src/main/scala/org/apache/spark/TaskState.scala
@@ -46,5 +46,6 @@ private[spark] object TaskState extends Enumeration {
case MesosTaskState.TASK_FAILED => FAILED
case MesosTaskState.TASK_KILLED => KILLED
case MesosTaskState.TASK_LOST => LOST
+ case MesosTaskState.TASK_ERROR => LOST
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 90dfe14352..fc92b9c35c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -28,7 +28,7 @@ import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.{Utils, AkkaUtils}
@@ -262,20 +262,12 @@ private[spark] class CoarseMesosSchedulerBackend(
.build()
}
- /** Check whether a Mesos task state represents a finished task */
- private def isFinished(state: MesosTaskState) = {
- state == MesosTaskState.TASK_FINISHED ||
- state == MesosTaskState.TASK_FAILED ||
- state == MesosTaskState.TASK_KILLED ||
- state == MesosTaskState.TASK_LOST
- }
-
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue.toInt
val state = status.getState
logInfo("Mesos task " + taskId + " is now " + state)
synchronized {
- if (isFinished(state)) {
+ if (TaskState.isFinished(TaskState.fromMesos(state))) {
val slaveId = taskIdToSlaveId(taskId)
slaveIdsWithExecutors -= slaveId
taskIdToSlaveId -= taskId
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index cfb6592e14..df8f4306b8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -313,14 +313,6 @@ private[spark] class MesosSchedulerBackend(
.build()
}
- /** Check whether a Mesos task state represents a finished task */
- def isFinished(state: MesosTaskState) = {
- state == MesosTaskState.TASK_FINISHED ||
- state == MesosTaskState.TASK_FAILED ||
- state == MesosTaskState.TASK_KILLED ||
- state == MesosTaskState.TASK_LOST
- }
-
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
inClassLoader() {
val tid = status.getTaskId.getValue.toLong
@@ -330,7 +322,7 @@ private[spark] class MesosSchedulerBackend(
// We lost the executor on this slave, so remember that it's gone
removeExecutor(taskIdToSlaveId(tid), "Lost executor")
}
- if (isFinished(status.getState)) {
+ if (TaskState.isFinished(state)) {
taskIdToSlaveId.remove(tid)
}
}