aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMark Hamstra <markhamstra@gmail.com>2013-08-01 15:14:39 -0700
committerMark Hamstra <markhamstra@gmail.com>2013-08-05 13:13:56 -0700
commitcdd1af562ef3fb480f2e98300e3d463657c09681 (patch)
tree82065b74c8390419422f3866d485f59768bfe7d7 /core
parente8bec8365f6598affd0335eae82b093acf4671da (diff)
downloadspark-cdd1af562ef3fb480f2e98300e3d463657c09681.tar.gz
spark-cdd1af562ef3fb480f2e98300e3d463657c09681.tar.bz2
spark-cdd1af562ef3fb480f2e98300e3d463657c09681.zip
Timeout zombie workers
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala18
1 files changed, 12 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 0aed4b9802..b50613f866 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -39,7 +39,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
-
+ val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
+
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
val idToWorker = new HashMap[String, WorkerInfo]
@@ -337,12 +338,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
/** Check for, and remove, any timed-out workers */
def timeOutDeadWorkers() {
// Copy the workers into an array so we don't modify the hashset while iterating through it
- val expirationTime = System.currentTimeMillis() - WORKER_TIMEOUT
- val toRemove = workers.filter(_.lastHeartbeat < expirationTime).toArray
+ val currentTime = System.currentTimeMillis()
+ val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
for (worker <- toRemove) {
- logWarning("Removing %s because we got no heartbeat in %d seconds".format(
- worker.id, WORKER_TIMEOUT))
- removeWorker(worker)
+ if (worker.state != WorkerState.DEAD) {
+ logWarning("Removing %s because we got no heartbeat in %d seconds".format(
+ worker.id, WORKER_TIMEOUT))
+ removeWorker(worker)
+ } else {
+ if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
+ workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
+ }
}
}
}