aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDevaraj K <devaraj@apache.org>2017-02-10 14:11:56 +0000
committerSean Owen <sowen@cloudera.com>2017-02-10 14:11:56 +0000
commit8640dc08232c3d427a84ca621b07833e32fb8a3a (patch)
tree0153776172e170d68ffc01d7fbd2df530661a71e
parent8e8afb3a3468aa743d13e23e10e77e94b772b2ed (diff)
downloadspark-8640dc08232c3d427a84ca621b07833e32fb8a3a.tar.gz
spark-8640dc08232c3d427a84ca621b07833e32fb8a3a.tar.bz2
spark-8640dc08232c3d427a84ca621b07833e32fb8a3a.zip
[SPARK-10748][MESOS] Log error instead of crashing Spark Mesos dispatcher when a job is misconfigured
## What changes were proposed in this pull request? Now handling the spark exception which gets thrown for invalid job configuration, marking that job as failed and continuing to launch the other drivers instead of throwing the exception. ## How was this patch tested? I verified manually, now the misconfigured jobs move to Finished Drivers section in UI and continue to launch the other jobs. Author: Devaraj K <devaraj@apache.org> Closes #13077 from devaraj-kavali/SPARK-10748.
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala28
1 files changed, 19 insertions, 9 deletions
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index c5bbcb9bef..2760f31b12 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -559,15 +559,25 @@ private[spark] class MesosClusterScheduler(
} else {
val offer = offerOption.get
val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
- val task = createTaskInfo(submission, offer)
- queuedTasks += task
- logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
- submission.submissionId)
- val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
- None, new Date(), None, getDriverFrameworkID(submission))
- launchedDrivers(submission.submissionId) = newState
- launchedDriversState.persist(submission.submissionId, newState)
- afterLaunchCallback(submission.submissionId)
+ try {
+ val task = createTaskInfo(submission, offer)
+ queuedTasks += task
+ logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
+ submission.submissionId)
+ val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
+ None, new Date(), None, getDriverFrameworkID(submission))
+ launchedDrivers(submission.submissionId) = newState
+ launchedDriversState.persist(submission.submissionId, newState)
+ afterLaunchCallback(submission.submissionId)
+ } catch {
+ case e: SparkException =>
+ afterLaunchCallback(submission.submissionId)
+ finishedDrivers += new MesosClusterSubmissionState(submission, TaskID.newBuilder().
+ setValue(submission.submissionId).build(), SlaveID.newBuilder().setValue("").
+ build(), None, null, None, getDriverFrameworkID(submission))
+ logError(s"Failed to launch the driver with id: ${submission.submissionId}, " +
+ s"cpu: $driverCpu, mem: $driverMem, reason: ${e.getMessage}")
+ }
}
}
}