diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-06-29 16:18:51 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-06-29 16:18:51 -0700 |
commit | c6be4ffbf957d1512559efcbf10d11397efca154 (patch) | |
tree | 6f9ac8e6ad45a20d8773b46123e58755a9cdd68a | |
parent | 3a58efa5a5da9a9a83bdaf0d4e5d4df6223e6a22 (diff) | |
download | spark-c6be4ffbf957d1512559efcbf10d11397efca154.tar.gz spark-c6be4ffbf957d1512559efcbf10d11397efca154.tar.bz2 spark-c6be4ffbf957d1512559efcbf10d11397efca154.zip |
Fixes to CoarseMesosScheduler
-rw-r--r-- | core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala | 20 |
1 files changed, 12 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala index 525cf9747f..95ad6c5b59 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala @@ -64,7 +64,7 @@ class CoarseMesosScheduler( def receive = { case RegisterSlave(slaveId, host) => slaveActor(slaveId) = sender - logInfo("Slave actor: " + sender) + logInfo("Registered slave: " + sender + " with ID " + slaveId) slaveHost(slaveId) = host freeCores(slaveId) = coresPerSlave makeFakeOffers() @@ -96,7 +96,7 @@ class CoarseMesosScheduler( } } - val masterActor: ActorRef = actorSystem.actorOf(Props[MasterActor], name = actorName) + val masterActor: ActorRef = actorSystem.actorOf(Props(new MasterActor), name = actorName) val taskIdsOnSlave = new HashMap[String, HashSet[String]] @@ -284,24 +284,28 @@ class WorkerTask(slaveId: String, host: String) extends Task[Unit](-1) { generation = 0 def run(id: Int): Unit = { - val actorSystem = SparkEnv.get.actorSystem - val actor = actorSystem.actorOf(Props(new WorkerActor(slaveId, host)), name = "WorkerActor") + val env = SparkEnv.get + val classLoader = currentThread.getContextClassLoader + val actor = env.actorSystem.actorOf( + Props(new WorkerActor(slaveId, host, env, classLoader)), + name = "WorkerActor") + // Wait forever so that our Mesos task doesn't end while (true) { Thread.sleep(10000) } } } -class WorkerActor(slaveId: String, host: String) extends Actor with Logging { - val env = SparkEnv.get - val classLoader = currentThread.getContextClassLoader +class WorkerActor(slaveId: String, host: String, env: SparkEnv, classLoader: ClassLoader) + extends Actor with Logging { + val threadPool = new ThreadPoolExecutor( 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) val masterIp: String = System.getProperty("spark.master.host", "localhost") val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt val masterActor = env.actorSystem.actorFor( - "akka://spark@%s:%s/%s".format(masterIp, masterPort, "CoarseMesosScheduler")) + "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, "CoarseMesosScheduler")) class TaskRunner(desc: MTaskInfo) extends Runnable { |