diff options
author | Devaraj K <devaraj@apache.org> | 2017-02-10 14:11:56 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2017-02-10 14:11:56 +0000 |
commit | 8640dc08232c3d427a84ca621b07833e32fb8a3a (patch) | |
tree | 0153776172e170d68ffc01d7fbd2df530661a71e | |
parent | 8e8afb3a3468aa743d13e23e10e77e94b772b2ed (diff) | |
download | spark-8640dc08232c3d427a84ca621b07833e32fb8a3a.tar.gz spark-8640dc08232c3d427a84ca621b07833e32fb8a3a.tar.bz2 spark-8640dc08232c3d427a84ca621b07833e32fb8a3a.zip |
[SPARK-10748][MESOS] Log error instead of crashing Spark Mesos dispatcher when a job is misconfigured
## What changes were proposed in this pull request?
This change handles the SparkException that gets thrown for an invalid job configuration, marks that job as failed, and continues launching the other drivers instead of letting the exception crash the dispatcher.
## How was this patch tested?
I verified manually; the misconfigured jobs now move to the Finished Drivers section in the UI, and the other jobs continue to launch.
Author: Devaraj K <devaraj@apache.org>
Closes #13077 from devaraj-kavali/SPARK-10748.
-rw-r--r-- | resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 28 |
1 file changed, 19 insertions, 9 deletions
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index c5bbcb9bef..2760f31b12 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -559,15 +559,25 @@ private[spark] class MesosClusterScheduler( } else { val offer = offerOption.get val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo]) - val task = createTaskInfo(submission, offer) - queuedTasks += task - logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " + - submission.submissionId) - val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId, - None, new Date(), None, getDriverFrameworkID(submission)) - launchedDrivers(submission.submissionId) = newState - launchedDriversState.persist(submission.submissionId, newState) - afterLaunchCallback(submission.submissionId) + try { + val task = createTaskInfo(submission, offer) + queuedTasks += task + logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " + + submission.submissionId) + val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId, + None, new Date(), None, getDriverFrameworkID(submission)) + launchedDrivers(submission.submissionId) = newState + launchedDriversState.persist(submission.submissionId, newState) + afterLaunchCallback(submission.submissionId) + } catch { + case e: SparkException => + afterLaunchCallback(submission.submissionId) + finishedDrivers += new MesosClusterSubmissionState(submission, TaskID.newBuilder(). + setValue(submission.submissionId).build(), SlaveID.newBuilder().setValue(""). 
+ build(), None, null, None, getDriverFrameworkID(submission)) + logError(s"Failed to launch the driver with id: ${submission.submissionId}, " + + s"cpu: $driverCpu, mem: $driverMem, reason: ${e.getMessage}") + } } } } |