aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStephen Haberman <stephen@exigencecorp.com>2013-02-02 01:11:37 -0600
committerStephen Haberman <stephen@exigencecorp.com>2013-02-02 01:11:37 -0600
commit28e0cb9f312b7fb1b0236fd15ba0dd2f423e826d (patch)
tree30d4e5ee4ed7ef964c18765b29e07b818b0d29f0
parent8b3041c7233011c4a96fab045a86df91eae7b6f3 (diff)
downloadspark-28e0cb9f312b7fb1b0236fd15ba0dd2f423e826d.tar.gz
spark-28e0cb9f312b7fb1b0236fd15ba0dd2f423e826d.tar.bz2
spark-28e0cb9f312b7fb1b0236fd15ba0dd2f423e826d.zip
Fix createActorSystem not actually using the systemName parameter.
This meant all system names were "spark", which worked, but didn't lead to the most intuitive log output. This fixes createActorSystem to use the passed system name, and refactors Master/Worker to encapsulate their system/actor names instead of having the clients guess at them. Note that the driver system name, "spark", is left as is, and is still repeated a few times, but that seems like a separate issue.
-rw-r--r--core/src/main/scala/spark/deploy/LocalSparkCluster.scala38
-rw-r--r--core/src/main/scala/spark/deploy/client/Client.scala13
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala24
-rw-r--r--core/src/main/scala/spark/deploy/worker/Worker.scala58
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala2
-rw-r--r--core/src/main/scala/spark/util/AkkaUtils.scala6
6 files changed, 68 insertions, 73 deletions
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index 2836574ecb..22319a96ca 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -18,35 +18,23 @@ import scala.collection.mutable.ArrayBuffer
private[spark]
class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
- val localIpAddress = Utils.localIpAddress
+ private val localIpAddress = Utils.localIpAddress
+ private val masterActorSystems = ArrayBuffer[ActorSystem]()
+ private val workerActorSystems = ArrayBuffer[ActorSystem]()
- var masterActor : ActorRef = _
- var masterActorSystem : ActorSystem = _
- var masterPort : Int = _
- var masterUrl : String = _
-
- val workerActorSystems = ArrayBuffer[ActorSystem]()
- val workerActors = ArrayBuffer[ActorRef]()
-
- def start() : String = {
+ def start(): String = {
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
/* Start the Master */
- val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
- masterActorSystem = actorSystem
- masterUrl = "spark://" + localIpAddress + ":" + masterPort
- masterActor = masterActorSystem.actorOf(
- Props(new Master(localIpAddress, masterPort, 0)), name = "Master")
+ val (masterSystem, masterPort) = Master.startSystemAndActor(localIpAddress, 0, 0)
+ masterActorSystems += masterSystem
+ val masterUrl = "spark://" + localIpAddress + ":" + masterPort
- /* Start the Slaves */
+ /* Start the Workers */
for (workerNum <- 1 to numWorkers) {
- val (actorSystem, boundPort) =
- AkkaUtils.createActorSystem("sparkWorker" + workerNum, localIpAddress, 0)
- workerActorSystems += actorSystem
- val actor = actorSystem.actorOf(
- Props(new Worker(localIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)),
- name = "Worker")
- workerActors += actor
+ val (workerSystem, _) = Worker.startSystemAndActor(localIpAddress, 0, 0, coresPerWorker,
+ memoryPerWorker, masterUrl, null, Some(workerNum))
+ workerActorSystems += workerSystem
}
return masterUrl
@@ -57,7 +45,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
// Stop the workers before the master so they don't get upset that it disconnected
workerActorSystems.foreach(_.shutdown())
workerActorSystems.foreach(_.awaitTermination())
- masterActorSystem.shutdown()
- masterActorSystem.awaitTermination()
+ masterActorSystems.foreach(_.shutdown())
+ masterActorSystems.foreach(_.awaitTermination())
}
}
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index 90fe9508cd..a63eee1233 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -9,6 +9,7 @@ import spark.{SparkException, Logging}
import akka.remote.RemoteClientLifeCycleEvent
import akka.remote.RemoteClientShutdown
import spark.deploy.RegisterJob
+import spark.deploy.master.Master
import akka.remote.RemoteClientDisconnected
import akka.actor.Terminated
import akka.dispatch.Await
@@ -24,26 +25,18 @@ private[spark] class Client(
listener: ClientListener)
extends Logging {
- val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
-
var actor: ActorRef = null
var jobId: String = null
- if (MASTER_REGEX.unapplySeq(masterUrl) == None) {
- throw new SparkException("Invalid master URL: " + masterUrl)
- }
-
class ClientActor extends Actor with Logging {
var master: ActorRef = null
var masterAddress: Address = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
override def preStart() {
- val Seq(masterHost, masterPort) = MASTER_REGEX.unapplySeq(masterUrl).get
- logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
- val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
+ logInfo("Connecting to master " + masterUrl)
try {
- master = context.actorFor(akkaUrl)
+ master = context.actorFor(Master.toAkkaUrl(masterUrl))
masterAddress = master.path.address
master ! RegisterJob(jobDescription)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index c618e87cdd..92e7914b1b 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -262,11 +262,29 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
private[spark] object Master {
+ private val systemName = "sparkMaster"
+ private val actorName = "Master"
+ private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
+
def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
- val actor = actorSystem.actorOf(
- Props(new Master(args.ip, boundPort, args.webUiPort)), name = "Master")
+ val (actorSystem, _) = startSystemAndActor(args.ip, args.port, args.webUiPort)
actorSystem.awaitTermination()
}
+
+ /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
+ def toAkkaUrl(sparkUrl: String): String = {
+ sparkUrl match {
+ case sparkUrlRegex(host, port) =>
+ "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+ case _ =>
+ throw new SparkException("Invalid master URL: " + sparkUrl)
+ }
+ }
+
+ def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = {
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+ val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
+ (actorSystem, boundPort)
+ }
}
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 8b41620d98..2219dd6262 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -1,7 +1,7 @@
package spark.deploy.worker
import scala.collection.mutable.{ArrayBuffer, HashMap}
-import akka.actor.{ActorRef, Props, Actor}
+import akka.actor.{ActorRef, Props, Actor, ActorSystem}
import spark.{Logging, Utils}
import spark.util.AkkaUtils
import spark.deploy._
@@ -13,6 +13,7 @@ import akka.remote.RemoteClientDisconnected
import spark.deploy.RegisterWorker
import spark.deploy.LaunchExecutor
import spark.deploy.RegisterWorkerFailed
+import spark.deploy.master.Master
import akka.actor.Terminated
import java.io.File
@@ -27,7 +28,6 @@ private[spark] class Worker(
extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
- val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
var master: ActorRef = null
var masterWebUiUrl : String = ""
@@ -48,11 +48,7 @@ private[spark] class Worker(
def memoryFree: Int = memory - memoryUsed
def createWorkDir() {
- workDir = if (workDirPath != null) {
- new File(workDirPath)
- } else {
- new File(sparkHome, "work")
- }
+ workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
try {
if (!workDir.exists() && !workDir.mkdirs()) {
logError("Failed to create work directory " + workDir)
@@ -68,8 +64,7 @@ private[spark] class Worker(
override def preStart() {
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
ip, port, cores, Utils.memoryMegabytesToString(memory)))
- val envVar = System.getenv("SPARK_HOME")
- sparkHome = new File(if (envVar == null) "." else envVar)
+ sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
logInfo("Spark home: " + sparkHome)
createWorkDir()
connectToMaster()
@@ -77,24 +72,15 @@ private[spark] class Worker(
}
def connectToMaster() {
- masterUrl match {
- case MASTER_REGEX(masterHost, masterPort) => {
- logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
- val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
- try {
- master = context.actorFor(akkaUrl)
- master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
- } catch {
- case e: Exception =>
- logError("Failed to connect to master", e)
- System.exit(1)
- }
- }
-
- case _ =>
- logError("Invalid master URL: " + masterUrl)
+ logInfo("Connecting to master " + masterUrl)
+ try {
+ master = context.actorFor(Master.toAkkaUrl(masterUrl))
+ master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
+ context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.watch(master) // Doesn't work with remote actors, but useful for testing
+ } catch {
+ case e: Exception =>
+ logError("Failed to connect to master", e)
System.exit(1)
}
}
@@ -183,11 +169,19 @@ private[spark] class Worker(
private[spark] object Worker {
def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
- val actor = actorSystem.actorOf(
- Props(new Worker(args.ip, boundPort, args.webUiPort, args.cores, args.memory,
- args.master, args.workDir)),
- name = "Worker")
+ val (actorSystem, _) = startSystemAndActor(args.ip, args.port, args.webUiPort, args.cores,
+ args.memory, args.master, args.workDir)
actorSystem.awaitTermination()
}
+
+ def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
+ masterUrl: String, workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+ // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
+ val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+ val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
+ masterUrl, workDir)), name = "Worker")
+ (actorSystem, boundPort)
+ }
+
}
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 36398095a2..7be6b9fa87 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -27,8 +27,6 @@ private[spark] class BlockManagerMaster(
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
val DRIVER_AKKA_ACTOR_NAME = "BlockMasterManager"
- val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
- val DEFAULT_MANAGER_IP: String = Utils.localHostName()
val timeout = 10.seconds
var driverActor: ActorRef = {
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index e0fdeffbc4..3a3626e8a0 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -18,9 +18,13 @@ import java.util.concurrent.TimeoutException
* Various utility classes for working with Akka.
*/
private[spark] object AkkaUtils {
+
/**
* Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
* ActorSystem itself and its port (which is hard to get from Akka).
+ *
+ * Note: the `name` parameter is important, as even if a client sends a message to right
+ * host + port, if the system name is incorrect, Akka will drop the message.
*/
def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
@@ -41,7 +45,7 @@ private[spark] object AkkaUtils {
akka.actor.default-dispatcher.throughput = %d
""".format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize))
- val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
+ val actorSystem = ActorSystem(name, akkaConf, getClass.getClassLoader)
// Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
// hack because Akka doesn't let you figure out the port through the public API yet.