-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index fdb633bd33..f254f5585b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -104,6 +104,8 @@ private[spark] class Master(
var leaderElectionAgent: ActorRef = _
+ private var recoveryCompletionTask: Cancellable = _
+
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
@@ -152,6 +154,10 @@ private[spark] class Master(
}
override def postStop() {
+ // prevent the CompleteRecovery message from being sent to a restarted master
+ if (recoveryCompletionTask != null) {
+ recoveryCompletionTask.cancel()
+ }
webUi.stop()
fileSystemsUsed.foreach(_.close())
masterMetricsSystem.stop()
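Together with the recoveryCompletionTask field added above, this is the usual store-and-cancel pattern for deferred work in an Akka actor: keep the Cancellable returned by the scheduler and cancel it when the actor stops, so the one-shot message cannot reach a restarted instance. A minimal self-contained sketch of the pattern, assuming an Akka classic actor (the actor and message names below are illustrative, not part of this patch):

import scala.concurrent.duration._
import akka.actor.{Actor, Cancellable}

case object TimeoutExpired

class TimeoutSketchActor extends Actor {
  import context.dispatcher // ExecutionContext required by the scheduler

  // Handle to the pending one-shot task, kept so postStop() can cancel it.
  private var pendingTask: Cancellable = _

  override def preStart(): Unit = {
    // Schedule a message to self and keep the Cancellable it returns.
    pendingTask = context.system.scheduler.scheduleOnce(60.seconds, self, TimeoutExpired)
  }

  override def postStop(): Unit = {
    // Stop the message from being delivered once this actor is gone.
    if (pendingTask != null) {
      pendingTask.cancel()
    }
  }

  def receive: Receive = {
    case TimeoutExpired =>
      // react to the timeout on the actor's own thread
      ()
  }
}

The null check mirrors the patch: in Master the task is only scheduled when the node is elected leader while in RECOVERING state (next hunk), so postStop() may well run with the field still unset.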
@@ -171,10 +177,13 @@ private[spark] class Master(
logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedDrivers, storedWorkers)
- context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
+ recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
+ CompleteRecovery)
}
}
+ case CompleteRecovery => completeRecovery()
+
case RevokedLeadership => {
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
@@ -465,7 +474,7 @@ private[spark] class Master(
* Schedule the currently available resources among waiting apps. This method will be called
* every time a new app joins or resource availability changes.
*/
- def schedule() {
+ private def schedule() {
if (state != RecoveryState.ALIVE) { return }
// First schedule drivers, they take strict precedence over applications
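For context, the lines above describe the contract of the (now private) scheduling entry point: it is a no-op unless the master is ALIVE, and drivers are handled before applications. A rough, hedged skeleton of that shape, where state and both helper methods are stand-ins rather than the Master's real members:

object ScheduleSketch {
  object RecoveryState extends Enumeration {
    val ALIVE, RECOVERING = Value
  }

  // Stand-in for the master's current recovery state.
  var state: RecoveryState.Value = RecoveryState.RECOVERING

  private def launchWaitingDrivers(): Unit = ()      // drivers take strict precedence
  private def assignCoresToWaitingApps(): Unit = ()  // then waiting apps receive cores

  // Called whenever a new app joins or resource availability changes.
  def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) { return }
    launchWaitingDrivers()
    assignCoresToWaitingApps()
  }
}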
@@ -485,7 +494,7 @@ private[spark] class Master(
// Try to spread out each app among all the nodes, until it has all its cores
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
- .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+ .filter(canUse(app, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
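The lines above only set up the spread-out pass: filter to ALIVE workers the app can use, sort them by free cores, and cap the request at the cores actually available. Per the comments earlier in the file, the cores are then handed out round-robin across those workers. A self-contained sketch of that idea (not the Master's actual code; plain arrays stand in for WorkerInfo):

object SpreadOutSketch {
  // Hand out one core at a time, cycling over the usable workers and never
  // exceeding any worker's free cores; returns cores assigned per worker.
  def spreadOutCores(coresFree: Array[Int], coresWanted: Int): Array[Int] = {
    val assigned = new Array[Int](coresFree.length)
    var toAssign = math.min(coresWanted, coresFree.sum)
    var pos = 0
    while (toAssign > 0) {
      if (coresFree(pos) - assigned(pos) > 0) { // this worker still has room
        assigned(pos) += 1
        toAssign -= 1
      }
      pos = (pos + 1) % coresFree.length
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    // Three workers with 4, 4 and 2 free cores and an app asking for 6:
    // spreading out yields 2 + 2 + 2 rather than 4 + 2 on the first two.
    println(spreadOutCores(Array(4, 4, 2), 6).mkString(", ")) // prints 2, 2, 2
  }
}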