author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-06-30 16:43:27 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-06-30 16:45:57 -0700
commit    408b5a13321fe303a358fdef02bba042d12c77b4
tree      7228370a5a20c4466661d688f157beb2bdef6a62
parent    2fb6e7d71ebfe1ba1075a107dbc8e32ba58bcacc
More work on deploy code (adding Worker class)
 18 files changed, 351 insertions(+), 127 deletions(-)
diff --git a/core/src/main/resources/spark/deploy/master/webui/index.html b/core/src/main/resources/spark/deploy/master/webui/index.html
new file mode 100644
index 0000000000..c11101045e
--- /dev/null
+++ b/core/src/main/resources/spark/deploy/master/webui/index.html
@@ -0,0 +1,6 @@
+<html>
+<head><title>Hello world!</title></head>
+<body>
+<p>Hello world!</p>
+</body>
+</html>
\ No newline at end of file
diff --git a/core/src/main/resources/spark/deploy/worker/webui/index.html b/core/src/main/resources/spark/deploy/worker/webui/index.html
new file mode 100644
index 0000000000..c11101045e
--- /dev/null
+++ b/core/src/main/resources/spark/deploy/worker/webui/index.html
@@ -0,0 +1,6 @@
+<html>
+<head><title>Hello world!</title></head>
+<body>
+<p>Hello world!</p>
+</body>
+</html>
\ No newline at end of file
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 44dca2e4f1..95c833ea5b 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -200,9 +200,27 @@ object Utils {
   }
 
   /**
-   * Use unit suffixes (Byte, Kilobyte, Megabyte, Gigabyte, Terabyte and
-   * Petabyte) in order to reduce the number of digits to four or less. For
-   * example, 4,000,000 is returned as 4MB.
+   * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
+   * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
+   * environment variable.
+   */
+  def memoryStringToMb(str: String): Int = {
+    val lower = str.toLowerCase
+    if (lower.endsWith("k")) {
+      (lower.substring(0, lower.length-1).toLong / 1024).toInt
+    } else if (lower.endsWith("m")) {
+      lower.substring(0, lower.length-1).toInt
+    } else if (lower.endsWith("g")) {
+      lower.substring(0, lower.length-1).toInt * 1024
+    } else if (lower.endsWith("t")) {
+      lower.substring(0, lower.length-1).toInt * 1024 * 1024
+    } else {// no suffix, so it's just a number in bytes
+      (lower.toLong / 1024 / 1024).toInt
+    }
+  }
+
+  /**
+   * Convert a memory quantity in bytes to a human-readable string such as "4.0 MB".
    */
   def memoryBytesToString(size: Long): String = {
     val TB = 1L << 40
@@ -225,4 +243,11 @@ object Utils {
     }
     "%.1f %s".formatLocal(Locale.US, value, unit)
   }
+
+  /**
+   * Convert a memory quantity in megabytes to a human-readable string such as "4.0 MB".
+   */
+  def memoryMegabytesToString(megabytes: Long): String = {
+    memoryBytesToString(megabytes * 1024L * 1024L)
+  }
 }
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
new file mode 100644
index 0000000000..4e641157e1
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -0,0 +1,6 @@
+package spark.deploy
+
+sealed trait DeployMessage
+
+case class RegisterSlave(host: String, port: Int, cores: Int, memory: Int) extends DeployMessage
+case class RegisteredSlave(clusterId: String, slaveId: Int) extends DeployMessage
diff --git a/core/src/main/scala/spark/deploy/Master.scala b/core/src/main/scala/spark/deploy/Master.scala
deleted file mode 100644
index da2f678f52..0000000000
--- a/core/src/main/scala/spark/deploy/Master.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-package spark.deploy
-
-import akka.actor.{ActorRef, Props, Actor, ActorSystem}
-import spark.{Logging, Utils}
-import scala.collection.immutable.{::, Nil}
-import spark.util.{AkkaUtils, IntParam}
-import cc.spray.Directives
-
-sealed trait MasterMessage
-case class RegisterSlave(host: String, port: Int, cores: Int, memory: Int) extends MasterMessage
-
-class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
-  override def preStart() {
-    logInfo("Starting Spark master at spark://" + ip + ":" + port)
-    startWebUi()
-  }
-
-  def startWebUi() {
-    val webUi = new MasterWebUI(context.system, self)
-    try {
-      AkkaUtils.startSprayServer(context.system, ip, webUiPort, webUi.handler)
-    } catch {
-      case e: Exception =>
-        logError("Failed to create web UI", e)
-        System.exit(1)
-    }
-  }
-
-  override def receive = {
-    case RegisterSlave(host, slavePort, cores, memory) =>
-      logInfo("Registering slave %s:%d with %d cores, %s RAM".format(
-        host, slavePort, cores, Utils.memoryBytesToString(memory * 1024L)))
-  }
-}
-
-object Master {
-  def main(args: Array[String]) {
-    val params = new MasterArguments(args)
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", params.ip, params.port)
-    val actor = actorSystem.actorOf(
-      Props(new Master(params.ip, boundPort, params.webUiPort)), name = "Master")
-    actorSystem.awaitTermination()
-  }
-}
diff --git a/core/src/main/scala/spark/deploy/MasterWebUI.scala b/core/src/main/scala/spark/deploy/MasterWebUI.scala
deleted file mode 100644
index 3f078322e1..0000000000
--- a/core/src/main/scala/spark/deploy/MasterWebUI.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package spark.deploy
-
-import akka.actor.{ActorRef, ActorSystem}
-import cc.spray.Directives
-
-class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
-  val handler = {
-    path("") {
-      get { _.complete("Hello world!") }
-    }
-  }
-}
diff --git a/core/src/main/scala/spark/deploy/Worker.scala b/core/src/main/scala/spark/deploy/Worker.scala
deleted file mode 100644
index 7210a4b902..0000000000
--- a/core/src/main/scala/spark/deploy/Worker.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package spark.deploy
-
-class Worker(cores: Int, memoryMb: Int) {
-
-}
-
-object Worker {
-  def main(args: Array[String]) {
-  }
-}
\ No newline at end of file
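
For reference, the conversion rules implemented by the new Utils.memoryStringToMb (an illustrative sketch, not part of the patch; the expected values follow from the arithmetic above and match the UtilsSuite test later in this diff):

    Utils.memoryStringToMb("300m")    // 300: an "m" value is already in megabytes
    Utils.memoryStringToMb("1g")      // 1024: gigabytes scale by 1024
    Utils.memoryStringToMb("2t")      // 2097152: terabytes scale by 1024 * 1024
    Utils.memoryStringToMb("1024k")   // 1: kilobytes divide by 1024
    Utils.memoryStringToMb("1048575") // 0: a bare number is bytes; integer division rounds down
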
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
new file mode 100644
index 0000000000..8fdecfa08c
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -0,0 +1,94 @@
+package spark.deploy.master
+
+import scala.collection.mutable.HashMap
+
+import akka.actor.{Terminated, ActorRef, Props, Actor}
+import spark.{Logging, Utils}
+import spark.util.AkkaUtils
+import java.text.SimpleDateFormat
+import java.util.Date
+import spark.deploy.{RegisteredSlave, RegisterSlave}
+
+class SlaveInfo(
+    val id: Int,
+    val host: String,
+    val port: Int,
+    val cores: Int,
+    val memory: Int,
+    val actor: ActorRef) {
+  var coresUsed = 0
+  var memoryUsed = 0
+
+  def coresFree: Int = cores - coresUsed
+
+  def memoryFree: Int = memory - memoryUsed
+}
+
+class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
+  val clusterId = newClusterId()
+  var nextSlaveId = 0
+  var nextJobId = 0
+  val slaves = new HashMap[Int, SlaveInfo]
+  val actorToSlave = new HashMap[ActorRef, SlaveInfo]
+
+  override def preStart() {
+    logInfo("Starting Spark master at spark://" + ip + ":" + port)
+    logInfo("Cluster ID: " + clusterId)
+    startWebUi()
+  }
+
+  def startWebUi() {
+    val webUi = new MasterWebUI(context.system, self)
+    try {
+      AkkaUtils.startSprayServer(context.system, ip, webUiPort, webUi.handler)
+    } catch {
+      case e: Exception =>
+        logError("Failed to create web UI", e)
+        System.exit(1)
+    }
+  }
+
+  override def receive = {
+    case RegisterSlave(host, slavePort, cores, memory) => {
+      logInfo("Registering slave %s:%d with %d cores, %s RAM".format(
+        host, slavePort, cores, Utils.memoryMegabytesToString(memory)))
+      val id = newSlaveId()
+      slaves(id) = new SlaveInfo(id, host, slavePort, cores, memory, sender)
+      actorToSlave(sender) = slaves(id)
+      context.watch(sender)
+      sender ! RegisteredSlave(clusterId, id)
+    }
+
+    case Terminated(actor) => {
+      logInfo("Slave disconnected: " + actor)
+      actorToSlave.get(actor) match {
+        case Some(slave) =>
+          logInfo("Removing slave " + slave.id)
+          slaves -= slave.id
+          actorToSlave -= actor
+        case None =>
+          logError("Did not have any slave registered for " + actor)
+      }
+    }
+  }
+
+  def newClusterId(): String = {
+    val date = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date())
+    "%s-%04d".format(date, (math.random * 10000).toInt)
+  }
+
+  def newSlaveId(): Int = {
+    nextSlaveId += 1
+    nextSlaveId - 1
+  }
+}
+
+object Master {
+  def main(argStrings: Array[String]) {
+    val args = new MasterArguments(argStrings)
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
+    val actor = actorSystem.actorOf(
+      Props(new Master(args.ip, boundPort, args.webUiPort)), name = "Master")
+    actorSystem.awaitTermination()
+  }
+}
diff --git a/core/src/main/scala/spark/deploy/MasterArguments.scala b/core/src/main/scala/spark/deploy/master/MasterArguments.scala
index c948a405ef..ca4b8a143f 100644
--- a/core/src/main/scala/spark/deploy/MasterArguments.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterArguments.scala
@@ -1,4 +1,4 @@
-package spark.deploy
+package spark.deploy.master
 
 import spark.util.IntParam
 import spark.Utils
@@ -7,9 +7,9 @@ import spark.Utils
  * Command-line parser for the master.
  */
 class MasterArguments(args: Array[String]) {
-  var ip: String = Utils.localIpAddress()
-  var port: Int = 7077
-  var webUiPort: Int = 8080
+  var ip = Utils.localIpAddress()
+  var port = 7077
+  var webUiPort = 8080
 
   parse(args.toList)
 
@@ -45,7 +45,7 @@ class MasterArguments(args: Array[String]) {
       "Options:\n" +
      "  -i IP, --ip IP         IP address or DNS name to listen on\n" +
      "  -p PORT, --port PORT   Port to listen on (default: 7077)\n" +
-      "  --webui-port PORT      Port for web UI (default: 8080)\n")
+      "  --webui-port PORT      Port for web UI (default: 8080)")
     System.exit(exitCode)
   }
 }
\ No newline at end of file
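
The registration protocol between the new Master and its slaves is the RegisterSlave / RegisteredSlave pair added in DeployMessage.scala above. Note that nothing in this commit sends RegisterSlave yet; the Worker only handles the reply. Below is a minimal sketch of how a slave-side actor could drive the round trip, assuming a resolved ActorRef to the Master (the class name, host, and resource numbers are hypothetical):

    import akka.actor.{Actor, ActorRef}
    import spark.deploy.{RegisterSlave, RegisteredSlave}

    // Hypothetical slave-side actor, not part of this commit.
    class RegistrationSketch(master: ActorRef) extends Actor {
      override def preStart() {
        // Announce our endpoint and resources (memory is in megabytes).
        master ! RegisterSlave("10.0.0.5", 7078, 8, 16384)
      }

      def receive = {
        // The Master records a SlaveInfo, starts watching this actor, and replies
        // with its cluster ID and the slave ID it allocated via newSlaveId().
        case RegisteredSlave(clusterId, slaveId) =>
          println("Joined cluster " + clusterId + " as slave " + slaveId)
      }
    }

Because the Master calls context.watch(sender) at registration time, a slave that terminates is cleaned out of the slaves and actorToSlave maps by the Terminated handler; no explicit unregister message is needed.
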
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
new file mode 100644
index 0000000000..b0c871dd7b
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -0,0 +1,17 @@
+package spark.deploy.master
+
+import akka.actor.{ActorRef, ActorSystem}
+import cc.spray.Directives
+
+class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
+  val RESOURCE_DIR = "spark/deploy/master/webui"
+
+  val handler = {
+    get {
+      path("") {
+        getFromResource(RESOURCE_DIR + "/index.html")
+      } ~
+      getFromResourceDirectory(RESOURCE_DIR)
+    }
+  }
+}
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
new file mode 100644
index 0000000000..fd49223798
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -0,0 +1,58 @@
+package spark.deploy.worker
+
+import scala.collection.mutable.HashMap
+
+import akka.actor.{Terminated, ActorRef, Props, Actor}
+import spark.{Logging, Utils}
+import spark.util.AkkaUtils
+import java.text.SimpleDateFormat
+import java.util.Date
+import spark.deploy.{RegisteredSlave, RegisterSlave}
+
+class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int)
+  extends Actor with Logging {
+
+  var coresUsed = 0
+  var memoryUsed = 0
+
+  def coresFree: Int = cores - coresUsed
+  def memoryFree: Int = memory - memoryUsed
+
+  override def preStart() {
+    logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
+      ip, port, cores, Utils.memoryMegabytesToString(memory)))
+    startWebUi()
+  }
+
+  def startWebUi() {
+    val webUi = new WorkerWebUI(context.system, self)
+    try {
+      AkkaUtils.startSprayServer(context.system, ip, webUiPort, webUi.handler)
+    } catch {
+      case e: Exception =>
+        logError("Failed to create web UI", e)
+        System.exit(1)
+    }
+  }
+
+  override def receive = {
+    case RegisteredSlave(clusterId, slaveId) => {
+      logInfo("Registered with cluster ID " + clusterId + ", slave ID " + slaveId)
+    }
+
+    case Terminated(actor) => {
+      logError("Master disconnected!")
+    }
+  }
+}
+
+object Worker {
+  def main(argStrings: Array[String]) {
+    val args = new WorkerArguments(argStrings)
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
+    val actor = actorSystem.actorOf(
+      Props(new Worker(args.ip, boundPort, args.webUiPort, args.cores, args.memory)),
+      name = "Worker")
+    actorSystem.awaitTermination()
+  }
+}
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
new file mode 100644
index 0000000000..cd112b7fa3
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
@@ -0,0 +1,75 @@
+package spark.deploy.worker
+
+import spark.util.IntParam
+import spark.util.MemoryParam
+import spark.Utils
+import java.lang.management.ManagementFactory
+
+/**
+ * Command-line parser for the worker.
+ */
+class WorkerArguments(args: Array[String]) {
+  var ip = Utils.localIpAddress()
+  var port = 0
+  var webUiPort = 8081
+  var cores = inferDefaultCores()
+  var memory = inferDefaultMemory()
+
+  parse(args.toList)
+
+  def parse(args: List[String]): Unit = args match {
+    case ("--ip" | "-i") :: value :: tail =>
+      ip = value
+      parse(tail)
+
+    case ("--port" | "-p") :: IntParam(value) :: tail =>
+      port = value
+      parse(tail)
+
+    case ("--cores" | "-c") :: IntParam(value) :: tail =>
+      cores = value
+      parse(tail)
+
+    case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
+      memory = value
+      parse(tail)
+
+    case "--webui-port" :: IntParam(value) :: tail =>
+      webUiPort = value
+      parse(tail)
+
+    case ("--help" | "-h") :: tail =>
+      printUsageAndExit(0)
+
+    case Nil => {}
+
+    case _ =>
+      printUsageAndExit(1)
+  }
+
+  /**
+   * Print usage and exit JVM with the given exit code.
+   */
+  def printUsageAndExit(exitCode: Int) {
+    System.err.println(
+      "Usage: spark-worker [options]\n" +
+      "\n" +
+      "Options:\n" +
+      "  -c CORES, --cores CORES  Number of cores to use\n" +
+      "  -m MEM, --memory MEM     Amount of memory to use (e.g. 1000M, 2G)\n" +
+      "  -i IP, --ip IP           IP address or DNS name to listen on\n" +
+      "  -p PORT, --port PORT     Port to listen on (default: random)\n" +
+      "  --webui-port PORT        Port for web UI (default: 8081)")
+    System.exit(exitCode)
+  }
+
+  def inferDefaultCores(): Int = {
+    Runtime.getRuntime.availableProcessors()
+  }
+
+  def inferDefaultMemory(): Int = {
+    val bean = ManagementFactory.getOperatingSystemMXBean
+      .asInstanceOf[com.sun.management.OperatingSystemMXBean]
+    (bean.getTotalPhysicalMemorySize / 1024 / 1024).toInt
+  }
+}
\ No newline at end of file
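
To make the argument grammar concrete, here is a sketch of how WorkerArguments resolves a hypothetical command line; the commented values follow from the parser above:

    // Equivalent to launching with: -c 4 -m 8G --webui-port 8082
    val parsed = new WorkerArguments(Array("-c", "4", "-m", "8G", "--webui-port", "8082"))
    parsed.cores     // 4
    parsed.memory    // 8192: MemoryParam feeds "8G" through Utils.memoryStringToMb
    parsed.webUiPort // 8082
    parsed.port      // 0, the default, i.e. bind to a random port
    parsed.ip        // unset on the command line, so Utils.localIpAddress()

Unspecified cores and memory fall back to inferDefaultCores (all available processors) and inferDefaultMemory (total physical RAM, read via the com.sun.management MX bean).
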
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
new file mode 100644
index 0000000000..efd3822e61
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -0,0 +1,17 @@
+package spark.deploy.worker
+
+import akka.actor.{ActorRef, ActorSystem}
+import cc.spray.Directives
+
+class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
+  val RESOURCE_DIR = "spark/deploy/worker/webui"
+
+  val handler = {
+    get {
+      path("") {
+        getFromResource(RESOURCE_DIR + "/index.html")
+      } ~
+      getFromResourceDirectory(RESOURCE_DIR)
+    }
+  }
+}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
index 8e34537674..9113348976 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
@@ -45,7 +45,7 @@ class MesosScheduler(
   // Memory used by each executor (in megabytes)
   val EXECUTOR_MEMORY = {
     if (System.getenv("SPARK_MEM") != null) {
-      MesosScheduler.memoryStringToMb(System.getenv("SPARK_MEM"))
+      Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
       // TODO: Might need to add some extra memory for the non-heap parts of the JVM
     } else {
       512
@@ -467,25 +467,3 @@ class MesosScheduler(
     driver.reviveOffers()
   }
 }
-
-object MesosScheduler {
-  /**
-   * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
-   * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
-   * environment variable.
-   */
-  def memoryStringToMb(str: String): Int = {
-    val lower = str.toLowerCase
-    if (lower.endsWith("k")) {
-      (lower.substring(0, lower.length-1).toLong / 1024).toInt
-    } else if (lower.endsWith("m")) {
-      lower.substring(0, lower.length-1).toInt
-    } else if (lower.endsWith("g")) {
-      lower.substring(0, lower.length-1).toInt * 1024
-    } else if (lower.endsWith("t")) {
-      lower.substring(0, lower.length-1).toInt * 1024 * 1024
-    } else {// no suffix, so it's just a number in bytes
-      (lower.toLong / 1024 / 1024).toInt
-    }
-  }
-}
diff --git a/core/src/main/scala/spark/util/MemoryParam.scala b/core/src/main/scala/spark/util/MemoryParam.scala
new file mode 100644
index 0000000000..4fba914afe
--- /dev/null
+++ b/core/src/main/scala/spark/util/MemoryParam.scala
@@ -0,0 +1,17 @@
+package spark.util
+
+import spark.Utils
+
+/**
+ * An extractor object for parsing JVM memory strings, such as "10g", into an Int representing
+ * the number of megabytes. Supports the same formats as Utils.memoryStringToMb.
+ */
+object MemoryParam {
+  def unapply(str: String): Option[Int] = {
+    try {
+      Some(Utils.memoryStringToMb(str))
+    } catch {
+      case e: NumberFormatException => None
+    }
+  }
+}
diff --git a/core/src/test/scala/spark/MesosSchedulerSuite.scala b/core/src/test/scala/spark/MesosSchedulerSuite.scala
deleted file mode 100644
index 54421225d8..0000000000
--- a/core/src/test/scala/spark/MesosSchedulerSuite.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-package spark
-
-import org.scalatest.FunSuite
-
-import spark.scheduler.mesos.MesosScheduler
-
-class MesosSchedulerSuite extends FunSuite {
-  test("memoryStringToMb"){
-
-    assert(MesosScheduler.memoryStringToMb("1") == 0)
-    assert(MesosScheduler.memoryStringToMb("1048575") == 0)
-    assert(MesosScheduler.memoryStringToMb("3145728") == 3)
-
-    assert(MesosScheduler.memoryStringToMb("1024k") == 1)
-    assert(MesosScheduler.memoryStringToMb("5000k") == 4)
-    assert(MesosScheduler.memoryStringToMb("4024k") == MesosScheduler.memoryStringToMb("4024K"))
-
-    assert(MesosScheduler.memoryStringToMb("1024m") == 1024)
-    assert(MesosScheduler.memoryStringToMb("5000m") == 5000)
-    assert(MesosScheduler.memoryStringToMb("4024m") == MesosScheduler.memoryStringToMb("4024M"))
-
-    assert(MesosScheduler.memoryStringToMb("2g") == 2048)
-    assert(MesosScheduler.memoryStringToMb("3g") == MesosScheduler.memoryStringToMb("3G"))
-
-    assert(MesosScheduler.memoryStringToMb("2t") == 2097152)
-    assert(MesosScheduler.memoryStringToMb("3t") == MesosScheduler.memoryStringToMb("3T"))
-
-
-  }
-}
diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/spark/UtilsSuite.scala
index a46d90223e..ed4701574f 100644
--- a/core/src/test/scala/spark/UtilsSuite.scala
+++ b/core/src/test/scala/spark/UtilsSuite.scala
@@ -26,5 +26,25 @@ class UtilsSuite extends FunSuite {
 
     assert(os.toByteArray.toList.equals(bytes.toList))
   }
+
+  test("memoryStringToMb"){
+    assert(Utils.memoryStringToMb("1") == 0)
+    assert(Utils.memoryStringToMb("1048575") == 0)
+    assert(Utils.memoryStringToMb("3145728") == 3)
+
+    assert(Utils.memoryStringToMb("1024k") == 1)
+    assert(Utils.memoryStringToMb("5000k") == 4)
+    assert(Utils.memoryStringToMb("4024k") == Utils.memoryStringToMb("4024K"))
+
+    assert(Utils.memoryStringToMb("1024m") == 1024)
+    assert(Utils.memoryStringToMb("5000m") == 5000)
+    assert(Utils.memoryStringToMb("4024m") == Utils.memoryStringToMb("4024M"))
+
+    assert(Utils.memoryStringToMb("2g") == 2048)
+    assert(Utils.memoryStringToMb("3g") == Utils.memoryStringToMb("3G"))
+
+    assert(Utils.memoryStringToMb("2t") == 2097152)
+    assert(Utils.memoryStringToMb("3t") == Utils.memoryStringToMb("3T"))
+  }
 }
 
@@ -46,6 +46,7 @@ CLASSPATH="$SPARK_CLASSPATH"
 CLASSPATH+=":$MESOS_CLASSPATH"
 CLASSPATH+=":$FWDIR/conf"
 CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/classes"
+CLASSPATH+=":$CORE_DIR/src/main/resources"
 CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes"
 CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
 for jar in `find $CORE_DIR/lib -name '*jar'`; do
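
A closing note on MemoryParam, introduced above: it is an extractor object, which is what lets WorkerArguments match a memory string directly in the pattern ("--memory" | "-m") :: MemoryParam(value) :: tail. A minimal standalone sketch:

    import spark.util.MemoryParam

    "2g" match {
      case MemoryParam(mb) => println(mb + " MB")   // prints "2048 MB"
      case _               => println("not a memory string")
    }

Since unapply converts a NumberFormatException into None, a malformed value such as "8x" simply falls through to the next case instead of blowing up the parser.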