author    Denny <dennybritz@gmail.com>  2012-09-04 20:56:30 -0700
committer Denny <dennybritz@gmail.com>  2012-09-04 20:56:30 -0700
commit    22dde6e020a5770b7a944d1eb275a0c96cf4eea9 (patch)
tree      7579f4b3ddc27f39f05272aae32cab274ff9efb6
parent    47507d69d98b47685d667d7ec52dcc4116465eb1 (diff)
Start a standalone cluster locally.
-rw-r--r-- core/src/main/scala/spark/SparkContext.scala       | 12
-rw-r--r-- core/src/main/scala/spark/deploy/DeployUtils.scala | 53
2 files changed, 64 insertions(+), 1 deletion(-)
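
For reference, the "spark-cluster[N, cores, memory]" master string added below would be used like this; a minimal sketch, assuming the two-argument SparkContext constructor of this era (the app name "test" and the resource numbers are illustrative, not from this commit):

    // Hypothetical usage: 2 slaves, 1 core per slave, 512 MB of memory per slave.
    val sc = new SparkContext("spark-cluster[2,1,512]", "test")
    sc.parallelize(1 to 100).count()  // runs against the locally started standalone cluster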
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 538e057926..92c0dcc876 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -81,9 +81,11 @@ class SparkContext(
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r
+ // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
+ val SPARK_LOCALCLUSTER_REGEX = """spark-cluster\[([0-9]+),([0-9]+),([0-9]+)\]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """(spark://.*)""".r
-
+
master match {
case "local" =>
new LocalScheduler(1, 0)
@@ -99,6 +101,14 @@ class SparkContext(
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
scheduler.initialize(backend)
scheduler
+
+ case SPARK_LOCALCLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
+ val scheduler = new ClusterScheduler(this)
+ val sparkUrl = spark.deploy.DeployUtils.startLocalSparkCluster(numSlaves.toInt,
+ coresPerSlave.toInt, memoryPerSlave.toInt)
+ val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
+ scheduler.initialize(backend)
+ scheduler
case _ =>
MesosNativeLibrary.load()
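
As a quick sanity check of the pattern added above (with its escaping normalized), a sketch of what the three capture groups yield; the value names mirror the diff, and the input string is illustrative:

    // Hypothetical REPL check; the pattern mirrors SPARK_LOCALCLUSTER_REGEX above.
    val LocalCluster = """spark-cluster\[([0-9]+),([0-9]+),([0-9]+)\]""".r
    "spark-cluster[2,1,512]" match {
      case LocalCluster(numSlaves, coresPerSlave, memoryPerSlave) =>
        println((numSlaves.toInt, coresPerSlave.toInt, memoryPerSlave.toInt))  // prints (2,1,512)
    }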
diff --git a/core/src/main/scala/spark/deploy/DeployUtils.scala b/core/src/main/scala/spark/deploy/DeployUtils.scala
new file mode 100644
index 0000000000..602ba6ecb0
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/DeployUtils.scala
@@ -0,0 +1,53 @@
+package spark.deploy
+
+import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+
+import spark.deploy.worker.Worker
+import spark.deploy.master.Master
+import spark.util.AkkaUtils
+import spark.{Logging, Utils}
+
+import scala.collection.mutable.ArrayBuffer
+
+object DeployUtils extends Logging {
+
+ /* Starts a local standalone Spark cluster with a specified number of slaves */
+ def startLocalSparkCluster(numSlaves: Int, coresPerSlave: Int,
+ memoryPerSlave: Int): String = {
+
+ logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
+
+ val threadPool = Utils.newDaemonFixedThreadPool(numSlaves + 1)
+ val localIpAddress = Utils.localIpAddress
+ val workers = ArrayBuffer[ActorRef]()
+
+ /* Start the Master */
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
+ val masterUrl = "spark://" + localIpAddress + ":" + boundPort
+ threadPool.execute(new Runnable {
+ def run() {
+ val actor = actorSystem.actorOf(
+ Props(new Master(localIpAddress, boundPort, 8080)), name = "Master")
+ actorSystem.awaitTermination()
+ }
+ })
+
+ /* Start the Slaves */
+ (1 to numSlaves).foreach { slaveNum =>
+ val (actorSystem, boundPort) =
+ AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
+ threadPool.execute(new Runnable {
+ def run() {
+ val actor = actorSystem.actorOf(
+ Props(new Worker(localIpAddress, boundPort, 8080 + slaveNum, coresPerSlave, memoryPerSlave, masterUrl)),
+ name = "Worker")
+ workers += actor
+ actorSystem.awaitTermination()
+ }
+ })
+ }
+
+ masterUrl
+ }
+
+}
\ No newline at end of file
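
The new helper can also be driven directly; a minimal sketch, assuming this commit is on the classpath (the resource numbers are illustrative):

    // Hypothetical direct call: start a master plus 2 workers with 1 core and
    // 512 MB each, and get back the spark:// URL of the in-process master.
    val masterUrl = spark.deploy.DeployUtils.startLocalSparkCluster(2, 1, 512)
    println("Standalone master listening at " + masterUrl)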