aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala28
1 files changed, 19 insertions, 9 deletions
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index c5bbcb9bef..2760f31b12 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -559,15 +559,25 @@ private[spark] class MesosClusterScheduler(
} else {
val offer = offerOption.get
val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
- val task = createTaskInfo(submission, offer)
- queuedTasks += task
- logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
- submission.submissionId)
- val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
- None, new Date(), None, getDriverFrameworkID(submission))
- launchedDrivers(submission.submissionId) = newState
- launchedDriversState.persist(submission.submissionId, newState)
- afterLaunchCallback(submission.submissionId)
+ try {
+ val task = createTaskInfo(submission, offer)
+ queuedTasks += task
+ logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
+ submission.submissionId)
+ val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
+ None, new Date(), None, getDriverFrameworkID(submission))
+ launchedDrivers(submission.submissionId) = newState
+ launchedDriversState.persist(submission.submissionId, newState)
+ afterLaunchCallback(submission.submissionId)
+ } catch {
+ case e: SparkException =>
+ afterLaunchCallback(submission.submissionId)
+ finishedDrivers += new MesosClusterSubmissionState(submission, TaskID.newBuilder().
+ setValue(submission.submissionId).build(), SlaveID.newBuilder().setValue("").
+ build(), None, null, None, getDriverFrameworkID(submission))
+ logError(s"Failed to launch the driver with id: ${submission.submissionId}, " +
+ s"cpu: $driverCpu, mem: $driverMem, reason: ${e.getMessage}")
+ }
}
}
}