aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/deploy/LocalSparkCluster.scala')
-rw-r--r--core/src/main/scala/spark/deploy/LocalSparkCluster.scala27
1 files changed, 13 insertions, 14 deletions
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index 8f51051e39..2836574ecb 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -16,7 +16,7 @@ import scala.collection.mutable.ArrayBuffer
* fault recovery without spinning up a lot of processes.
*/
private[spark]
-class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging {
+class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
val localIpAddress = Utils.localIpAddress
@@ -25,29 +25,28 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int)
var masterPort : Int = _
var masterUrl : String = _
- val slaveActorSystems = ArrayBuffer[ActorSystem]()
- val slaveActors = ArrayBuffer[ActorRef]()
+ val workerActorSystems = ArrayBuffer[ActorSystem]()
+ val workerActors = ArrayBuffer[ActorRef]()
def start() : String = {
- logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
+ logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
/* Start the Master */
val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
masterActorSystem = actorSystem
masterUrl = "spark://" + localIpAddress + ":" + masterPort
- val actor = masterActorSystem.actorOf(
+ masterActor = masterActorSystem.actorOf(
Props(new Master(localIpAddress, masterPort, 0)), name = "Master")
- masterActor = actor
/* Start the Slaves */
- for (slaveNum <- 1 to numSlaves) {
+ for (workerNum <- 1 to numWorkers) {
val (actorSystem, boundPort) =
- AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
- slaveActorSystems += actorSystem
+ AkkaUtils.createActorSystem("sparkWorker" + workerNum, localIpAddress, 0)
+ workerActorSystems += actorSystem
val actor = actorSystem.actorOf(
- Props(new Worker(localIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)),
+ Props(new Worker(localIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)),
name = "Worker")
- slaveActors += actor
+ workerActors += actor
}
return masterUrl
@@ -55,9 +54,9 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int)
def stop() {
logInfo("Shutting down local Spark cluster.")
- // Stop the slaves before the master so they don't get upset that it disconnected
- slaveActorSystems.foreach(_.shutdown())
- slaveActorSystems.foreach(_.awaitTermination())
+ // Stop the workers before the master so they don't get upset that it disconnected
+ workerActorSystems.foreach(_.shutdown())
+ workerActorSystems.foreach(_.awaitTermination())
masterActorSystem.shutdown()
masterActorSystem.awaitTermination()
}