author    Josh Rosen <joshrosen@eecs.berkeley.edu>  2012-12-01 18:18:15 -0800
committer Josh Rosen <joshrosen@eecs.berkeley.edu>  2012-12-01 18:19:13 -0800
commit    cdaa0fad51c7ad6c2a56f6c14faedd08fe341b2e (patch)
tree      e2cdaf661df2e1b67c377696de1aa7c0d6359be6
parent    8d3713c22180e9a7f190549c2b94ac058c07130a (diff)
Use external addresses in standalone WebUI on EC2.
-rwxr-xr-x  bin/start-master.sh                                             11
-rwxr-xr-x  bin/start-slave.sh                                              15
-rwxr-xr-x  bin/start-slaves.sh                                              5
-rw-r--r--  core/src/main/scala/spark/deploy/DeployMessage.scala            11
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala            16
-rw-r--r--  core/src/main/scala/spark/deploy/master/WorkerInfo.scala         7
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala             6
-rw-r--r--  core/src/main/twirl/spark/deploy/master/worker_row.scala.html    2
8 files changed, 58 insertions, 15 deletions
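
Both Master and Worker resolve their advertised address the same way: use SPARK_PUBLIC_DNS when it is set, otherwise fall back to the bind IP. A standalone sketch of that pattern (the function name here is illustrative, not from the patch):

    // Returns SPARK_PUBLIC_DNS if set, otherwise the given bind address.
    def resolvePublicAddress(bindIp: String): String =
      Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindIp)

    // On an EC2 node where start-master.sh exported SPARK_PUBLIC_DNS, this
    // yields the public hostname; elsewhere it simply echoes the bind IP.
    println(resolvePublicAddress("10.0.0.1"))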
diff --git a/bin/start-master.sh b/bin/start-master.sh
index 6403c944a4..ad19d48331 100755
--- a/bin/start-master.sh
+++ b/bin/start-master.sh
@@ -7,4 +7,13 @@ bin=`cd "$bin"; pwd`
. "$bin/spark-config.sh"
-"$bin"/spark-daemon.sh start spark.deploy.master.Master \ No newline at end of file
+# Set SPARK_PUBLIC_DNS so the master reports the correct web UI address to the slaves
+if [ "$SPARK_PUBLIC_DNS" = "" ]; then
+ # If we appear to be running on EC2, use the public address by default:
+ if [[ `hostname` == *ec2.internal ]]; then
+ echo "RUNNING ON EC2"
+ export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
+ fi
+fi
+
+"$bin"/spark-daemon.sh start spark.deploy.master.Master
diff --git a/bin/start-slave.sh b/bin/start-slave.sh
new file mode 100755
index 0000000000..10cce9c17b
--- /dev/null
+++ b/bin/start-slave.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# Set SPARK_PUBLIC_DNS so slaves can be linked in the master web UI
+if [ "$SPARK_PUBLIC_DNS" = "" ]; then
+ # If we appear to be running on EC2, use the public address by default:
+ if [[ `hostname` == *ec2.internal ]]; then
+ echo "RUNNING ON EC2"
+ export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
+ fi
+fi
+
+"$bin"/spark-daemon.sh start spark.deploy.worker.Worker $1
diff --git a/bin/start-slaves.sh b/bin/start-slaves.sh
index 67b07215a2..390247ca4a 100755
--- a/bin/start-slaves.sh
+++ b/bin/start-slaves.sh
@@ -18,6 +18,7 @@ if [ "$SPARK_MASTER_IP" = "" ]; then
SPARK_MASTER_IP=`hostname`
fi
-echo "Master IP: $ip"
+echo "Master IP: $SPARK_MASTER_IP"
-"$bin"/spark-daemons.sh start spark.deploy.worker.Worker spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
+# Launch the slaves
+exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 7a1089c816..f05413a53b 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -11,8 +11,15 @@ private[spark] sealed trait DeployMessage extends Serializable
// Worker to Master
-private[spark]
-case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int)
+private[spark]
+case class RegisterWorker(
+ id: String,
+ host: String,
+ port: Int,
+ cores: Int,
+ memory: Int,
+ webUiPort: Int,
+ publicAddress: String)
extends DeployMessage
private[spark]
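
With the extra publicAddress field, worker registration now carries the externally reachable hostname alongside the bind address. A self-contained sketch of the widened message (a local copy of the case class for illustration; the real one lives in spark.deploy.DeployMessage, and all values below are made up):

    case class RegisterWorker(
        id: String,
        host: String,
        port: Int,
        cores: Int,
        memory: Int,
        webUiPort: Int,
        publicAddress: String)

    // On EC2 the two addresses differ: the worker binds to the internal
    // hostname but advertises the public one for web UI links.
    val msg = RegisterWorker(
      id = "worker-20121201181815-ip-10-0-0-1",
      host = "ip-10-0-0-1.ec2.internal",
      port = 7078,
      cores = 4,
      memory = 8192,
      webUiPort = 8081,
      publicAddress = "ec2-54-1-2-3.compute-1.amazonaws.com")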
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 7e5cd6b171..31fb83f2e2 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -31,6 +31,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
val waitingJobs = new ArrayBuffer[JobInfo]
val completedJobs = new ArrayBuffer[JobInfo]
+ val masterPublicAddress = {
+ val envVar = System.getenv("SPARK_PUBLIC_DNS")
+ if (envVar != null) envVar else ip
+ }
+
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each job
// among all the nodes) instead of trying to consolidate each job onto a small # of nodes.
@@ -55,15 +60,15 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
override def receive = {
- case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort) => {
+ case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.memoryMegabytesToString(memory)))
if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
- addWorker(id, host, workerPort, cores, memory, worker_webUiPort)
+ addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
context.watch(sender) // This doesn't work with remote actors but helps for testing
- sender ! RegisteredWorker("http://" + ip + ":" + webUiPort)
+ sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUiPort)
schedule()
}
}
@@ -196,8 +201,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
}
- def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int): WorkerInfo = {
- val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort)
+ def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
+ publicAddress: String): WorkerInfo = {
+ val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
workers += worker
idToWorker(worker.id) = worker
actorToWorker(sender) = worker
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
index 706b1453aa..a0a698ef04 100644
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -10,7 +10,8 @@ private[spark] class WorkerInfo(
val cores: Int,
val memory: Int,
val actor: ActorRef,
- val webUiPort: Int) {
+ val webUiPort: Int,
+ val publicAddress: String) {
var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info
@@ -37,8 +38,8 @@ private[spark] class WorkerInfo(
def hasExecutor(job: JobInfo): Boolean = {
executors.values.exists(_.job == job)
}
-
+
def webUiAddress : String = {
- "http://" + this.host + ":" + this.webUiPort
+ "http://" + this.publicAddress + ":" + this.webUiPort
}
}
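
The effect of the WorkerInfo change is easiest to see in isolation: the web UI link is now built from the public address rather than the bind host. A minimal stand-in (field names mirror WorkerInfo; the class name and values are illustrative):

    // Stand-in for the two WorkerInfo fields that feed webUiAddress.
    case class WorkerLink(publicAddress: String, webUiPort: Int) {
      def webUiAddress: String = "http://" + publicAddress + ":" + webUiPort
    }

    // Previously the link used worker.host (e.g. ip-10-0-0-1.ec2.internal),
    // unreachable from a browser outside EC2. Now it uses the public hostname:
    println(WorkerLink("ec2-54-1-2-3.compute-1.amazonaws.com", 8081).webUiAddress)
    // => http://ec2-54-1-2-3.compute-1.amazonaws.com:8081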
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 67d41dda29..31b8f0f955 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -36,6 +36,10 @@ private[spark] class Worker(
var workDir: File = null
val executors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new HashMap[String, ExecutorRunner]
+ val publicAddress = {
+ val envVar = System.getenv("SPARK_PUBLIC_DNS")
+ if (envVar != null) envVar else ip
+ }
var coresUsed = 0
var memoryUsed = 0
@@ -79,7 +83,7 @@ private[spark] class Worker(
val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
try {
master = context.actorFor(akkaUrl)
- master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort)
+ master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
} catch {
diff --git a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
index 3dcba3a545..c32ab30401 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
@@ -4,7 +4,7 @@
<tr>
<td>
- <a href="http://@worker.host:@worker.webUiPort">@worker.id</href>
+ <a href="@worker.webUiAddress">@worker.id</href>
</td>
<td>@{worker.host}:@{worker.port}</td>
<td>@worker.cores (@worker.coresUsed Used)</td>