aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAli Ghodsi <alig@cs.berkeley.edu>2013-09-02 18:21:21 -0700
committerAli Ghodsi <alig@cs.berkeley.edu>2013-09-02 18:21:21 -0700
commitcf7b115496d6847100fee82ce5a1435b52109538 (patch)
tree8203ecf86ae7e025ba8a02c5962b4dfc445d81fa /core
parent636fc0c89e837f1dec72bdacc749423b08e06126 (diff)
downloadspark-cf7b115496d6847100fee82ce5a1435b52109538.tar.gz
spark-cf7b115496d6847100fee82ce5a1435b52109538.tar.bz2
spark-cf7b115496d6847100fee82ce5a1435b52109538.zip
Enabling getting the actual WEBUI port
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala17
3 files changed, 19 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index c31619db27..14a453699d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -127,4 +127,8 @@ private[deploy] object DeployMessages {
case object CheckForWorkerTimeOut
+ case object RequestWebUIPort
+
+ case class WebUIPortResponse(boundedPort: Int) {}
+
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 78e3747ad8..10161c8204 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -43,7 +43,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
/* Start the Master */
- val (masterSystem, masterPort) = Master.startSystemAndActor(localHostname, 0, 0)
+ val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0)
masterActorSystems += masterSystem
val masterUrl = "spark://" + localHostname + ":" + masterPort
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7cf0a7754f..50d5900ecd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -26,6 +26,8 @@ import akka.actor._
import akka.actor.Terminated
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
import akka.util.duration._
+import akka.dispatch.Await
+import akka.pattern.ask
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -33,6 +35,8 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
+import akka.util.Timeout
+import java.util.concurrent.TimeUnit
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
@@ -180,6 +184,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case CheckForWorkerTimeOut => {
timeOutDeadWorkers()
}
+
+ case RequestWebUIPort => {
+ sender ! WebUIPortResponse(webUi.boundPort.getOrElse(-1))
+ }
}
/**
@@ -364,7 +372,7 @@ private[spark] object Master {
def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
- val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
+ val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
actorSystem.awaitTermination()
}
@@ -378,9 +386,12 @@ private[spark] object Master {
}
}
- def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = {
+ def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
- (actorSystem, boundPort)
+ implicit val timeout = Timeout(1 seconds)
+ val respFuture = actor ? RequestWebUIPort
+ val resp = Await.result(respFuture, timeout.duration).asInstanceOf[WebUIPortResponse]
+ (actorSystem, boundPort, resp.boundedPort)
}
}