From 28e0cb9f312b7fb1b0236fd15ba0dd2f423e826d Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sat, 2 Feb 2013 01:11:37 -0600 Subject: Fix createActorSystem not actually using the systemName parameter. This meant all system names were "spark", which worked, but didn't lead to the most intuitive log output. This fixes createActorSystem to use the passed system name, and refactors Master/Worker to encapsulate their system/actor names instead of having the clients guess at them. Note that the driver system name, "spark", is left as is, and is still repeated a few times, but that seems like a separate issue. --- .../scala/spark/deploy/LocalSparkCluster.scala | 38 +++++--------- .../main/scala/spark/deploy/client/Client.scala | 13 ++--- .../main/scala/spark/deploy/master/Master.scala | 24 +++++++-- .../main/scala/spark/deploy/worker/Worker.scala | 58 ++++++++++------------ .../scala/spark/storage/BlockManagerMaster.scala | 2 - core/src/main/scala/spark/util/AkkaUtils.scala | 6 ++- 6 files changed, 68 insertions(+), 73 deletions(-) diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala index 2836574ecb..22319a96ca 100644 --- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala @@ -18,35 +18,23 @@ import scala.collection.mutable.ArrayBuffer private[spark] class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging { - val localIpAddress = Utils.localIpAddress + private val localIpAddress = Utils.localIpAddress + private val masterActorSystems = ArrayBuffer[ActorSystem]() + private val workerActorSystems = ArrayBuffer[ActorSystem]() - var masterActor : ActorRef = _ - var masterActorSystem : ActorSystem = _ - var masterPort : Int = _ - var masterUrl : String = _ - - val workerActorSystems = ArrayBuffer[ActorSystem]() - val workerActors = ArrayBuffer[ActorRef]() - - def start() : String = { + def start(): String = { logInfo("Starting a local Spark cluster with " + numWorkers + " workers.") /* Start the Master */ - val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0) - masterActorSystem = actorSystem - masterUrl = "spark://" + localIpAddress + ":" + masterPort - masterActor = masterActorSystem.actorOf( - Props(new Master(localIpAddress, masterPort, 0)), name = "Master") + val (masterSystem, masterPort) = Master.startSystemAndActor(localIpAddress, 0, 0) + masterActorSystems += masterSystem + val masterUrl = "spark://" + localIpAddress + ":" + masterPort - /* Start the Slaves */ + /* Start the Workers */ for (workerNum <- 1 to numWorkers) { - val (actorSystem, boundPort) = - AkkaUtils.createActorSystem("sparkWorker" + workerNum, localIpAddress, 0) - workerActorSystems += actorSystem - val actor = actorSystem.actorOf( - Props(new Worker(localIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)), - name = "Worker") - workerActors += actor + val (workerSystem, _) = Worker.startSystemAndActor(localIpAddress, 0, 0, coresPerWorker, + memoryPerWorker, masterUrl, null, Some(workerNum)) + workerActorSystems += workerSystem } return masterUrl @@ -57,7 +45,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I // Stop the workers before the master so they don't get upset that it disconnected workerActorSystems.foreach(_.shutdown()) workerActorSystems.foreach(_.awaitTermination()) - masterActorSystem.shutdown() - masterActorSystem.awaitTermination() + masterActorSystems.foreach(_.shutdown()) + masterActorSystems.foreach(_.awaitTermination()) } } diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala index 90fe9508cd..a63eee1233 100644 --- a/core/src/main/scala/spark/deploy/client/Client.scala +++ b/core/src/main/scala/spark/deploy/client/Client.scala @@ -9,6 +9,7 @@ import spark.{SparkException, Logging} import akka.remote.RemoteClientLifeCycleEvent import akka.remote.RemoteClientShutdown import spark.deploy.RegisterJob +import spark.deploy.master.Master import akka.remote.RemoteClientDisconnected import akka.actor.Terminated import akka.dispatch.Await @@ -24,26 +25,18 @@ private[spark] class Client( listener: ClientListener) extends Logging { - val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r - var actor: ActorRef = null var jobId: String = null - if (MASTER_REGEX.unapplySeq(masterUrl) == None) { - throw new SparkException("Invalid master URL: " + masterUrl) - } - class ClientActor extends Actor with Logging { var master: ActorRef = null var masterAddress: Address = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times override def preStart() { - val Seq(masterHost, masterPort) = MASTER_REGEX.unapplySeq(masterUrl).get - logInfo("Connecting to master spark://" + masterHost + ":" + masterPort) - val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort) + logInfo("Connecting to master " + masterUrl) try { - master = context.actorFor(akkaUrl) + master = context.actorFor(Master.toAkkaUrl(masterUrl)) masterAddress = master.path.address master ! RegisterJob(jobDescription) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index c618e87cdd..92e7914b1b 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -262,11 +262,29 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor } private[spark] object Master { + private val systemName = "sparkMaster" + private val actorName = "Master" + private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r + def main(argStrings: Array[String]) { val args = new MasterArguments(argStrings) - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port) - val actor = actorSystem.actorOf( - Props(new Master(args.ip, boundPort, args.webUiPort)), name = "Master") + val (actorSystem, _) = startSystemAndActor(args.ip, args.port, args.webUiPort) actorSystem.awaitTermination() } + + /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */ + def toAkkaUrl(sparkUrl: String): String = { + sparkUrl match { + case sparkUrlRegex(host, port) => + "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName) + case _ => + throw new SparkException("Invalid master URL: " + sparkUrl) + } + } + + def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = { + val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) + val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName) + (actorSystem, boundPort) + } } diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 8b41620d98..2219dd6262 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -1,7 +1,7 @@ package spark.deploy.worker import scala.collection.mutable.{ArrayBuffer, HashMap} -import akka.actor.{ActorRef, Props, Actor} +import akka.actor.{ActorRef, Props, Actor, ActorSystem} import spark.{Logging, Utils} import spark.util.AkkaUtils import spark.deploy._ @@ -13,6 +13,7 @@ import akka.remote.RemoteClientDisconnected import spark.deploy.RegisterWorker import spark.deploy.LaunchExecutor import spark.deploy.RegisterWorkerFailed +import spark.deploy.master.Master import akka.actor.Terminated import java.io.File @@ -27,7 +28,6 @@ private[spark] class Worker( extends Actor with Logging { val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs - val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r var master: ActorRef = null var masterWebUiUrl : String = "" @@ -48,11 +48,7 @@ private[spark] class Worker( def memoryFree: Int = memory - memoryUsed def createWorkDir() { - workDir = if (workDirPath != null) { - new File(workDirPath) - } else { - new File(sparkHome, "work") - } + workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work")) try { if (!workDir.exists() && !workDir.mkdirs()) { logError("Failed to create work directory " + workDir) @@ -68,8 +64,7 @@ private[spark] class Worker( override def preStart() { logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( ip, port, cores, Utils.memoryMegabytesToString(memory))) - val envVar = System.getenv("SPARK_HOME") - sparkHome = new File(if (envVar == null) "." else envVar) + sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse(".")) logInfo("Spark home: " + sparkHome) createWorkDir() connectToMaster() @@ -77,24 +72,15 @@ private[spark] class Worker( } def connectToMaster() { - masterUrl match { - case MASTER_REGEX(masterHost, masterPort) => { - logInfo("Connecting to master spark://" + masterHost + ":" + masterPort) - val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort) - try { - master = context.actorFor(akkaUrl) - master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress) - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(master) // Doesn't work with remote actors, but useful for testing - } catch { - case e: Exception => - logError("Failed to connect to master", e) - System.exit(1) - } - } - - case _ => - logError("Invalid master URL: " + masterUrl) + logInfo("Connecting to master " + masterUrl) + try { + master = context.actorFor(Master.toAkkaUrl(masterUrl)) + master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress) + context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.watch(master) // Doesn't work with remote actors, but useful for testing + } catch { + case e: Exception => + logError("Failed to connect to master", e) System.exit(1) } } @@ -183,11 +169,19 @@ private[spark] class Worker( private[spark] object Worker { def main(argStrings: Array[String]) { val args = new WorkerArguments(argStrings) - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port) - val actor = actorSystem.actorOf( - Props(new Worker(args.ip, boundPort, args.webUiPort, args.cores, args.memory, - args.master, args.workDir)), - name = "Worker") + val (actorSystem, _) = startSystemAndActor(args.ip, args.port, args.webUiPort, args.cores, + args.memory, args.master, args.workDir) actorSystem.awaitTermination() } + + def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, + masterUrl: String, workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = { + // The LocalSparkCluster runs multiple local sparkWorkerX actor systems + val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") + val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) + val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory, + masterUrl, workDir)), name = "Worker") + (actorSystem, boundPort) + } + } diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 36398095a2..7be6b9fa87 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -27,8 +27,6 @@ private[spark] class BlockManagerMaster( val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt val DRIVER_AKKA_ACTOR_NAME = "BlockMasterManager" - val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager" - val DEFAULT_MANAGER_IP: String = Utils.localHostName() val timeout = 10.seconds var driverActor: ActorRef = { diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index e0fdeffbc4..3a3626e8a0 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -18,9 +18,13 @@ import java.util.concurrent.TimeoutException * Various utility classes for working with Akka. */ private[spark] object AkkaUtils { + /** * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the * ActorSystem itself and its port (which is hard to get from Akka). + * + * Note: the `name` parameter is important, as even if a client sends a message to right + * host + port, if the system name is incorrect, Akka will drop the message. */ def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = { val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt @@ -41,7 +45,7 @@ private[spark] object AkkaUtils { akka.actor.default-dispatcher.throughput = %d """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize)) - val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader) + val actorSystem = ActorSystem(name, akkaConf, getClass.getClassLoader) // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a // hack because Akka doesn't let you figure out the port through the public API yet. -- cgit v1.2.3