aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala69
1 files changed, 69 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
new file mode 100644
index 0000000000..af5a4110b0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+
+import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.deploy.master.Master
+import org.apache.spark.util.AkkaUtils
+import org.apache.spark.{Logging, Utils}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Testing class that creates a Spark standalone process in-cluster (that is, running the
+ * spark.deploy.master.Master and spark.deploy.worker.Workers in the same JVMs). Executors launched
+ * by the Workers still run in separate JVMs. This can be used to test distributed operation and
+ * fault recovery without spinning up a lot of processes.
+ */
+private[spark]
+class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
+
+ private val localHostname = Utils.localHostName()
+ private val masterActorSystems = ArrayBuffer[ActorSystem]()
+ private val workerActorSystems = ArrayBuffer[ActorSystem]()
+
+ def start(): String = {
+ logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
+
+ /* Start the Master */
+ val (masterSystem, masterPort) = Master.startSystemAndActor(localHostname, 0, 0)
+ masterActorSystems += masterSystem
+ val masterUrl = "spark://" + localHostname + ":" + masterPort
+
+ /* Start the Workers */
+ for (workerNum <- 1 to numWorkers) {
+ val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
+ memoryPerWorker, masterUrl, null, Some(workerNum))
+ workerActorSystems += workerSystem
+ }
+
+ return masterUrl
+ }
+
+ def stop() {
+ logInfo("Shutting down local Spark cluster.")
+ // Stop the workers before the master so they don't get upset that it disconnected
+ workerActorSystems.foreach(_.shutdown())
+ workerActorSystems.foreach(_.awaitTermination())
+
+ masterActorSystems.foreach(_.shutdown())
+ masterActorSystems.foreach(_.awaitTermination())
+ }
+}