aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei.zaharia@gmail.com>2013-09-03 13:01:17 -0700
committerMatei Zaharia <matei.zaharia@gmail.com>2013-09-03 13:01:17 -0700
commit68df2464d1146f0eb3115859b44b7f3c5112505e (patch)
tree5132414211fb642fd7fce546181eb6235a863b4d /core
parent59218bdd4996a13116009e3669b1b875be23a694 (diff)
parentb25918d8412eb314587e03b727dd1d6c99947669 (diff)
downloadspark-68df2464d1146f0eb3115859b44b7f3c5112505e.tar.gz
spark-68df2464d1146f0eb3115859b44b7f3c5112505e.tar.bz2
spark-68df2464d1146f0eb3115859b44b7f3c5112505e.zip
Merge pull request #889 from alig/master
Return the port the WebUI is bound to (useful if port 0 was used)
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala21
3 files changed, 22 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index c31619db27..1cfff5e565 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -127,4 +127,8 @@ private[deploy] object DeployMessages {
case object CheckForWorkerTimeOut
+ case object RequestWebUIPort
+
+ case class WebUIPortResponse(webUIBoundPort: Int)
+
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 78e3747ad8..10161c8204 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -43,7 +43,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
/* Start the Master */
- val (masterSystem, masterPort) = Master.startSystemAndActor(localHostname, 0, 0)
+ val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0)
masterActorSystems += masterSystem
val masterUrl = "spark://" + localHostname + ":" + masterPort
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7cf0a7754f..bde59905bc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -17,15 +17,18 @@
package org.apache.spark.deploy.master
-import java.text.SimpleDateFormat
import java.util.Date
+import java.text.SimpleDateFormat
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import akka.actor._
import akka.actor.Terminated
+import akka.dispatch.Await
+import akka.pattern.ask
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
import akka.util.duration._
+import akka.util.Timeout
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -33,6 +36,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
+import akka.util.{Duration, Timeout}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
@@ -180,6 +184,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case CheckForWorkerTimeOut => {
timeOutDeadWorkers()
}
+
+ case RequestWebUIPort => {
+ sender ! WebUIPortResponse(webUi.boundPort.getOrElse(-1))
+ }
}
/**
@@ -364,7 +372,7 @@ private[spark] object Master {
def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
- val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
+ val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
actorSystem.awaitTermination()
}
@@ -378,9 +386,14 @@ private[spark] object Master {
}
}
- def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = {
+ def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
- (actorSystem, boundPort)
+ val timeoutDuration = Duration.create(
+ System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ implicit val timeout = Timeout(timeoutDuration)
+ val respFuture = actor ? RequestWebUIPort // ask pattern
+ val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse]
+ (actorSystem, boundPort, resp.webUIBoundPort)
}
}