aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2012-09-19 15:36:16 -0700
committerReynold Xin <rxin@cs.berkeley.edu>2012-09-19 15:41:56 -0700
commit6b5980da796e0204a7735a31fb454f312bc9daac (patch)
treef01060defd62afb7822d2f11057415c31c8d4eee
parent9a449e00633d4028652e8ccc46486bb6ce886635 (diff)
downloadspark-6b5980da796e0204a7735a31fb454f312bc9daac.tar.gz
spark-6b5980da796e0204a7735a31fb454f312bc9daac.tar.bz2
spark-6b5980da796e0204a7735a31fb454f312bc9daac.zip
Set a limited number of retry in standalone deploy mode.
-rw-r--r--core/src/main/scala/spark/deploy/master/JobInfo.scala9
-rw-r--r--core/src/main/scala/spark/deploy/master/JobState.scala2
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala37
3 files changed, 34 insertions, 14 deletions
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index 31d48b82b9..4c81a1b447 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -31,4 +31,13 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va
}
def coresLeft: Int = desc.cores - coresGranted
+
+ private var _retryCount = 0
+
+ def retryCount = _retryCount
+
+ def incrementRetryCount = {
+ _retryCount += 1
+ _retryCount
+ }
}
diff --git a/core/src/main/scala/spark/deploy/master/JobState.scala b/core/src/main/scala/spark/deploy/master/JobState.scala
index 50b0c6f95b..8d458ac39c 100644
--- a/core/src/main/scala/spark/deploy/master/JobState.scala
+++ b/core/src/main/scala/spark/deploy/master/JobState.scala
@@ -4,4 +4,6 @@ object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED")
type JobState = Value
val WAITING, RUNNING, FINISHED, FAILED = Value
+
+ val MAX_NUM_RETRY = 10
}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index c98dddea7b..5cc73633ab 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -1,19 +1,18 @@
package spark.deploy.master
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-
import akka.actor._
-import spark.{Logging, Utils}
-import spark.util.AkkaUtils
+import akka.actor.Terminated
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+
import java.text.SimpleDateFormat
import java.util.Date
-import akka.remote.RemoteClientLifeCycleEvent
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
import spark.deploy._
-import akka.remote.RemoteClientShutdown
-import akka.remote.RemoteClientDisconnected
-import spark.deploy.RegisterWorker
-import spark.deploy.RegisterWorkerFailed
-import akka.actor.Terminated
+import spark.{Logging, SparkException, Utils}
+import spark.util.AkkaUtils
+
class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For job IDs
@@ -81,12 +80,22 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
exec.state = state
exec.job.actor ! ExecutorUpdated(execId, state, message)
if (ExecutorState.isFinished(state)) {
+ val jobInfo = idToJob(jobId)
// Remove this executor from the worker and job
logInfo("Removing executor " + exec.fullId + " because it is " + state)
- idToJob(jobId).removeExecutor(exec)
+ jobInfo.removeExecutor(exec)
exec.worker.removeExecutor(exec)
- // TODO: the worker would probably want to restart the executor a few times
- schedule()
+
+ // Only retry certain number of times so we don't go into an infinite loop.
+ if (jobInfo.incrementRetryCount <= JobState.MAX_NUM_RETRY) {
+ schedule()
+ } else {
+ val e = new SparkException("Job %s wth ID %s failed %d times.".format(
+ jobInfo.desc.name, jobInfo.id, jobInfo.retryCount))
+ logError(e.getMessage, e)
+ throw e
+ //System.exit(1)
+ }
}
}
case None =>
@@ -112,7 +121,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
addressToWorker.get(address).foreach(removeWorker)
addressToJob.get(address).foreach(removeJob)
}
-
+
case RequestMasterState => {
sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList)
}