aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/deploy/client/Client.scala
blob: e51b0c5c15f535016b531aa62699144d0d9f0c81 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package spark.deploy.client

import spark.deploy._
import akka.actor._
import akka.pattern.ask
import akka.util.duration._
import akka.pattern.AskTimeoutException
import spark.{SparkException, Logging}
import akka.remote.RemoteClientLifeCycleEvent
import akka.remote.RemoteClientShutdown
import spark.deploy.RegisterJob
import akka.remote.RemoteClientDisconnected
import akka.actor.Terminated
import akka.dispatch.Await

/**
 * The main class used to talk to a Spark deploy cluster. Takes a master URL, a job description,
 * and a listener for job events, and calls back the listener when various events occur.
 *
 * @param actorSystem actor system in which [[start]] creates the client actor
 * @param masterUrl URL of the master; must match `spark://host:port`
 * @param jobDescription description sent to the master in a `RegisterJob` message
 * @param listener callback notified when the job is registered, when executors are added or
 *                 finish, and (at most once) when the client disconnects
 * @throws SparkException if `masterUrl` does not match `spark://host:port`
 */
private[spark] class Client(
    actorSystem: ActorSystem,
    masterUrl: String,
    jobDescription: JobDescription,
    listener: ClientListener)
  extends Logging {

  val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r

  /** The client actor; assigned by [[start]] and reset to null by [[stop]]. */
  var actor: ActorRef = null

  /** Job ID assigned by the master; null until a `RegisteredJob` message is received. */
  var jobId: String = null

  // Validate eagerly so a malformed URL fails at construction rather than inside the actor.
  if (MASTER_REGEX.unapplySeq(masterUrl).isEmpty) {
    throw new SparkException("Invalid master URL: " + masterUrl)
  }

  /**
   * Actor that registers the job with the master and translates the master's messages, plus
   * connection-loss events for the master, into calls on `listener`.
   */
  class ClientActor extends Actor with Logging {
    var master: ActorRef = null
    // Address of the master. Remote lifecycle events are published on the system-wide event
    // stream for every remote peer, so this is used to pick out the ones about the master.
    var masterAddress: Address = null
    var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times

    override def preStart() {
      // .get is safe: masterUrl was validated against MASTER_REGEX in the Client constructor.
      val Seq(masterHost, masterPort) = MASTER_REGEX.unapplySeq(masterUrl).get
      logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
      val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
      try {
        master = context.actorFor(akkaUrl)
        masterAddress = master.path.address
        master ! RegisterJob(jobDescription)
        context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
        context.watch(master)  // Doesn't work with remote actors, but useful for testing
      } catch {
        case e: Exception =>
          logError("Failed to connect to master", e)
          markDisconnected()
          context.stop(self)
      }
    }

    override def receive = {
      case RegisteredJob(jobId_) =>
        jobId = jobId_
        listener.connected(jobId)

      case ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) =>
        val fullId = jobId + "/" + id
        logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores))
        listener.executorAdded(fullId, workerId, host, cores, memory)

      case ExecutorUpdated(id, state, message) =>
        val fullId = jobId + "/" + id
        val messageText = message.map(s => " (" + s + ")").getOrElse("")
        logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
        if (ExecutorState.isFinished(state)) {
          listener.executorRemoved(fullId, message.getOrElse(""))
        }

      // Only the master is watched, so any Terminated refers to it.
      case Terminated(_) =>
        masterConnectionLost()

      case RemoteClientDisconnected(_, address) if address == masterAddress =>
        masterConnectionLost()

      case RemoteClientShutdown(_, address) if address == masterAddress =>
        masterConnectionLost()

      // Lifecycle events about other remote peers in this actor system are not our concern.
      case _: RemoteClientLifeCycleEvent =>

      case StopClient =>
        markDisconnected()
        sender ! true
        context.stop(self)
    }

    /** Reacts to losing the master: notify the listener (once) and stop this actor. */
    def masterConnectionLost() {
      logError("Connection to master failed; stopping client")
      markDisconnected()
      context.stop(self)
    }

    /**
     * Notify the listener that we disconnected, if we hadn't already done so before.
     */
    def markDisconnected() {
      if (!alreadyDisconnected) {
        listener.disconnected()
        alreadyDisconnected = true
      }
    }
  }

  /** Launches the client actor, which then calls back into the listener asynchronously. */
  def start() {
    // Just launch an actor; it will call back into the listener.
    actor = actorSystem.actorOf(Props(new ClientActor))
  }

  /**
   * Asks the client actor to stop and waits up to one second for it to acknowledge.
   * A timeout is tolerated because the actor may already have stopped, e.g. after the master
   * went away. Does nothing if the client was never started or was already stopped.
   */
  def stop() {
    if (actor != null) {
      try {
        val timeout = 1.seconds
        val future = actor.ask(StopClient)(timeout)
        Await.result(future, timeout)
      } catch {
        // The ask fails with AskTimeoutException, but Await.result throws a plain
        // java.util.concurrent.TimeoutException if its own deadline expires first. Both
        // deadlines are the same, so either can occur.
        case _: AskTimeoutException | _: java.util.concurrent.TimeoutException =>
          logInfo("Stop request to Master timed out; it may already be shut down.")
      }
      actor = null
    }
  }
}