diff options
-rw-r--r-- | resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 28 |
1 files changed, 19 insertions, 9 deletions
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index c5bbcb9bef..2760f31b12 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -559,15 +559,25 @@ private[spark] class MesosClusterScheduler( } else { val offer = offerOption.get val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo]) - val task = createTaskInfo(submission, offer) - queuedTasks += task - logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " + - submission.submissionId) - val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId, - None, new Date(), None, getDriverFrameworkID(submission)) - launchedDrivers(submission.submissionId) = newState - launchedDriversState.persist(submission.submissionId, newState) - afterLaunchCallback(submission.submissionId) + try { + val task = createTaskInfo(submission, offer) + queuedTasks += task + logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " + + submission.submissionId) + val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId, + None, new Date(), None, getDriverFrameworkID(submission)) + launchedDrivers(submission.submissionId) = newState + launchedDriversState.persist(submission.submissionId, newState) + afterLaunchCallback(submission.submissionId) + } catch { + case e: SparkException => + afterLaunchCallback(submission.submissionId) + finishedDrivers += new MesosClusterSubmissionState(submission, TaskID.newBuilder(). + setValue(submission.submissionId).build(), SlaveID.newBuilder().setValue(""). + build(), None, null, None, getDriverFrameworkID(submission)) + logError(s"Failed to launch the driver with id: ${submission.submissionId}, " + + s"cpu: $driverCpu, mem: $driverMem, reason: ${e.getMessage}") + } } } } |