From cdaa0fad51c7ad6c2a56f6c14faedd08fe341b2e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 1 Dec 2012 18:18:15 -0800 Subject: Use external addresses in standalone WebUI on EC2. --- bin/start-master.sh | 11 ++++++++++- bin/start-slave.sh | 15 +++++++++++++++ bin/start-slaves.sh | 5 +++-- core/src/main/scala/spark/deploy/DeployMessage.scala | 11 +++++++++-- core/src/main/scala/spark/deploy/master/Master.scala | 16 +++++++++++----- core/src/main/scala/spark/deploy/master/WorkerInfo.scala | 7 ++++--- core/src/main/scala/spark/deploy/worker/Worker.scala | 6 +++++- .../main/twirl/spark/deploy/master/worker_row.scala.html | 2 +- 8 files changed, 58 insertions(+), 15 deletions(-) create mode 100755 bin/start-slave.sh diff --git a/bin/start-master.sh b/bin/start-master.sh index 6403c944a4..ad19d48331 100755 --- a/bin/start-master.sh +++ b/bin/start-master.sh @@ -7,4 +7,13 @@ bin=`cd "$bin"; pwd` . "$bin/spark-config.sh" -"$bin"/spark-daemon.sh start spark.deploy.master.Master \ No newline at end of file +# Set SPARK_PUBLIC_DNS so the master report the correct webUI address to the slaves +if [ "$SPARK_PUBLIC_DNS" = "" ]; then + # If we appear to be running on EC2, use the public address by default: + if [[ `hostname` == *ec2.internal ]]; then + echo "RUNNING ON EC2" + export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname` + fi +fi + +"$bin"/spark-daemon.sh start spark.deploy.master.Master diff --git a/bin/start-slave.sh b/bin/start-slave.sh new file mode 100755 index 0000000000..10cce9c17b --- /dev/null +++ b/bin/start-slave.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +# Set SPARK_PUBLIC_DNS so slaves can be linked in master web UI +if [ "$SPARK_PUBLIC_DNS" = "" ]; then + # If we appear to be running on EC2, use the public address by default: + if [[ `hostname` == *ec2.internal ]]; then + echo "RUNNING ON EC2" + export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname` + fi +fi + +"$bin"/spark-daemon.sh start spark.deploy.worker.Worker $1 diff --git a/bin/start-slaves.sh b/bin/start-slaves.sh index 67b07215a2..390247ca4a 100755 --- a/bin/start-slaves.sh +++ b/bin/start-slaves.sh @@ -18,6 +18,7 @@ if [ "$SPARK_MASTER_IP" = "" ]; then SPARK_MASTER_IP=`hostname` fi -echo "Master IP: $ip" +echo "Master IP: $SPARK_MASTER_IP" -"$bin"/spark-daemons.sh start spark.deploy.worker.Worker spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT +# Launch the slaves +exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala index 7a1089c816..f05413a53b 100644 --- a/core/src/main/scala/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/spark/deploy/DeployMessage.scala @@ -11,8 +11,15 @@ private[spark] sealed trait DeployMessage extends Serializable // Worker to Master -private[spark] -case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int) +private[spark] +case class RegisterWorker( + id: String, + host: String, + port: Int, + cores: Int, + memory: Int, + webUiPort: Int, + publicAddress: String) extends DeployMessage private[spark] diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 7e5cd6b171..31fb83f2e2 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -31,6 +31,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor val waitingJobs = new ArrayBuffer[JobInfo] val completedJobs = new ArrayBuffer[JobInfo] + val masterPublicAddress = { + val envVar = System.getenv("SPARK_PUBLIC_DNS") + if (envVar != null) envVar else ip + } + // As a temporary workaround before better ways of configuring memory, we allow users to set // a flag that will perform round-robin scheduling across the nodes (spreading out each job // among all the nodes) instead of trying to consolidate each job onto a small # of nodes. @@ -55,15 +60,15 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor } override def receive = { - case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort) => { + case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( host, workerPort, cores, Utils.memoryMegabytesToString(memory))) if (idToWorker.contains(id)) { sender ! RegisterWorkerFailed("Duplicate worker ID") } else { - addWorker(id, host, workerPort, cores, memory, worker_webUiPort) + addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) context.watch(sender) // This doesn't work with remote actors but helps for testing - sender ! RegisteredWorker("http://" + ip + ":" + webUiPort) + sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUiPort) schedule() } } @@ -196,8 +201,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory) } - def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int): WorkerInfo = { - val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort) + def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int, + publicAddress: String): WorkerInfo = { + val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress) workers += worker idToWorker(worker.id) = worker actorToWorker(sender) = worker diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala index 706b1453aa..a0a698ef04 100644 --- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala @@ -10,7 +10,8 @@ private[spark] class WorkerInfo( val cores: Int, val memory: Int, val actor: ActorRef, - val webUiPort: Int) { + val webUiPort: Int, + val publicAddress: String) { var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info @@ -37,8 +38,8 @@ private[spark] class WorkerInfo( def hasExecutor(job: JobInfo): Boolean = { executors.values.exists(_.job == job) } - + def webUiAddress : String = { - "http://" + this.host + ":" + this.webUiPort + "http://" + this.publicAddress + ":" + this.webUiPort } } diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 67d41dda29..31b8f0f955 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -36,6 +36,10 @@ private[spark] class Worker( var workDir: File = null val executors = new HashMap[String, ExecutorRunner] val finishedExecutors = new HashMap[String, ExecutorRunner] + val publicAddress = { + val envVar = System.getenv("SPARK_PUBLIC_DNS") + if (envVar != null) envVar else ip + } var coresUsed = 0 var memoryUsed = 0 @@ -79,7 +83,7 @@ private[spark] class Worker( val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort) try { master = context.actorFor(akkaUrl) - master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort) + master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.watch(master) // Doesn't work with remote actors, but useful for testing } catch { diff --git a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html index 3dcba3a545..c32ab30401 100644 --- a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html +++ b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html @@ -4,7 +4,7 @@ - @worker.id + @worker.id @{worker.host}:@{worker.port} @worker.cores (@worker.coresUsed Used) -- cgit v1.2.3