diff options
Diffstat (limited to 'core/src/main/scala/spark/deploy/LocalSparkCluster.scala')
-rw-r--r-- | core/src/main/scala/spark/deploy/LocalSparkCluster.scala | 27 |
1 files changed, 13 insertions, 14 deletions
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala index 8f51051e39..2836574ecb 100644 --- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala @@ -16,7 +16,7 @@ import scala.collection.mutable.ArrayBuffer * fault recovery without spinning up a lot of processes. */ private[spark] -class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging { +class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging { val localIpAddress = Utils.localIpAddress @@ -25,29 +25,28 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) var masterPort : Int = _ var masterUrl : String = _ - val slaveActorSystems = ArrayBuffer[ActorSystem]() - val slaveActors = ArrayBuffer[ActorRef]() + val workerActorSystems = ArrayBuffer[ActorSystem]() + val workerActors = ArrayBuffer[ActorRef]() def start() : String = { - logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.") + logInfo("Starting a local Spark cluster with " + numWorkers + " workers.") /* Start the Master */ val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0) masterActorSystem = actorSystem masterUrl = "spark://" + localIpAddress + ":" + masterPort - val actor = masterActorSystem.actorOf( + masterActor = masterActorSystem.actorOf( Props(new Master(localIpAddress, masterPort, 0)), name = "Master") - masterActor = actor /* Start the Slaves */ - for (slaveNum <- 1 to numSlaves) { + for (workerNum <- 1 to numWorkers) { val (actorSystem, boundPort) = - AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0) - slaveActorSystems += actorSystem + AkkaUtils.createActorSystem("sparkWorker" + workerNum, localIpAddress, 0) + workerActorSystems += actorSystem val actor = actorSystem.actorOf( - Props(new Worker(localIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)), + Props(new Worker(localIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)), name = "Worker") - slaveActors += actor + workerActors += actor } return masterUrl @@ -55,9 +54,9 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) def stop() { logInfo("Shutting down local Spark cluster.") - // Stop the slaves before the master so they don't get upset that it disconnected - slaveActorSystems.foreach(_.shutdown()) - slaveActorSystems.foreach(_.awaitTermination()) + // Stop the workers before the master so they don't get upset that it disconnected + workerActorSystems.foreach(_.shutdown()) + workerActorSystems.foreach(_.awaitTermination()) masterActorSystem.shutdown() masterActorSystem.awaitTermination() } |