aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-06-29 16:18:51 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-06-29 16:18:51 -0700
commitc6be4ffbf957d1512559efcbf10d11397efca154 (patch)
tree6f9ac8e6ad45a20d8773b46123e58755a9cdd68a
parent3a58efa5a5da9a9a83bdaf0d4e5d4df6223e6a22 (diff)
downloadspark-c6be4ffbf957d1512559efcbf10d11397efca154.tar.gz
spark-c6be4ffbf957d1512559efcbf10d11397efca154.tar.bz2
spark-c6be4ffbf957d1512559efcbf10d11397efca154.zip
Fixes to CoarseMesosScheduler
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala20
1 files changed, 12 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
index 525cf9747f..95ad6c5b59 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
@@ -64,7 +64,7 @@ class CoarseMesosScheduler(
def receive = {
case RegisterSlave(slaveId, host) =>
slaveActor(slaveId) = sender
- logInfo("Slave actor: " + sender)
+ logInfo("Registered slave: " + sender + " with ID " + slaveId)
slaveHost(slaveId) = host
freeCores(slaveId) = coresPerSlave
makeFakeOffers()
@@ -96,7 +96,7 @@ class CoarseMesosScheduler(
}
}
- val masterActor: ActorRef = actorSystem.actorOf(Props[MasterActor], name = actorName)
+ val masterActor: ActorRef = actorSystem.actorOf(Props(new MasterActor), name = actorName)
val taskIdsOnSlave = new HashMap[String, HashSet[String]]
@@ -284,24 +284,28 @@ class WorkerTask(slaveId: String, host: String) extends Task[Unit](-1) {
generation = 0
def run(id: Int): Unit = {
- val actorSystem = SparkEnv.get.actorSystem
- val actor = actorSystem.actorOf(Props(new WorkerActor(slaveId, host)), name = "WorkerActor")
+ val env = SparkEnv.get
+ val classLoader = currentThread.getContextClassLoader
+ val actor = env.actorSystem.actorOf(
+ Props(new WorkerActor(slaveId, host, env, classLoader)),
+ name = "WorkerActor")
+ // Wait forever so that our Mesos task doesn't end
while (true) {
Thread.sleep(10000)
}
}
}
-class WorkerActor(slaveId: String, host: String) extends Actor with Logging {
- val env = SparkEnv.get
- val classLoader = currentThread.getContextClassLoader
+class WorkerActor(slaveId: String, host: String, env: SparkEnv, classLoader: ClassLoader)
+ extends Actor with Logging {
+
val threadPool = new ThreadPoolExecutor(
1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
val masterIp: String = System.getProperty("spark.master.host", "localhost")
val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
val masterActor = env.actorSystem.actorFor(
- "akka://spark@%s:%s/%s".format(masterIp, masterPort, "CoarseMesosScheduler"))
+ "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, "CoarseMesosScheduler"))
class TaskRunner(desc: MTaskInfo)
extends Runnable {