diff options
author | Mark Hamstra <markhamstra@gmail.com> | 2013-08-01 15:14:39 -0700 |
---|---|---|
committer | Mark Hamstra <markhamstra@gmail.com> | 2013-08-05 13:13:56 -0700 |
commit | cdd1af562ef3fb480f2e98300e3d463657c09681 (patch) | |
tree | 82065b74c8390419422f3866d485f59768bfe7d7 /core | |
parent | e8bec8365f6598affd0335eae82b093acf4671da (diff) | |
download | spark-cdd1af562ef3fb480f2e98300e3d463657c09681.tar.gz spark-cdd1af562ef3fb480f2e98300e3d463657c09681.tar.bz2 spark-cdd1af562ef3fb480f2e98300e3d463657c09681.zip |
Timeout zombie workers
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/deploy/master/Master.scala | 18 |
1 files changed, 12 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 0aed4b9802..b50613f866 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -39,7 +39,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000 val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt - + val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt + var nextAppNumber = 0 val workers = new HashSet[WorkerInfo] val idToWorker = new HashMap[String, WorkerInfo] @@ -337,12 +338,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act /** Check for, and remove, any timed-out workers */ def timeOutDeadWorkers() { // Copy the workers into an array so we don't modify the hashset while iterating through it - val expirationTime = System.currentTimeMillis() - WORKER_TIMEOUT - val toRemove = workers.filter(_.lastHeartbeat < expirationTime).toArray + val currentTime = System.currentTimeMillis() + val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray for (worker <- toRemove) { - logWarning("Removing %s because we got no heartbeat in %d seconds".format( - worker.id, WORKER_TIMEOUT)) - removeWorker(worker) + if (worker.state != WorkerState.DEAD) { + logWarning("Removing %s because we got no heartbeat in %d seconds".format( + worker.id, WORKER_TIMEOUT)) + removeWorker(worker) + } else { + if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT)) + workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it + } } } } |