From 22dde6e020a5770b7a944d1eb275a0c96cf4eea9 Mon Sep 17 00:00:00 2001
From: Denny
Date: Tue, 4 Sep 2012 20:56:30 -0700
Subject: Start a standalone cluster locally.

---
 core/src/main/scala/spark/SparkContext.scala       | 12 ++++-
 core/src/main/scala/spark/deploy/DeployUtils.scala | 53 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)
 create mode 100644 core/src/main/scala/spark/deploy/DeployUtils.scala

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 538e057926..92c0dcc876 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -81,9 +81,11 @@ class SparkContext(
   val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
   // Regular expression for local[N, maxRetries], used in tests with failing tasks
   val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r
+  // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
+  val SPARK_LOCALCLUSTER_REGEX = """spark-cluster\[([0-9]+),([0-9]+),([0-9]+)\]""".r
   // Regular expression for connecting to Spark deploy clusters
   val SPARK_REGEX = """(spark://.*)""".r
-  
+
   master match {
     case "local" =>
       new LocalScheduler(1, 0)
@@ -99,6 +101,14 @@ class SparkContext(
       val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
       scheduler.initialize(backend)
       scheduler
+
+    case SPARK_LOCALCLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
+      val scheduler = new ClusterScheduler(this)
+      val sparkUrl = spark.deploy.DeployUtils.startLocalSparkCluster(numSlaves.toInt,
+        coresPerSlave.toInt, memoryPerSlave.toInt)
+      val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
+      scheduler.initialize(backend)
+      scheduler
 
     case _ =>
       MesosNativeLibrary.load()

diff --git a/core/src/main/scala/spark/deploy/DeployUtils.scala b/core/src/main/scala/spark/deploy/DeployUtils.scala
new file mode 100644
index 0000000000..602ba6ecb0
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/DeployUtils.scala
@@ -0,0 +1,53 @@
+package spark.deploy
+
+import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+
+import spark.deploy.worker.Worker
+import spark.deploy.master.Master
+import spark.util.AkkaUtils
+import spark.{Logging, Utils}
+
+import scala.collection.mutable.ArrayBuffer
+
+object DeployUtils extends Logging {
+
+  /* Starts a local standalone Spark cluster with a specified number of slaves */
+  def startLocalSparkCluster(numSlaves: Int, coresPerSlave: Int,
+    memoryPerSlave: Int): String = {
+
+    logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
+
+    val threadPool = Utils.newDaemonFixedThreadPool(numSlaves + 1)
+    val localIpAddress = Utils.localIpAddress
+    val workers = ArrayBuffer[ActorRef]()
+
+    /* Start the Master */
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
+    val masterUrl = "spark://" + localIpAddress + ":" + boundPort
+    threadPool.execute(new Runnable {
+      def run() {
+        val actor = actorSystem.actorOf(
+          Props(new Master(localIpAddress, boundPort, 8080)), name = "Master")
+        actorSystem.awaitTermination()
+      }
+    })
+
+    /* Start the Slaves */
+    (1 to numSlaves).foreach { slaveNum =>
+      val (actorSystem, boundPort) =
+        AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
+      threadPool.execute(new Runnable {
+        def run() {
+          val actor = actorSystem.actorOf(
+            Props(new Worker(localIpAddress, boundPort, 8080 + slaveNum, coresPerSlave, memoryPerSlave, masterUrl)),
+            name = "Worker")
+          workers += actor
+          actorSystem.awaitTermination()
+        }
+      })
+    }
+
+    masterUrl
+  }
+
+}
\ No newline at end of file
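
With this patch applied, a driver program can simulate a standalone cluster on a single machine just by passing the new master string to SparkContext. Below is a minimal usage sketch, assuming the 2012-era API visible in this diff (two-argument SparkContext constructor, parallelize/map/reduce/stop); the object name and the 2-slave/1-core/512 sizing are illustrative, not taken from the patch:

    import spark.SparkContext

    object LocalClusterExample {
      def main(args: Array[String]) {
        // "spark-cluster[2,1,512]" is matched by SPARK_LOCALCLUSTER_REGEX above:
        // 2 slaves, 1 core per slave, 512 (presumably MB) of memory per slave.
        val sc = new SparkContext("spark-cluster[2,1,512]", "LocalClusterExample")

        // Run a trivial job across the locally simulated workers.
        val sum = sc.parallelize(1 to 1000).map(_ * 2).reduce(_ + _)
        println("Sum: " + sum)

        sc.stop()
      }
    }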