From 6b5980da796e0204a7735a31fb454f312bc9daac Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 19 Sep 2012 15:36:16 -0700 Subject: Set a limited number of retry in standalone deploy mode. --- .../main/scala/spark/deploy/master/JobInfo.scala | 9 ++++++ .../main/scala/spark/deploy/master/JobState.scala | 2 ++ .../main/scala/spark/deploy/master/Master.scala | 37 ++++++++++++++-------- 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala index 31d48b82b9..4c81a1b447 100644 --- a/core/src/main/scala/spark/deploy/master/JobInfo.scala +++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala @@ -31,4 +31,13 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va } def coresLeft: Int = desc.cores - coresGranted + + private var _retryCount = 0 + + def retryCount = _retryCount + + def incrementRetryCount = { + _retryCount += 1 + _retryCount + } } diff --git a/core/src/main/scala/spark/deploy/master/JobState.scala b/core/src/main/scala/spark/deploy/master/JobState.scala index 50b0c6f95b..8d458ac39c 100644 --- a/core/src/main/scala/spark/deploy/master/JobState.scala +++ b/core/src/main/scala/spark/deploy/master/JobState.scala @@ -4,4 +4,6 @@ object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") type JobState = Value val WAITING, RUNNING, FINISHED, FAILED = Value + + val MAX_NUM_RETRY = 10 } diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index c98dddea7b..5cc73633ab 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -1,19 +1,18 @@ package spark.deploy.master -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} - import akka.actor._ -import spark.{Logging, Utils} -import spark.util.AkkaUtils +import akka.actor.Terminated +import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown} + import java.text.SimpleDateFormat import java.util.Date -import akka.remote.RemoteClientLifeCycleEvent + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + import spark.deploy._ -import akka.remote.RemoteClientShutdown -import akka.remote.RemoteClientDisconnected -import spark.deploy.RegisterWorker -import spark.deploy.RegisterWorkerFailed -import akka.actor.Terminated +import spark.{Logging, SparkException, Utils} +import spark.util.AkkaUtils + class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For job IDs @@ -81,12 +80,22 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { exec.state = state exec.job.actor ! ExecutorUpdated(execId, state, message) if (ExecutorState.isFinished(state)) { + val jobInfo = idToJob(jobId) // Remove this executor from the worker and job logInfo("Removing executor " + exec.fullId + " because it is " + state) - idToJob(jobId).removeExecutor(exec) + jobInfo.removeExecutor(exec) exec.worker.removeExecutor(exec) - // TODO: the worker would probably want to restart the executor a few times - schedule() + + // Only retry certain number of times so we don't go into an infinite loop. + if (jobInfo.incrementRetryCount <= JobState.MAX_NUM_RETRY) { + schedule() + } else { + val e = new SparkException("Job %s wth ID %s failed %d times.".format( + jobInfo.desc.name, jobInfo.id, jobInfo.retryCount)) + logError(e.getMessage, e) + throw e + //System.exit(1) + } } } case None => @@ -112,7 +121,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { addressToWorker.get(address).foreach(removeWorker) addressToJob.get(address).foreach(removeJob) } - + case RequestMasterState => { sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList) } -- cgit v1.2.3