commit    1588d4dbe6574aa293df0b087cf2609575e187b7
tree      44164f11fa8bdfd2ca1ecd761113a6ae70df6905
parent    22dde6e020a5770b7a944d1eb275a0c96cf4eea9
author    Denny <dennybritz@gmail.com>  2012-09-04 21:13:25 -0700
committer Denny <dennybritz@gmail.com>  2012-09-04 21:13:25 -0700
Renamed class: the spark.deploy.DeployUtils helper object becomes a spark.deploy.LocalSparkCluster class.
 core/src/main/scala/spark/SparkContext.scala             |  6 ++--
 core/src/main/scala/spark/deploy/LocalSparkCluster.scala | 59 +++++
 2 files changed, 63 insertions(+), 2 deletions(-)
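
In short, the commit turns a one-shot helper into an instantiable class. Condensed, the call-site change in SparkContext looks like this (the argument values here are hypothetical; the two call shapes are taken from the removed and added lines below):

  // before: static helper on the DeployUtils object (removed)
  val sparkUrl = spark.deploy.DeployUtils.startLocalSparkCluster(2, 1, 512)
  // after: an instance whose lifecycle could later be managed explicitly
  val sparkUrl = new LocalSparkCluster(2, 1, 512).start()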
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 92c0dcc876..e24f4be5a4 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -34,6 +34,8 @@ import org.apache.mesos.{Scheduler, MesosNativeLibrary}
import spark.broadcast._
+import spark.deploy.LocalSparkCluster
+
import spark.partial.ApproximateEvaluator
import spark.partial.PartialResult
@@ -104,8 +106,8 @@ class SparkContext(
case SPARK_LOCALCLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
val scheduler = new ClusterScheduler(this)
- val sparkUrl = spark.deploy.DeployUtils.startLocalSparkCluster(numSlaves.toInt,
- coresPerSlave.toInt, memoryPerSlave.toInt)
+ val localCluster = new LocalSparkCluster(numSlaves.toInt, coresPerSlave.toInt, memoryPerSlave.toInt)
+ val sparkUrl = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
scheduler.initialize(backend)
scheduler
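
For context on the branch above: SPARK_LOCALCLUSTER_REGEX (defined elsewhere in SparkContext, not shown in this diff) matches a special master string requesting an in-process standalone cluster. A minimal, hypothetical sketch of how a driver would hit this code path, assuming the master string takes the form local-cluster[numSlaves, coresPerSlave, memoryPerSlave]:

  // Hypothetical driver code: a 2-slave in-process cluster with 1 core and
  // 512 MB per slave. The exact bracket syntax is an assumption; only the
  // regex name appears in this diff.
  val sc = new SparkContext("local-cluster[2, 1, 512]", "test")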
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
new file mode 100644
index 0000000000..85a7b65c38
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -0,0 +1,59 @@
+package spark.deploy
+
+import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+
+import spark.deploy.worker.Worker
+import spark.deploy.master.Master
+import spark.util.AkkaUtils
+import spark.{Logging, Utils}
+
+import scala.collection.mutable.ArrayBuffer
+
+class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int,
+ memoryPerSlave: Int) extends Logging {
+
+ val threadPool = Utils.newDaemonFixedThreadPool(numSlaves + 1)
+ val localIpAddress = Utils.localIpAddress
+
+ var masterActor: ActorRef = _
+ var masterPort: Int = _
+ var masterUrl: String = _
+
+ val slaveActors = ArrayBuffer[ActorRef]()
+
+ def start(): String = {
+
+ logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
+
+ /* Start the Master */
+ val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
+ masterUrl = "spark://" + localIpAddress + ":" + masterPort
+ threadPool.execute(new Runnable {
+ def run() {
+ val actor = actorSystem.actorOf(
+ Props(new Master(localIpAddress, masterPort, 8080)), name = "Master")
+ masterActor = actor
+ actorSystem.awaitTermination()
+ }
+ })
+
+ /* Start the Slaves */
+ (1 to numSlaves).foreach { slaveNum =>  // one worker per requested slave
+ val (actorSystem, boundPort) =
+ AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
+ threadPool.execute(new Runnable {
+ def run() {
+ val actor = actorSystem.actorOf(
+ Props(new Worker(localIpAddress, boundPort, 8080 + slaveNum, coresPerSlave, memoryPerSlave, masterUrl)),
+ name = "Worker")
+ slaveActors += actor
+ actorSystem.awaitTermination()
+ }
+ })
+ }
+
+ masterUrl
+ }
+
+
+}
\ No newline at end of file
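
A minimal sketch of exercising the new class directly (the constructor and start() are from the diff; the surrounding code is hypothetical):

  // Boot the in-process cluster: one master plus two workers, each worker
  // registering against the returned spark:// URL.
  val cluster = new LocalSparkCluster(numSlaves = 2, coresPerSlave = 1, memoryPerSlave = 512)
  val masterUrl = cluster.start()  // e.g. "spark://<local-ip>:<bound-port>"
  // Note: the class exposes no stop() yet; the actor systems run on daemon
  // threads (Utils.newDaemonFixedThreadPool) and die with the JVM.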