diff options
author | Matei Zaharia <matei.zaharia@gmail.com> | 2013-09-03 13:01:17 -0700 |
---|---|---|
committer | Matei Zaharia <matei.zaharia@gmail.com> | 2013-09-03 13:01:17 -0700 |
commit | 68df2464d1146f0eb3115859b44b7f3c5112505e (patch) | |
tree | 5132414211fb642fd7fce546181eb6235a863b4d /core | |
parent | 59218bdd4996a13116009e3669b1b875be23a694 (diff) | |
parent | b25918d8412eb314587e03b727dd1d6c99947669 (diff) | |
download | spark-68df2464d1146f0eb3115859b44b7f3c5112505e.tar.gz spark-68df2464d1146f0eb3115859b44b7f3c5112505e.tar.bz2 spark-68df2464d1146f0eb3115859b44b7f3c5112505e.zip |
Merge pull request #889 from alig/master
Return the port the WebUI is bound to (useful if port 0 was used)
Diffstat (limited to 'core')
3 files changed, 22 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index c31619db27..1cfff5e565 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -127,4 +127,8 @@ private[deploy] object DeployMessages { case object CheckForWorkerTimeOut + case object RequestWebUIPort + + case class WebUIPortResponse(webUIBoundPort: Int) + } diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 78e3747ad8..10161c8204 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -43,7 +43,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I logInfo("Starting a local Spark cluster with " + numWorkers + " workers.") /* Start the Master */ - val (masterSystem, masterPort) = Master.startSystemAndActor(localHostname, 0, 0) + val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0) masterActorSystems += masterSystem val masterUrl = "spark://" + localHostname + ":" + masterPort diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7cf0a7754f..bde59905bc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -17,15 +17,18 @@ package org.apache.spark.deploy.master -import java.text.SimpleDateFormat import java.util.Date +import java.text.SimpleDateFormat import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import akka.actor._ import akka.actor.Terminated +import akka.dispatch.Await +import akka.pattern.ask import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown} import akka.util.duration._ +import akka.util.Timeout import org.apache.spark.{Logging, SparkException} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} @@ -33,6 +36,7 @@ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{Utils, AkkaUtils} +import akka.util.{Duration, Timeout} private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { @@ -180,6 +184,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act case CheckForWorkerTimeOut => { timeOutDeadWorkers() } + + case RequestWebUIPort => { + sender ! WebUIPortResponse(webUi.boundPort.getOrElse(-1)) + } } /** @@ -364,7 +372,7 @@ private[spark] object Master { def main(argStrings: Array[String]) { val args = new MasterArguments(argStrings) - val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort) + val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort) actorSystem.awaitTermination() } @@ -378,9 +386,14 @@ private[spark] object Master { } } - def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = { + def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = { val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName) - (actorSystem, boundPort) + val timeoutDuration = Duration.create( + System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + implicit val timeout = Timeout(timeoutDuration) + val respFuture = actor ? RequestWebUIPort // ask pattern + val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse] + (actorSystem, boundPort, resp.webUIBoundPort) } } |