aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
blob: 939f26b6f48423ae0e34052c88da832157531823 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package spark.deploy

import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}

import spark.deploy.worker.Worker
import spark.deploy.master.Master
import spark.util.AkkaUtils
import spark.{Logging, Utils}

import scala.collection.mutable.ArrayBuffer

/**
 * Testing class that creates a Spark standalone cluster in-process (that is, running the
 * spark.deploy.master.Master and spark.deploy.worker.Workers in the same JVM). Executors launched
 * by the Workers still run in separate JVMs. This can be used to test distributed operation and
 * fault recovery without spinning up a lot of processes.
 *
 * Call start() to bring the cluster up and stop() to tear it down again.
 *
 * @param numWorkers      number of Workers that start() launches
 * @param coresPerWorker  value handed to each Worker as its core count
 * @param memoryPerWorker value handed to each Worker as its memory amount
 *                        (NOTE(review): unit is not visible here -- confirm against Worker)
 */
private[spark]
class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int)
  extends Logging {

  private val localHostname = Utils.localHostName()

  // Actor systems created by start(). stop() shuts them down and then forgets them.
  private val masterActorSystems = ArrayBuffer[ActorSystem]()
  private val workerActorSystems = ArrayBuffer[ActorSystem]()

  /**
   * Starts one Master and `numWorkers` Workers, all bound to the local host name, and records
   * their actor systems so that stop() can shut them down later.
   *
   * @return the master URL, of the form "spark://<local host name>:<master port>"
   */
  def start(): String = {
    logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")

    /* Start the Master */
    // NOTE(review): the two 0 arguments are presumably "pick any free port" requests; the port
    // that was actually chosen is taken from the returned pair -- confirm against Master.
    val (masterSystem, masterPort) = Master.startSystemAndActor(localHostname, 0, 0)
    masterActorSystems += masterSystem
    val masterUrl = "spark://" + localHostname + ":" + masterPort

    /* Start the Workers */
    for (workerNum <- 1 to numWorkers) {
      // NOTE(review): the null argument presumably selects the Worker's default for that
      // parameter -- confirm against Worker.startSystemAndActor.
      val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
        memoryPerWorker, masterUrl, null, Some(workerNum))
      workerActorSystems += workerSystem
    }

    masterUrl
  }

  /**
   * Shuts down every actor system created by start() and waits for each one to terminate.
   * Afterwards the recorded systems are dropped, so a later start()/stop() cycle on the same
   * instance only touches the systems created by that later start().
   */
  def stop(): Unit = {
    logInfo("Shutting down local Spark cluster.")
    // Stop the workers before the master so they don't get upset that it disconnected
    workerActorSystems.foreach(_.shutdown())
    workerActorSystems.foreach(_.awaitTermination())

    masterActorSystems.foreach(_.shutdown())
    masterActorSystems.foreach(_.awaitTermination())

    // Forget the terminated systems: without this, a second stop() (or a stop() after a
    // restart) would call shutdown()/awaitTermination() on already-dead systems again, and
    // the buffers would keep them reachable.
    workerActorSystems.clear()
    masterActorSystems.clear()
  }
}