author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-06-30 14:45:55 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-06-30 14:45:55 -0700
commit    2fb6e7d71ebfe1ba1075a107dbc8e32ba58bcacc (patch)
tree      c7f374b27d59c6a084de627f12adf674e45bb40e
parent    c53670b9bf709ab583cbc75e952026bc4abb6c5f (diff)
Initial framework to get a master and web UI up.
-rw-r--r--  core/src/main/scala/spark/BlockStoreShuffleFetcher.scala |  1
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala                 | 21
-rw-r--r--  core/src/main/scala/spark/Utils.scala                    |  7
-rw-r--r--  core/src/main/scala/spark/deploy/Master.scala            | 44
-rw-r--r--  core/src/main/scala/spark/deploy/MasterArguments.scala   | 51
-rw-r--r--  core/src/main/scala/spark/deploy/MasterWebUI.scala       | 12
-rw-r--r--  core/src/main/scala/spark/deploy/Worker.scala            | 10
-rw-r--r--  core/src/main/scala/spark/util/AkkaUtils.scala           | 69
-rw-r--r--  core/src/main/scala/spark/util/IntParam.scala            | 14
-rw-r--r--  core/src/test/scala/spark/UtilsSuite.scala               | 13
-rw-r--r--  project/SparkBuild.scala                                 |  7
11 files changed, 220 insertions, 29 deletions
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 010203d1ca..06d2d09fce 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -15,7 +15,6 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream
class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) {
logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
- val ser = SparkEnv.get.serializer.newInstance()
val blockManager = SparkEnv.get.blockManager
val startTime = System.currentTimeMillis
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 5dcf25f997..602fcca0f9 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -1,14 +1,11 @@
package spark
import akka.actor.ActorSystem
-import akka.actor.ActorSystemImpl
-import akka.remote.RemoteActorRefProvider
-
-import com.typesafe.config.ConfigFactory
import spark.storage.BlockManager
import spark.storage.BlockManagerMaster
import spark.network.ConnectionManager
+import spark.util.AkkaUtils
class SparkEnv (
val actorSystem: ActorSystem,
@@ -45,24 +42,12 @@ object SparkEnv {
isLocal: Boolean
) : SparkEnv = {
- val akkaConf = ConfigFactory.parseString("""
- akka.daemonic = on
- akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
- akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
- akka.remote.netty.hostname = "%s"
- akka.remote.netty.port = %d
- """.format(hostname, port))
-
- val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
// Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port),
// figure out which port number Akka actually bound to and set spark.master.port to it.
- // Unfortunately Akka doesn't yet provide an API for this except if you cast objects as below.
if (isMaster && port == 0) {
- val provider = actorSystem.asInstanceOf[ActorSystemImpl].provider
- val port = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
- System.setProperty("spark.master.port", port.toString)
+ System.setProperty("spark.master.port", boundPort.toString)
}
val serializerClass = System.getProperty("spark.serializer", "spark.KryoSerializer")
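
The port-0 handling above is the standard OS convention: binding to port 0 asks the kernel for any free ephemeral port, and the caller must then read back which port it actually received. A minimal illustration of that convention with a plain ServerSocket (not Spark code, just the underlying idea):

    import java.net.ServerSocket

    // Ask the kernel for any free port, then read back the one it chose.
    val socket = new ServerSocket(0)
    val boundPort = socket.getLocalPort  // e.g. 54321, assigned by the OS
    socket.close()

AkkaUtils.createActorSystem (added below) does the same reading-back through Akka's internals, since Akka 2.0 exposes no public API for the bound port.
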
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 17670e077a..44dca2e4f1 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -205,12 +205,15 @@ object Utils {
* example, 4,000,000 is returned as 4MB.
*/
def memoryBytesToString(size: Long): String = {
+ val TB = 1L << 40
val GB = 1L << 30
val MB = 1L << 20
val KB = 1L << 10
val (value, unit) = {
- if (size >= 2*GB) {
+ if (size >= 2*TB) {
+ (size.asInstanceOf[Double] / TB, "TB")
+ } else if (size >= 2*GB) {
(size.asInstanceOf[Double] / GB, "GB")
} else if (size >= 2*MB) {
(size.asInstanceOf[Double] / MB, "MB")
@@ -220,6 +223,6 @@ object Utils {
(size.asInstanceOf[Double], "B")
}
}
- "%.1f%s".formatLocal(Locale.US, value, unit)
+ "%.1f %s".formatLocal(Locale.US, value, unit)
}
}
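
A few worked values for the patched method, matching the updated UtilsSuite cases below; each unit kicks in once the size reaches twice that unit, so 1500 bytes stays in plain bytes because it is under 2 KB:

    Utils.memoryBytesToString(1500)                           // under 2 KB            => "1500.0 B"
    Utils.memoryBytesToString(2306867)                        // 2306867 / 2^20 ≈ 2.2  => "2.2 MB"
    Utils.memoryBytesToString(5L * 1024 * 1024 * 1024 * 1024) // exactly 5 * 2^40      => "5.0 TB"
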
diff --git a/core/src/main/scala/spark/deploy/Master.scala b/core/src/main/scala/spark/deploy/Master.scala
new file mode 100644
index 0000000000..da2f678f52
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/Master.scala
@@ -0,0 +1,44 @@
+package spark.deploy
+
+import akka.actor.{ActorRef, Props, Actor, ActorSystem}
+import spark.{Logging, Utils}
+import scala.collection.immutable.{::, Nil}
+import spark.util.{AkkaUtils, IntParam}
+import cc.spray.Directives
+
+sealed trait MasterMessage
+case class RegisterSlave(host: String, port: Int, cores: Int, memory: Int) extends MasterMessage
+
+class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
+ override def preStart() {
+ logInfo("Starting Spark master at spark://" + ip + ":" + port)
+ startWebUi()
+ }
+
+ def startWebUi() {
+ val webUi = new MasterWebUI(context.system, self)
+ try {
+ AkkaUtils.startSprayServer(context.system, ip, webUiPort, webUi.handler)
+ } catch {
+ case e: Exception =>
+ logError("Failed to create web UI", e)
+ System.exit(1)
+ }
+ }
+
+ override def receive = {
+ case RegisterSlave(host, slavePort, cores, memory) =>
+ logInfo("Registering slave %s:%d with %d cores, %s RAM".format(
+ host, slavePort, cores, Utils.memoryBytesToString(memory * 1024L)))
+ }
+}
+
+object Master {
+ def main(args: Array[String]) {
+ val params = new MasterArguments(args)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", params.ip, params.port)
+ val actor = actorSystem.actorOf(
+ Props(new Master(params.ip, boundPort, params.webUiPort)), name = "Master")
+ actorSystem.awaitTermination()
+ }
+}
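
Nothing in this commit sends RegisterSlave yet (Worker below is still a stub). A hypothetical slave-side registration, assuming Akka 2.0's actorFor path lookup and treating masterHost, masterPort, slavePort, cores and memoryKb as placeholder values (the memory * 1024L conversion above suggests memory is counted in KB):

    // Hypothetical, not part of this commit: resolve the master actor by its
    // path (it is registered under the name "Master") and send a registration.
    val master = actorSystem.actorFor(
      "akka://spark@%s:%d/user/Master".format(masterHost, masterPort))
    master ! RegisterSlave(Utils.localIpAddress(), slavePort, cores, memoryKb)
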
diff --git a/core/src/main/scala/spark/deploy/MasterArguments.scala b/core/src/main/scala/spark/deploy/MasterArguments.scala
new file mode 100644
index 0000000000..c948a405ef
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/MasterArguments.scala
@@ -0,0 +1,51 @@
+package spark.deploy
+
+import spark.util.IntParam
+import spark.Utils
+
+/**
+ * Command-line parser for the master.
+ */
+class MasterArguments(args: Array[String]) {
+ var ip: String = Utils.localIpAddress()
+ var port: Int = 7077
+ var webUiPort: Int = 8080
+
+ parse(args.toList)
+
+ def parse(args: List[String]): Unit = args match {
+ case ("--ip" | "-i") :: value :: tail =>
+ ip = value
+ parse(tail)
+
+ case ("--port" | "-p") :: IntParam(value) :: tail =>
+ port = value
+ parse(tail)
+
+ case "--webui-port" :: IntParam(value) :: tail =>
+ webUiPort = value
+ parse(tail)
+
+ case ("--help" | "-h") :: tail =>
+ printUsageAndExit(0)
+
+ case Nil => {}
+
+ case _ =>
+ printUsageAndExit(1)
+ }
+
+ /**
+ * Print usage and exit JVM with the given exit code.
+ */
+ def printUsageAndExit(exitCode: Int) {
+ System.err.println(
+ "Usage: spark-master [options]\n" +
+ "\n" +
+ "Options:\n" +
+ " -i IP, --ip IP IP address or DNS name to listen on\n" +
+ " -p PORT, --port PORT Port to listen on (default: 7077)\n" +
+ " --webui-port PORT Port for web UI (default: 8080)\n")
+ System.exit(exitCode)
+ }
+}
\ No newline at end of file
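
The parser recurses over the argument list, peeling off one flag (and its value, where one is expected) per step; IntParam, the extractor defined further below, only matches when the value parses as an integer. For example:

    new MasterArguments(Array("-i", "10.0.0.1", "--webui-port", "8081"))
    // ip = "10.0.0.1", port = 7077 (default), webUiPort = 8081

    new MasterArguments(Array("--port", "seven"))
    // IntParam does not match "seven", so only the catch-all case applies,
    // which calls printUsageAndExit(1)
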
diff --git a/core/src/main/scala/spark/deploy/MasterWebUI.scala b/core/src/main/scala/spark/deploy/MasterWebUI.scala
new file mode 100644
index 0000000000..3f078322e1
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/MasterWebUI.scala
@@ -0,0 +1,12 @@
+package spark.deploy
+
+import akka.actor.{ActorRef, ActorSystem}
+import cc.spray.Directives
+
+class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
+ val handler = {
+ path("") {
+ get { _.complete("Hello world!") }
+ }
+ }
+}
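
For now the web UI is a single hello-world route. As a sketch of how further endpoints could compose, assuming cc.spray's ~ combinator for chaining routes (the /status path is hypothetical, not part of this commit):

    val handler = {
      path("") {
        get { _.complete("Hello world!") }
      } ~
      path("status") {                        // hypothetical extra endpoint
        get { _.complete("Master is up") }
      }
    }
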
diff --git a/core/src/main/scala/spark/deploy/Worker.scala b/core/src/main/scala/spark/deploy/Worker.scala
new file mode 100644
index 0000000000..7210a4b902
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/Worker.scala
@@ -0,0 +1,10 @@
+package spark.deploy
+
+class Worker(cores: Int, memoryMb: Int) {
+
+}
+
+object Worker {
+ def main(args: Array[String]) {
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
new file mode 100644
index 0000000000..84e942e5b7
--- /dev/null
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -0,0 +1,69 @@
+package spark.util
+
+import akka.actor.{Props, ActorSystemImpl, ActorSystem}
+import com.typesafe.config.ConfigFactory
+import akka.util.duration._
+import akka.pattern.ask
+import akka.remote.RemoteActorRefProvider
+import cc.spray.Route
+import cc.spray.io.IoWorker
+import cc.spray.{SprayCanRootService, HttpService}
+import cc.spray.can.server.HttpServer
+import cc.spray.io.pipelines.MessageHandlerDispatch.SingletonHandler
+import akka.dispatch.Await
+import spark.SparkException
+import java.util.concurrent.TimeoutException
+
+/**
+ * Various utility classes for working with Akka.
+ */
+object AkkaUtils {
+ /**
+ * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
+ * ActorSystem itself and its port (which is hard to get from Akka).
+ */
+ def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
+ val akkaConf = ConfigFactory.parseString("""
+ akka.daemonic = on
+ akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
+ akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+ akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
+ akka.remote.netty.hostname = "%s"
+ akka.remote.netty.port = %d
+ """.format(host, port))
+
+    val actorSystem = ActorSystem(name, akkaConf, getClass.getClassLoader)
+
+ // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
+ // hack because Akka doesn't let you figure out the port through the public API yet.
+ val provider = actorSystem.asInstanceOf[ActorSystemImpl].provider
+ val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
+    (actorSystem, boundPort)
+ }
+
+ /**
+ * Creates a Spray HTTP server bound to a given IP and port with a given Spray Route object to
+ * handle requests. Throws a SparkException if this fails.
+ */
+ def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route) {
+ val ioWorker = new IoWorker(actorSystem).start()
+ val httpService = actorSystem.actorOf(Props(new HttpService(route)))
+ val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService)))
+ val server = actorSystem.actorOf(
+ Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = "HttpServer")
+ actorSystem.registerOnTermination { ioWorker.stop() }
+ val timeout = 1.seconds
+ val future = server.ask(HttpServer.Bind(ip, port))(timeout)
+ try {
+ Await.result(future, timeout) match {
+ case bound: HttpServer.Bound =>
+ return
+ case other: Any =>
+ throw new SparkException("Failed to bind web UI to port " + port + ": " + other)
+ }
+ } catch {
+ case e: TimeoutException =>
+ throw new SparkException("Failed to bind web UI to port " + port)
+ }
+ }
+}
diff --git a/core/src/main/scala/spark/util/IntParam.scala b/core/src/main/scala/spark/util/IntParam.scala
new file mode 100644
index 0000000000..c3ff063569
--- /dev/null
+++ b/core/src/main/scala/spark/util/IntParam.scala
@@ -0,0 +1,14 @@
+package spark.util
+
+/**
+ * An extractor object for parsing strings into integers.
+ */
+object IntParam {
+ def unapply(str: String): Option[Int] = {
+ try {
+ Some(str.toInt)
+ } catch {
+ case e: NumberFormatException => None
+ }
+ }
+}
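
Extractor objects like this let a pattern match carry the parsing: a case arm fires only when unapply returns Some. For example:

    "8080" match {
      case IntParam(n) => println("port " + n)  // matches, n = 8080
      case _           => println("not a number")
    }
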
diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/spark/UtilsSuite.scala
index 1ac4737f04..a46d90223e 100644
--- a/core/src/test/scala/spark/UtilsSuite.scala
+++ b/core/src/test/scala/spark/UtilsSuite.scala
@@ -7,12 +7,13 @@ import scala.util.Random
class UtilsSuite extends FunSuite {
test("memoryBytesToString") {
- assert(Utils.memoryBytesToString(10) === "10.0B")
- assert(Utils.memoryBytesToString(1500) === "1500.0B")
- assert(Utils.memoryBytesToString(2000000) === "1953.1KB")
- assert(Utils.memoryBytesToString(2097152) === "2.0MB")
- assert(Utils.memoryBytesToString(2306867) === "2.2MB")
- assert(Utils.memoryBytesToString(5368709120L) === "5.0GB")
+ assert(Utils.memoryBytesToString(10) === "10.0 B")
+ assert(Utils.memoryBytesToString(1500) === "1500.0 B")
+ assert(Utils.memoryBytesToString(2000000) === "1953.1 KB")
+ assert(Utils.memoryBytesToString(2097152) === "2.0 MB")
+ assert(Utils.memoryBytesToString(2306867) === "2.2 MB")
+ assert(Utils.memoryBytesToString(5368709120L) === "5.0 GB")
+ assert(Utils.memoryBytesToString(5L * 1024L * 1024L * 1024L * 1024L) === "5.0 TB")
}
test("copyStream") {
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index be5f202dbe..6f5df5c743 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -46,7 +46,8 @@ object SparkBuild extends Build {
resolvers ++= Seq(
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
"JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
- "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/"
+ "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/",
+ "Spray Repository" at "http://repo.spray.cc/"
),
libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "11.0.1",
@@ -63,7 +64,9 @@ object SparkBuild extends Build {
"com.typesafe.akka" % "akka-slf4j" % "2.0.2",
"org.jboss.netty" % "netty" % "3.2.6.Final",
"it.unimi.dsi" % "fastutil" % "6.4.4",
- "colt" % "colt" % "1.2.0"
+ "colt" % "colt" % "1.2.0",
+ "cc.spray" % "spray-can" % "1.0-M2.1",
+ "cc.spray" % "spray-server" % "1.0-M2.1"
)
) ++ assemblySettings ++ extraAssemblySettings