author     Matei Zaharia <matei@eecs.berkeley.edu>  2012-09-07 15:43:43 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2012-09-07 15:43:43 -0700
commit     8d2fcc2832e2a2c9db4c430bccda76372dd67a18 (patch)
tree       b2d1244cb94c704a56fd790875cde6e0ce0c5463
parent     c2da64409a7059ea6a1560dfc517d137f4fe88eb (diff)
parent     7ff9311add00aa46bee8ca706030907ef1471d8c (diff)
Merge pull request #189 from dennybritz/feature/localcluster
Simulating a Spark standalone cluster locally
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala | 17
-rw-r--r--  core/src/main/scala/spark/deploy/LocalSparkCluster.scala | 68
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 16
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 4
4 files changed, 104 insertions(+), 1 deletion(-)
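This merge teaches SparkContext a new master URL form, local-cluster[numSlaves,coresPerSlave,memoryPerSlave], which spins up a standalone master and workers inside the driver JVM. A minimal usage sketch; the object and job names are illustrative, and the per-slave memory figure being in MB is an assumption not spelled out in this diff:

import spark.SparkContext

object LocalClusterExample {
  def main(args: Array[String]) {
    // Simulate a standalone cluster in-process: 2 workers, 1 core each, 512 (assumed MB) per worker.
    val sc = new SparkContext("local-cluster[2,1,512]", "LocalClusterExample")
    val sum = sc.parallelize(1 to 1000, 4).reduce(_ + _)
    println("Sum: " + sum)
    sc.stop()
  }
}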
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 538e057926..d7bd832e52 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -34,6 +34,8 @@ import org.apache.mesos.{Scheduler, MesosNativeLibrary}
import spark.broadcast._
+import spark.deploy.LocalSparkCluster
+
import spark.partial.ApproximateEvaluator
import spark.partial.PartialResult
@@ -81,9 +83,11 @@ class SparkContext(
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r
+ // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
+ val SPARK_LOCALCLUSTER_REGEX = """local-cluster\[([0-9]+),([0-9]+),([0-9]+)\]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """(spark://.*)""".r
-
+
master match {
case "local" =>
new LocalScheduler(1, 0)
@@ -99,6 +103,17 @@ class SparkContext(
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
scheduler.initialize(backend)
scheduler
+
+ case SPARK_LOCALCLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
+ val scheduler = new ClusterScheduler(this)
+ val localCluster = new LocalSparkCluster(numSlaves.toInt, coresPerSlave.toInt, memoryPerSlave.toInt)
+ val sparkUrl = localCluster.start()
+ val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
+ scheduler.initialize(backend)
+ backend.shutdownHook = (backend: SparkDeploySchedulerBackend) => {
+ localCluster.stop()
+ }
+ scheduler
case _ =>
MesosNativeLibrary.load()
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
new file mode 100644
index 0000000000..da74df4dcf
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -0,0 +1,68 @@
+package spark.deploy
+
+import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+
+import spark.deploy.worker.Worker
+import spark.deploy.master.Master
+import spark.util.AkkaUtils
+import spark.{Logging, Utils}
+
+import scala.collection.mutable.ArrayBuffer
+
+class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
+ memoryPerSlave : Int) extends Logging {
+
+ val threadPool = Utils.newDaemonFixedThreadPool(numSlaves + 1)
+ val localIpAddress = Utils.localIpAddress
+
+ var masterActor : ActorRef = _
+ var masterActorSystem : ActorSystem = _
+ var masterPort : Int = _
+ var masterUrl : String = _
+
+ val slaveActorSystems = ArrayBuffer[ActorSystem]()
+ val slaveActors = ArrayBuffer[ActorRef]()
+
+ def start() : String = {
+
+ logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
+
+ /* Start the Master */
+ val (masterSystem, masterBoundPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
+ // Assign to the fields so stop() can shut the master actor system down later
+ masterActorSystem = masterSystem
+ masterPort = masterBoundPort
+ masterUrl = "spark://" + localIpAddress + ":" + masterPort
+ threadPool.execute(new Runnable {
+ def run() {
+ val actor = masterActorSystem.actorOf(
+ Props(new Master(localIpAddress, masterPort, 8080)), name = "Master")
+ masterActor = actor
+ masterActorSystem.awaitTermination()
+ }
+ })
+
+ /* Start the Slaves */
+ (1 to numSlaves).foreach { slaveNum =>
+ val (actorSystem, boundPort) =
+ AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
+ slaveActorSystems += actorSystem
+ threadPool.execute(new Runnable {
+ def run() {
+ val actor = actorSystem.actorOf(
+ Props(new Worker(localIpAddress, boundPort, 8080 + slaveNum, coresPerSlave, memoryPerSlave, masterUrl)),
+ name = "Worker")
+ slaveActors += actor
+ actorSystem.awaitTermination()
+ }
+ })
+ }
+
+ return masterUrl
+ }
+
+ def stop() {
+ logInfo("Shutting down local Spark cluster.")
+ masterActorSystem.shutdown()
+ slaveActorSystems.foreach(_.shutdown())
+ }
+
+
+}
\ No newline at end of file
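For orientation, a hedged sketch of the lifecycle SparkContext drives for this new class: start() boots the master and numSlaves workers as in-process actor systems and returns the master URL, and stop() tears them down. Names come from the code above; the memory unit (MB) is an assumption:

// Sketch of driving LocalSparkCluster directly (normally SparkContext does this
// when handed a "local-cluster[...]" master URL).
val cluster = new spark.deploy.LocalSparkCluster(2, 1, 512) // 2 slaves, 1 core, 512 (assumed MB) each
val masterUrl = cluster.start()   // e.g. "spark://<local-ip>:<master-port>"
// ... point a SparkDeploySchedulerBackend (or a new SparkContext) at masterUrl ...
cluster.stop()                    // shuts down the master and all worker actor systems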
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 3e24380810..393f4a3ee6 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -35,6 +35,19 @@ class ExecutorRunner(
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
+
+ // Shutdown hook that kills actors on shutdown.
+ Runtime.getRuntime.addShutdownHook(
+ new Thread() {
+ override def run() {
+ if(process != null) {
+ logInfo("Shutdown Hook killing process.")
+ process.destroy()
+ process.waitFor()
+ }
+ }
+ })
+
}
/** Stop this executor runner, including killing the process it launched */
@@ -131,6 +144,9 @@ class ExecutorRunner(
}
env.put("SPARK_CORES", cores.toString)
env.put("SPARK_MEMORY", memory.toString)
+ // In case we are running this from within the Spark Shell, avoid creating a
+ // "scala" parent process for the executor command.
+ env.put("SPARK_LAUNCH_WITH_SCALA", "0")
process = builder.start()
// Redirect its stdout and stderr to files
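The override above lands in the environment that ExecutorRunner's ProcessBuilder hands to the executor process. A hedged sketch of that pattern; the command line is illustrative, since the actual launch script invocation is not part of this diff:

// Passing environment overrides to a child process via ProcessBuilder,
// as ExecutorRunner does (command and values here are illustrative).
val builder = new ProcessBuilder("run", "spark.executor.StandaloneExecutorBackend")
val env = builder.environment()
env.put("SPARK_CORES", "1")
env.put("SPARK_MEMORY", "512")
env.put("SPARK_LAUNCH_WITH_SCALA", "0") // do not re-launch the executor through the scala runner
val process = builder.start()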
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 0bd2d15479..ec3ff38d5c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -16,6 +16,7 @@ class SparkDeploySchedulerBackend(
var client: Client = null
var stopping = false
+ var shutdownHook : (SparkDeploySchedulerBackend) => Unit = _
val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
@@ -61,6 +62,9 @@ class SparkDeploySchedulerBackend(
stopping = true;
super.stop()
client.stop()
+ if (shutdownHook != null) {
+ shutdownHook(this)
+ }
}
def connected(jobId: String) {