aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/deploy/master/Master.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/deploy/master/Master.scala')
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala19
1 files changed, 11 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index d1428bcfc6..770cfe9d05 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -15,7 +15,7 @@ import spark.{Logging, SparkException, Utils}
import spark.util.AkkaUtils
-private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
+private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
@@ -35,9 +35,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
var firstApp: Option[ApplicationInfo] = None
+ Utils.checkHost(host, "Expected hostname")
+
val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
- if (envVar != null) envVar else ip
+ if (envVar != null) envVar else host
}
// As a temporary workaround before better ways of configuring memory, we allow users to set
@@ -46,7 +48,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
override def preStart() {
- logInfo("Starting Spark master at spark://" + ip + ":" + port)
+ logInfo("Starting Spark master at spark://" + host + ":" + port)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
startWebUi()
@@ -146,7 +148,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
case RequestMasterState => {
- sender ! MasterState(ip, port, workers.toArray, apps.toArray, completedApps.toArray)
+ sender ! MasterState(host, port, workers.toArray, apps.toArray, completedApps.toArray)
}
}
@@ -212,13 +214,13 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
- exec.application.driver ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
+ exec.application.driver ! ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
publicAddress: String): WorkerInfo = {
// There may be one or more refs to dead workers on this same node (w/ different ID's), remove them.
- workers.filter(w => (w.host == host) && (w.state == WorkerState.DEAD)).foreach(workers -= _)
+ workers.filter(w => (w.host == host && w.port == port) && (w.state == WorkerState.DEAD)).foreach(workers -= _)
val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
workers += worker
idToWorker(worker.id) = worker
@@ -243,7 +245,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
def addApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
- val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver)
+ val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
apps += app
idToApp(app.id) = app
actorToApp(driver) = app
@@ -274,6 +276,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
for (exec <- app.executors.values) {
exec.worker.removeExecutor(exec)
exec.worker.actor ! KillExecutor(exec.application.id, exec.id)
+ exec.state = ExecutorState.KILLED
}
app.markFinished(state)
app.driver ! ApplicationRemoved(state.toString)
@@ -308,7 +311,7 @@ private[spark] object Master {
def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
- val (actorSystem, _) = startSystemAndActor(args.ip, args.port, args.webUiPort)
+ val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
actorSystem.awaitTermination()
}