aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/deploy/master/Master.scala
blob: 92e7914b1b931a01e4237d497821306ea2989463 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
package spark.deploy.master

import akka.actor._
import akka.actor.Terminated
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}

import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

import spark.deploy._
import spark.{Logging, SparkException, Utils}
import spark.util.AkkaUtils


/**
 * Actor implementing the master of the standalone deploy mode. It tracks registered workers and
 * jobs, reacts to executor state changes and client disconnections, and assigns the free cores
 * of registered workers to waiting jobs.
 *
 * @param ip        address of this master; used in log output, in the state reported through
 *                  `RequestMasterState`, and as the public address when the SPARK_PUBLIC_DNS
 *                  environment variable is not set
 * @param port      port reported together with `ip`
 * @param webUiPort port the web UI server is started on
 */
private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
  val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For job IDs

  // Sequence number appended to generated job IDs; bumped by newJobId()
  var nextJobNumber = 0

  // Registered workers. Workers marked DEAD by removeWorker() stay in this set until a new
  // worker registers from the same host (see addWorker()).
  val workers = new HashSet[WorkerInfo]
  val idToWorker = new HashMap[String, WorkerInfo]
  val actorToWorker = new HashMap[ActorRef, WorkerInfo]
  val addressToWorker = new HashMap[Address, WorkerInfo]

  // Jobs that have registered and have not been removed yet, plus lookup tables for them
  val jobs = new HashSet[JobInfo]
  val idToJob = new HashMap[String, JobInfo]
  val actorToJob = new HashMap[ActorRef, JobInfo]
  val addressToJob = new HashMap[Address, JobInfo]

  // Jobs considered by schedule(), in registration order
  val waitingJobs = new ArrayBuffer[JobInfo]
  // History of removed jobs
  val completedJobs = new ArrayBuffer[JobInfo]

  // Host name advertised in the web UI URL sent to workers: SPARK_PUBLIC_DNS if set, else `ip`
  val masterPublicAddress = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(ip)

  // As a temporary workaround before better ways of configuring memory, we allow users to set
  // a flag that will perform round-robin scheduling across the nodes (spreading out each job
  // among all the nodes) instead of trying to consolidate each job onto a small # of nodes.
  val spreadOutJobs = System.getProperty("spark.deploy.spreadOut", "false").toBoolean

  /** Subscribes to remote client life-cycle events and starts the web UI. */
  override def preStart(): Unit = {
    logInfo("Starting Spark master at spark://" + ip + ":" + port)
    // Listen for remote client disconnection events, since they don't go through Akka's watch()
    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
    startWebUi()
  }

  /** Starts the web UI server on `webUiPort`; logs the error and exits the JVM on failure. */
  def startWebUi(): Unit = {
    val webUi = new MasterWebUI(context.system, self)
    try {
      AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler)
    } catch {
      case e: Exception =>
        logError("Failed to create web UI", e)
        System.exit(1)
    }
  }

  /**
   * Message handler. Handles worker and job registration, executor state updates,
   * termination/disconnection of watched or remote clients, and state requests.
   */
  override def receive = {
    case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        host, workerPort, cores, Utils.memoryMegabytesToString(memory)))
      if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {
        addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
        context.watch(sender)  // This doesn't work with remote actors but helps for testing
        sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUiPort)
        schedule()
      }
    }

    case RegisterJob(description) => {
      logInfo("Registering job " + description.name)
      val job = addJob(description, sender)
      logInfo("Registered job " + description.name + " with ID " + job.id)
      waitingJobs += job
      context.watch(sender)  // This doesn't work with remote actors but helps for testing
      sender ! RegisteredJob(job.id)
      schedule()
    }

    case ExecutorStateChanged(jobId, execId, state, message, exitStatus) => {
      val execOption = idToJob.get(jobId).flatMap(job => job.executors.get(execId))
      execOption match {
        case Some(exec) => {
          exec.state = state
          // Forward the new state to the job's driver
          exec.job.driver ! ExecutorUpdated(execId, state, message, exitStatus)
          if (ExecutorState.isFinished(state)) {
            // Safe lookup: execOption can only be defined if the job is in idToJob
            val jobInfo = idToJob(jobId)
            // Remove this executor from the worker and job
            logInfo("Removing executor " + exec.fullId + " because it is " + state)
            jobInfo.removeExecutor(exec)
            exec.worker.removeExecutor(exec)

            // Only retry certain number of times so we don't go into an infinite loop.
            if (jobInfo.incrementRetryCount < JobState.MAX_NUM_RETRY) {
              schedule()
            } else {
              logError("Job %s with ID %s failed %d times, removing it".format(
                jobInfo.desc.name, jobInfo.id, jobInfo.retryCount))
              removeJob(jobInfo)
            }
          }
        }
        case None =>
          logWarning("Got status update for unknown executor " + jobId + "/" + execId)
      }
    }

    case Terminated(actor) => {
      // The disconnected actor could've been either a worker or a job; remove whichever of
      // those we have an entry for in the corresponding actor hashmap
      actorToWorker.get(actor).foreach(removeWorker)
      actorToJob.get(actor).foreach(removeJob)
    }

    case RemoteClientDisconnected(_, address) =>
      removeClientAt(address)

    case RemoteClientShutdown(_, address) =>
      removeClientAt(address)

    case RequestMasterState => {
      sender ! MasterState(ip + ":" + port, workers.toArray, jobs.toArray, completedJobs.toArray)
    }
  }

  /**
   * Handles the loss of a remote client. The client could've been either a worker or a job;
   * remove whichever one is registered under the given address.
   */
  private def removeClientAt(address: Address): Unit = {
    addressToWorker.get(address).foreach(removeWorker)
    addressToJob.get(address).foreach(removeJob)
  }

  /**
   * Can a job use the given worker? True if the worker has enough memory and we haven't already
   * launched an executor for the job on it (right now the standalone backend doesn't like having
   * two executors on the same worker).
   */
  def canUse(job: JobInfo, worker: WorkerInfo): Boolean = {
    worker.memoryFree >= job.desc.memoryPerSlave && !worker.hasExecutor(job)
  }

  /**
   * Schedule the currently available resources among waiting jobs. This method will be called
   * every time a new job joins or resource availability changes.
   */
  def schedule(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first job
    // in the queue, then the second job, etc.
    if (spreadOutJobs) {
      // Try to spread out each job among all the nodes, until it has all its cores
      for (job <- waitingJobs if job.coresLeft > 0) {
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(job, _)).sortBy(_.coresFree).reverse
        val numUsable = usableWorkers.length
        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
        // Bounded by the total number of free cores, so the round-robin loop below always
        // terminates; it is 0 (and the loop is skipped) when there are no usable workers.
        var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum)
        var pos = 0
        while (toAssign > 0) {
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
            toAssign -= 1
            assigned(pos) += 1
          }
          pos = (pos + 1) % numUsable
        }
        // Now that we've decided how many cores to give on each node, let's actually give them
        for (pos <- 0 until numUsable) {
          if (assigned(pos) > 0) {
            val exec = job.addExecutor(usableWorkers(pos), assigned(pos))
            launchExecutor(usableWorkers(pos), exec, job.desc.sparkHome)
            job.state = JobState.RUNNING
          }
        }
      }
    } else {
      // Pack each job into as few nodes as possible until we've assigned all its cores.
      // Workers marked DEAD are still kept in `workers`, so they must be skipped here just
      // like in the spread-out branch; otherwise executors could be launched on dead workers.
      for (worker <- workers if worker.state == WorkerState.ALIVE && worker.coresFree > 0) {
        for (job <- waitingJobs if job.coresLeft > 0) {
          if (canUse(job, worker)) {
            val coresToUse = math.min(worker.coresFree, job.coresLeft)
            // The worker may have run out of cores while serving earlier jobs in this loop
            if (coresToUse > 0) {
              val exec = job.addExecutor(worker, coresToUse)
              launchExecutor(worker, exec, job.desc.sparkHome)
              job.state = JobState.RUNNING
            }
          }
        }
      }
    }
  }

  /**
   * Records the executor on the worker, asks the worker to launch it, and tells the job's
   * driver that the executor was added.
   */
  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.actor ! LaunchExecutor(
      exec.job.id, exec.id, exec.job.desc, exec.cores, exec.memory, sparkHome)
    exec.job.driver ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
  }

  /**
   * Creates and registers a WorkerInfo for the current `sender`, indexing it by ID, actor and
   * remote address. Must be called while handling the worker's registration message, because
   * it reads `sender`.
   *
   * @return the newly registered worker
   */
  def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
    publicAddress: String): WorkerInfo = {
    // There may be one or more refs to dead workers on this same node (w/ different ID's),
    // remove them.
    workers.filter(w => (w.host == host) && (w.state == WorkerState.DEAD)).foreach(workers -= _)
    val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
    workers += worker
    idToWorker(worker.id) = worker
    actorToWorker(sender) = worker
    addressToWorker(sender.path.address) = worker
    worker
  }

  /**
   * Marks the worker as DEAD, drops it from the lookup tables and reports its executors as
   * LOST. The worker stays in `workers` until addWorker() cleans it up.
   */
  def removeWorker(worker: WorkerInfo): Unit = {
    logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
    worker.setState(WorkerState.DEAD)
    idToWorker -= worker.id
    actorToWorker -= worker.actor
    addressToWorker -= worker.actor.path.address
    for (exec <- worker.executors.values) {
      // NOTE(review): receive() notifies drivers with ExecutorUpdated, while this sends
      // ExecutorStateChanged -- confirm that the driver side actually handles this message.
      exec.job.driver ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None)
      // NOTE(review): receive() uses JobInfo.removeExecutor for finished executors; this edits
      // the map directly -- verify the job's core accounting does not depend on removeExecutor.
      exec.job.executors -= exec.id
    }
  }

  /**
   * Creates a JobInfo for the given description and driver, and indexes it by ID, driver actor
   * and driver address. The caller is responsible for adding it to `waitingJobs`.
   *
   * @return the newly registered job
   */
  def addJob(desc: JobDescription, driver: ActorRef): JobInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)
    val job = new JobInfo(now, newJobId(date), desc, date, driver)
    jobs += job
    idToJob(job.id) = job
    actorToJob(driver) = job
    addressToJob(driver.path.address) = job
    job
  }

  /**
   * Removes a job that is still registered: drops it from all lookup tables, moves it to the
   * history, kills its executors and reschedules the freed resources. Does nothing if the job
   * has already been removed.
   */
  def removeJob(job: JobInfo): Unit = {
    if (jobs.contains(job)) {
      logInfo("Removing job " + job.id)
      jobs -= job
      idToJob -= job.id
      actorToJob -= job.driver
      // The driver's address is registered in addressToJob (see addJob), so that is the table
      // to clean up; removing it from addressToWorker would leak the job entry and could drop
      // a worker that shares the same address.
      addressToJob -= job.driver.path.address
      completedJobs += job   // Remember it in our history
      waitingJobs -= job
      for (exec <- job.executors.values) {
        exec.worker.removeExecutor(exec)
        exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
      }
      job.markFinished(JobState.FINISHED)  // TODO: Mark it as FAILED if it failed
      schedule()
    }
  }

  /** Generate a new job ID given a job's submission date */
  def newJobId(submitDate: Date): String = {
    val jobId = "job-%s-%04d".format(DATE_FORMAT.format(submitDate), nextJobNumber)
    nextJobNumber += 1
    jobId
  }
}

/** Entry point and helpers for running the standalone master in its own actor system. */
private[spark] object Master {
  private val systemName = "sparkMaster"
  private val actorName = "Master"
  private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r

  /** Parses the command-line arguments, starts the master and blocks until it terminates. */
  def main(argStrings: Array[String]): Unit = {
    val parsed = new MasterArguments(argStrings)
    val started = startSystemAndActor(parsed.ip, parsed.port, parsed.webUiPort)
    started._1.awaitTermination()
  }

  /**
   * Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:port`.
   * Throws a SparkException if the URL does not have that form.
   */
  def toAkkaUrl(sparkUrl: String): String = sparkUrl match {
    case sparkUrlRegex(masterHost, masterPort) =>
      "akka://%s@%s:%s/user/%s".format(systemName, masterHost, masterPort, actorName)
    case _ =>
      throw new SparkException("Invalid master URL: " + sparkUrl)
  }

  /**
   * Creates the actor system on the given host and port and starts a Master actor inside it.
   *
   * @return the actor system together with the port returned by AkkaUtils.createActorSystem
   */
  def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = {
    val (system, actualPort) = AkkaUtils.createActorSystem(systemName, host, port)
    // The actor reference is not needed here; clients reach the master through its URL
    system.actorOf(Props(new Master(host, actualPort, webUiPort)), name = actorName)
    (system, actualPort)
  }
}